import asyncio
from functools import partial
from typing import List

import numpy as np

from base_chunker import BaseChunker
from dense_embed import embed_text
from infinity_emb import AsyncEmbeddingEngine
from recursive_token_chunker import RecursiveTokenChunker
def bert_token_count(tokenizer, text: str) -> int:
    """Returns the number of tokens in a text string."""
    tokens = tokenizer.tokenize(text, add_special_tokens=True)
    return len(tokenizer.convert_tokens_to_ids(tokens))
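# `embed_text` is imported from the local dense_embed module, which is not
# shown in this listing. A minimal sketch of the contract this code relies on,
# written against infinity_emb's documented engine.embed() API (an assumption
# about dense_embed, not its actual source):
#
#     async def embed_text(engine, texts: List[str]):
#         async with engine:
#             embeddings, usage = await engine.embed(sentences=texts)
#         return embeddings, usage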
class ClusterSemanticChunker(BaseChunker):
    def __init__(self, embed_model: AsyncEmbeddingEngine = None, max_chunk_size=400, min_chunk_size=50, length_function=bert_token_count):
        # embed_model is effectively required despite the None default: its
        # tokenizer drives the token-length function of the initial splitter.
        self.splitter = RecursiveTokenChunker(
            chunk_size=min_chunk_size,
            chunk_overlap=0,
            length_function=partial(length_function, embed_model._model.tokenizer),
            separators=["\n\n", "\n", ".", "?", "!", " ", ""],
        )
        self._chunk_size = max_chunk_size
        # Upper bound on how many min_chunk_size pieces may merge into one chunk.
        self.max_cluster = max_chunk_size // min_chunk_size
        self.embed_model = embed_model
    def _get_similarity_matrix(self, embed_model, sentences):
        BATCH_SIZE = 500
        N = len(sentences)
        embedding_matrix = None
        # Embed the sentences in batches to bound request size and memory.
        for i in range(0, N, BATCH_SIZE):
            batch_sentences = sentences[i:i + BATCH_SIZE]
            embeddings, token_usage = asyncio.run(embed_text(embed_model[0], batch_sentences))
            # Convert the batch's list of embeddings to a numpy array
            batch_embedding_matrix = np.array(embeddings)
            # Append the batch embedding matrix to the main embedding matrix
            if embedding_matrix is None:
                embedding_matrix = batch_embedding_matrix
            else:
                embedding_matrix = np.concatenate((embedding_matrix, batch_embedding_matrix), axis=0)
        # Pairwise dot products; for L2-normalized embeddings this is cosine similarity.
        similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
        return similarity_matrix
    def _calculate_reward(self, matrix, start, end):
        # Reward of a candidate cluster: the sum of all pairwise similarities
        # between the sentences from index start to end (inclusive).
        sub_matrix = matrix[start:end + 1, start:end + 1]
        return np.sum(sub_matrix)
    def _optimal_segmentation(self, matrix, max_cluster_size, window_size=3):
        # Center the matrix on its mean off-diagonal similarity, so clusters of
        # below-average similarity accumulate negative reward.
        mean_value = np.mean(matrix[np.triu_indices(matrix.shape[0], k=1)])
        matrix = matrix - mean_value
        np.fill_diagonal(matrix, 0)  # Zero the diagonal so self-similarity adds no reward
        # window_size is reserved for a local-density adjustment that is currently disabled.
        n = matrix.shape[0]
        dp = np.zeros(n)
        segmentation = np.zeros(n, dtype=int)
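        # Dynamic program over sentence indices: dp[i] holds the best total
        # reward for segmenting sentences[0..i], and for every admissible
        # cluster size s the recurrence is
        #     dp[i] = max(dp[i], reward(i - s + 1, i) + dp[i - s])
        # segmentation[i] records the start index of the cluster ending at i.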
        for i in range(n):
            for size in range(1, max_cluster_size + 1):
                if i - size + 1 >= 0:
                    reward = self._calculate_reward(matrix, i - size + 1, i)
                    # Add the best reward of everything before this cluster
                    if i - size >= 0:
                        reward += dp[i - size]
                    if reward > dp[i]:
                        dp[i] = reward
                        segmentation[i] = i - size + 1
        # Backtrack through segmentation to recover the chosen clusters
        clusters = []
        i = n - 1
        while i >= 0:
            start = segmentation[i]
            clusters.append((start, i))
            i = start - 1
        clusters.reverse()
        return clusters
    def split_text(self, text: str) -> List[str]:
        # First split into small pieces of roughly min_chunk_size tokens,
        # then merge adjacent pieces into semantically coherent chunks.
        sentences = self.splitter.split_text(text)
        similarity_matrix = self._get_similarity_matrix(self.embed_model, sentences)
        clusters = self._optimal_segmentation(similarity_matrix, max_cluster_size=self.max_cluster)
        # clusters is a list of inclusive (start, end) index pairs, e.g. [(0, 1), (2, 4)]
        docs = [' '.join(sentences[start:end + 1]) for start, end in clusters]
        return docs
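
A minimal usage sketch follows. The engine construction uses infinity_emb's EngineArgs/from_args API, and the model name is only an example choice; BaseChunker, dense_embed, and recursive_token_chunker are local modules accompanying this listing. Note the listing accesses embed_model._model.tokenizer in __init__ but indexes embed_model[0] when embedding, so the exact wrapper dense_embed expects may differ from a bare engine:

from infinity_emb import AsyncEmbeddingEngine, EngineArgs

engine = AsyncEmbeddingEngine.from_args(
    EngineArgs(model_name_or_path="BAAI/bge-small-en-v1.5", engine="torch")
)
chunker = ClusterSemanticChunker(embed_model=engine, max_chunk_size=400, min_chunk_size=50)
chunks = chunker.split_text(long_document)  # long_document: str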