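"""
Build and query embeddings for the Blender manual.

The script walks the manual's reStructuredText sources starting at
``index.rst`` under MANUAL_DIR, strips RST markup, splits oversized pages
into chunks that fit the embedding model's sequence length, encodes them via
EMBEDDING_CTX, and runs a sample semantic search.
"""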
import os
import sys
import re

from sentence_transformers import util

script_dir = os.path.dirname(os.path.realpath(__file__))
parent_dir = os.path.dirname(script_dir)
sys.path.append(parent_dir)

# autopep8: off
from routers.tool_find_related import EMBEDDING_CTX
# autopep8: on

MANUAL_DIR = "D:/BlenderDev/blender-manual/manual/"
BASE_URL = "https://docs.blender.org/manual/en/dev"
def process_text(text):
    # Remove runs of repeated markup characters (RST heading underlines, rules)
    text = re.sub(r'%{2,}', '', text)
    text = re.sub(r'#{2,}', '', text)
    text = re.sub(r'={3,}', '', text)
    text = re.sub(r'\*{3,}', '', text)
    text = re.sub(r'\^{3,}', '', text)
    text = re.sub(r'-{3,}', '', text)

    # Remove directive markers like ".. word::" and roles like ":word:"
    text = re.sub(r'\.\. \S+', '', text)
    text = re.sub(r':\w+:', '', text)

    # Collapse whitespace-only lines and surrounding spaces into single newlines
    text = re.sub(r'(\s*\n\s*)+', '\n', text)
    return text
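# Illustration (not part of the original script): for an RST fragment such as
#   ".. note::\n   Press :kbd:`Ctrl` to snap\n====="
# process_text() drops the directive marker, the ":kbd:" role and the "=" rule,
# leaving roughly "\nPress `Ctrl` to snap\n".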
def parse_file(filedir, filename):
    with open(os.path.join(filedir, filename), 'r', encoding='utf-8') as file:
        content = file.read()

    parsed_data = {}

    if not filename.endswith('index.rst'):
        body = content.strip()
    else:
        # Index pages: the text before the first ".. toctree::" is the body,
        # and every ".. toctree::" block lists child documents to parse.
        parts = content.split(".. toctree::")
        body = parts[0].strip()

        if len(parts) > 1:
            parsed_data["toctree"] = {}
            for part in parts[1:]:
                toctree_entries = part.split('\n')
                line = toctree_entries[0]  # Remainder of the ".. toctree::" line (unused).
                for entry in toctree_entries[1:]:
                    entry = entry.strip()
                    if not entry:
                        continue
                    if entry.startswith('/'):
                        # Path given from the manual root rather than the
                        # current directory; skip it.
                        continue
                    if not entry.endswith('.rst'):
                        continue
                    if entry.endswith('/index.rst'):
                        entry_name = entry[:-10]
                        filedir_ = os.path.join(filedir, entry_name)
                        filename_ = 'index.rst'
                    else:
                        entry_name = entry[:-4]
                        filedir_ = filedir
                        filename_ = entry

                    parsed_data['toctree'][entry_name] = parse_file(
                        filedir_, filename_)

    processed_text = process_text(body)
    tokens = EMBEDDING_CTX.model.tokenizer.tokenize(processed_text)
    if len(tokens) > EMBEDDING_CTX.model.max_seq_length:
        # Too long for a single embedding; split later by split_into_many().
        pass
        # parsed_data['body'] = body

    parsed_data['processed_text'] = processed_text
    parsed_data['n_tokens'] = len(tokens)
    return parsed_data
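# Shape of the dictionary returned by parse_file() (illustration, field names
# taken from the code above):
#
#   {
#       'processed_text': '<markup-stripped page text>',
#       'n_tokens': <token count of processed_text>,
#       'toctree': {                  # only present for index.rst pages
#           '<entry_name>': {...},    # same structure, recursively
#       },
#   }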
# Split the text into chunks of at most `max_tokens` tokens each
def split_into_many(text, max_tokens):
    # Split the text into paragraphs (separated by ".\n")
    paragraphs = text.split('.\n')

    # Get the number of tokens for each paragraph
    n_tokens = [len(EMBEDDING_CTX.model.tokenizer.tokenize(" " + paragraph))
                for paragraph in paragraphs]

    chunks = []
    tokens_so_far = 0
    chunk = []

    # Loop through the paragraphs and their token counts joined together in a tuple
    for paragraph, token in zip(paragraphs, n_tokens):
        # If adding the current paragraph would exceed the maximum number of
        # tokens, flush the accumulated chunk and start a new one
        if tokens_so_far + token > max_tokens:
            if chunk:
                chunks.append((".\n".join(chunk) + ".", tokens_so_far))
            chunk = []
            tokens_so_far = 0

        # If the current paragraph alone is larger than the maximum number of
        # tokens, skip it entirely
        if token > max_tokens:
            continue

        # Otherwise, add the paragraph to the chunk and add its tokens to the
        # total (+1 accounts for the ".\n" separator restored when joining)
        chunk.append(paragraph)
        tokens_so_far += token + 1

    if chunk:
        chunks.append((".\n".join(chunk) + ".", tokens_so_far))

    return chunks
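# Illustrative use (mirrors the call in get_texts() below):
#   chunks = split_into_many(processed_text, EMBEDDING_CTX.model.max_seq_length)
#   for chunk_text, chunk_tokens in chunks:
#       ...  # each chunk fits within the model's sequence length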
def get_texts(data, path):
    result = []
    processed_texts = [data['processed_text']]
    processed_tokens = [data['n_tokens']]
    max_tokens = EMBEDDING_CTX.model.max_seq_length

    # Walk the toctree down to the node referenced by `path`, collecting the
    # processed text and token count of every ancestor along the way
    data_ = data
    for key in path:
        data_ = data_['toctree'][key]
        processed_texts.append(data_['processed_text'])
        processed_tokens.append(data_['n_tokens'])

    if processed_tokens[-1] > max_tokens:
        chunks = split_into_many(processed_texts[-1], max_tokens)
    else:
        chunks = [(processed_texts[-1], processed_tokens[-1])]

    for text, n_tokens in chunks:
        # Prepend ancestor context to the text while there is space left
        for i in range(len(processed_texts) - 2, -1, -1):
            n_tokens_parent = processed_tokens[i]
            if n_tokens + n_tokens_parent >= max_tokens:
                break

            text_parent = processed_texts[i]
            text = text_parent + '\n' + text
            n_tokens += n_tokens_parent

        result.append([path, text])

    # Recurse into child documents, if any
    try:
        for key in data_['toctree'].keys():
            result.extend(get_texts(data, path + [key]))
    except KeyError:
        pass

    return result
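# get_texts() flattens the tree into [path, text] pairs, e.g. (illustrative,
# 'getting_started' is a hypothetical toctree key):
#   [[[], '<root index text>'],
#    [['getting_started'], '<root text>\n<child page text>'],
#    ...]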
def _sort_similarity(chunks, embeddings, text_to_search, limit):
    results = []

    query_emb = EMBEDDING_CTX.encode([text_to_search])
    ret = util.semantic_search(
        query_emb, embeddings, top_k=limit, score_function=util.dot_score)

    for score in ret[0]:
        corpus_id = score['corpus_id']
        chunk = chunks[corpus_id]
        path = chunk[0]
        results.append(path)

    return results
if __name__ == '__main__':
    # path = 'addons/3d_view'
    data = parse_file(MANUAL_DIR, 'index.rst')
    data['toctree']["copyright"] = parse_file(MANUAL_DIR, 'copyright.rst')

    # Collect the [path, text] chunks for every page in the manual
    chunks = []
    chunks.extend(get_texts(data, []))

    embeddings = EMBEDDING_CTX.encode([text for path, text in chunks])

    result = _sort_similarity(chunks, embeddings, "Set Snap Base", 50)
    print(result)