Spaces:
Building
on
T4
Building
on
T4
Create chunkers.py
Browse files- chunkers.py +87 -0
chunkers.py
ADDED
@@ -0,0 +1,87 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from base_chunker import BaseChunker
|
2 |
+
from typing import List
|
3 |
+
|
4 |
+
import numpy as np
|
5 |
+
import tiktoken
|
6 |
+
|
7 |
+
class ClusterSemanticChunker(BaseChunker):
    """Split text into semantically coherent chunks.

    Text is first cut into small pieces with a recursive token splitter,
    then neighbouring pieces are merged into contiguous clusters by
    maximising within-cluster embedding similarity via dynamic programming.
    """

    def __init__(self, embedding_function=None, max_chunk_size=400,
                 min_chunk_size=50, length_function=None):
        """
        Args:
            embedding_function: callable mapping a list of strings to a list
                of embedding vectors. NOTE(review): dot products are used as
                similarities, which equals cosine similarity only if the
                embeddings are unit-normalised — confirm with the provider.
            max_chunk_size: target maximum chunk size, in tokens.
            min_chunk_size: token size of the initial small pieces.
            length_function: token-counting callable used by the splitter;
                defaults to ``openai_token_count``, resolved lazily so this
                module can be imported even where that helper is absent.
        """
        # BUG FIX: the original ignored the caller-supplied `length_function`
        # (it hard-coded `openai_token_count` in the splitter) and evaluated
        # `openai_token_count` at class-definition time, which raises
        # NameError since it is never imported here. Resolve lazily instead.
        if length_function is None:
            length_function = openai_token_count
        self.splitter = RecursiveTokenChunker(
            chunk_size=min_chunk_size,
            chunk_overlap=0,
            length_function=length_function,
            separators=["\n\n", "\n", ".", "?", "!", " ", ""],
        )

        self._chunk_size = max_chunk_size
        # Maximum number of min-size pieces that can fit in one chunk.
        self.max_cluster = max_chunk_size // min_chunk_size
        self.embedding_function = embedding_function

    def _get_similarity_matrix(self, embedding_function, sentences):
        """Embed `sentences` in batches; return the N x N pairwise dot-product matrix."""
        BATCH_SIZE = 500  # embed in batches to respect typical API limits
        N = len(sentences)
        embedding_matrix = None

        for i in range(0, N, BATCH_SIZE):
            batch_sentences = sentences[i:i + BATCH_SIZE]
            embeddings = embedding_function(batch_sentences)

            # Convert the batch (list of vectors) to a 2-D array and stack
            # it under the rows accumulated so far.
            batch_embedding_matrix = np.array(embeddings)
            if embedding_matrix is None:
                embedding_matrix = batch_embedding_matrix
            else:
                embedding_matrix = np.concatenate(
                    (embedding_matrix, batch_embedding_matrix), axis=0
                )

        # Robustness: no sentences -> empty matrix instead of np.dot(None, ...)
        # raising a TypeError.
        if embedding_matrix is None:
            return np.zeros((0, 0))

        similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
        return similarity_matrix

    def _calculate_reward(self, matrix, start, end):
        """Sum of all pairwise similarities within the inclusive span
        [start, end] — the score of merging those pieces into one cluster."""
        sub_matrix = matrix[start:end + 1, start:end + 1]
        return np.sum(sub_matrix)

    def _optimal_segmentation(self, matrix, max_cluster_size, window_size=3):
        """Partition indices 0..N-1 into contiguous clusters of at most
        `max_cluster_size` pieces, maximising summed mean-centred
        within-cluster similarity via dynamic programming.

        Returns a list of (start, end) inclusive index pairs, in order.
        `window_size` is currently unused; kept for interface stability.
        """
        # Centre on the mean off-diagonal similarity so that merging is only
        # rewarded for above-average similarity.
        mean_value = np.mean(matrix[np.triu_indices(matrix.shape[0], k=1)])
        matrix = matrix - mean_value  # new array; caller's matrix untouched
        # Zero the diagonal so self-similarity cannot reward trivial
        # single-element clusters. (The original comment said "set to 1",
        # which contradicted the code.)
        np.fill_diagonal(matrix, 0)

        n = matrix.shape[0]
        dp = np.zeros(n)  # dp[i]: best total reward for the prefix 0..i
        # segmentation[i]: start index of the cluster that ends at i in the
        # best segmentation of the prefix 0..i.
        segmentation = np.zeros(n, dtype=int)

        for i in range(n):
            for size in range(1, max_cluster_size + 1):
                if i - size + 1 >= 0:
                    reward = self._calculate_reward(matrix, i - size + 1, i)
                    adjusted_reward = reward
                    if i - size >= 0:
                        adjusted_reward += dp[i - size]
                    if adjusted_reward > dp[i]:
                        dp[i] = adjusted_reward
                        segmentation[i] = i - size + 1

        # Walk the segmentation table backwards to recover the clusters.
        clusters = []
        i = n - 1
        while i >= 0:
            start = segmentation[i]
            clusters.append((start, i))
            i = start - 1

        clusters.reverse()
        return clusters

    def split_text(self, text: str) -> List[str]:
        """Split `text` into chunks of semantically related pieces.

        Returns a list of strings, each the space-join of one cluster of
        consecutive splitter pieces.
        """
        sentences = self.splitter.split_text(text)

        similarity_matrix = self._get_similarity_matrix(self.embedding_function, sentences)

        clusters = self._optimal_segmentation(similarity_matrix, max_cluster_size=self.max_cluster)

        docs = [' '.join(sentences[start:end + 1]) for start, end in clusters]

        return docs