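"""Cluster-based semantic chunker.

Splits text into small fixed-size pieces with RecursiveTokenChunker, embeds
each piece, then runs a dynamic program over the pairwise similarity matrix to
merge adjacent pieces into semantically coherent chunks bounded by max_chunk_size.
"""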
import asyncio
from base_chunker import BaseChunker
from typing import List
import numpy as np
from functools import partial
from dense_embed import embed_text
from recursive_token_chunker import RecursiveTokenChunker
from infinity_emb import AsyncEmbeddingEngine
def bert_token_count(tokenizer, text: str) -> int:
    """Return the number of tokens in a text string, including special tokens."""
    return len(tokenizer.convert_tokens_to_ids(tokenizer.tokenize(text, add_special_tokens=True)))
class ClusterSemanticChunker(BaseChunker):
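    """Chunker that groups adjacent text pieces into clusters by embedding similarity.

    Text is first split into pieces of roughly min_chunk_size tokens; adjacent
    pieces are then merged so that no chunk exceeds about max_chunk_size tokens.
    """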
    def __init__(self, embed_model: AsyncEmbeddingEngine, max_chunk_size=400, min_chunk_size=50, length_function=bert_token_count):
        # embed_model is required: its tokenizer drives the length function below,
        # so a None default would fail immediately on construction.
        self.splitter = RecursiveTokenChunker(
            chunk_size=min_chunk_size,
            chunk_overlap=0,
            length_function=partial(length_function, embed_model._model.tokenizer),
            separators=["\n\n", "\n", ".", "?", "!", " ", ""],
        )
        self._chunk_size = max_chunk_size
        # A cluster may span at most this many minimum-size pieces.
        self.max_cluster = max_chunk_size // min_chunk_size
        self.embed_model = embed_model
    def _get_similarity_matrix(self, embed_model, sentences):
        BATCH_SIZE = 500
        # Embed in batches, then stack everything into one (N, dim) matrix.
        embedding_batches = []
        for i in range(0, len(sentences), BATCH_SIZE):
            batch_sentences = sentences[i:i + BATCH_SIZE]
            # embed_text receives the engine itself, matching how __init__ uses embed_model.
            embeddings, _token_usage = asyncio.run(embed_text(embed_model, batch_sentences))
            embedding_batches.append(np.array(embeddings))
        embedding_matrix = np.concatenate(embedding_batches, axis=0)
        # Pairwise dot products; for unit-normalized embeddings this is cosine similarity.
        similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
        return similarity_matrix
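    # The reward of a candidate cluster is the total pairwise (mean-centered)
    # similarity among its pieces, so tightly related pieces score high.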
def _calculate_reward(self, matrix, start, end):
sub_matrix = matrix[start:end+1, start:end+1]
return np.sum(sub_matrix)
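    # Dynamic program over piece indices: dp[i] is the best total reward for
    # segmenting pieces 0..i, and segmentation[i] records where the cluster
    # ending at i starts:
    #     dp[i] = max over size in 1..max_cluster_size of
    #             reward(i - size + 1, i) + dp[i - size]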
    def _optimal_segmentation(self, matrix, max_cluster_size):
        mean_value = np.mean(matrix[np.triu_indices(matrix.shape[0], k=1)])
        matrix = matrix - mean_value  # Center on the mean so below-average pairs are penalized
        np.fill_diagonal(matrix, 0)  # Zero the diagonal so self-similarity adds no reward
        n = matrix.shape[0]
        dp = np.zeros(n)
        segmentation = np.zeros(n, dtype=int)
        for i in range(n):
            for size in range(1, max_cluster_size + 1):
                if i - size + 1 >= 0:
                    reward = self._calculate_reward(matrix, i - size + 1, i)
                    if i - size >= 0:
                        reward += dp[i - size]
                    if reward > dp[i]:
                        dp[i] = reward
                        segmentation[i] = i - size + 1
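        # Backtrack through the segmentation table to recover cluster boundaries.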
clusters = []
i = n - 1
while i >= 0:
start = segmentation[i]
clusters.append((start, i))
i = start - 1
clusters.reverse()
return clusters
def split_text(self, text: str) -> List[str]:
sentences = self.splitter.split_text(text)
similarity_matrix = self._get_similarity_matrix(self.embed_model, sentences)
clusters = self._optimal_segmentation(similarity_matrix, max_cluster_size=self.max_cluster)
docs = [' '.join(sentences[start:end+1]) for start, end in clusters]
        return docs
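
# Hypothetical usage sketch: the EngineArgs construction follows infinity_emb's
# public API, but the model name is illustrative and dense_embed.embed_text is
# assumed to start the engine itself (split_text drives it via asyncio.run).
if __name__ == "__main__":
    from infinity_emb import EngineArgs

    engine = AsyncEmbeddingEngine.from_args(
        EngineArgs(model_name_or_path="BAAI/bge-small-en-v1.5")
    )
    chunker = ClusterSemanticChunker(embed_model=engine, max_chunk_size=400, min_chunk_size=50)
    sample = "First topic sentence. More about that topic.\n\nA second topic starts here."
    for chunk in chunker.split_text(sample):
        print(repr(chunk))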