import os

from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter

from modules import common_utils, file_utils
from modules import app_logger
from modules import app_constants

app_logger = app_logger.app_logger

# os.path.join works whether or not WORKSPACE_DIRECTORY ends with a separator.
TEMP_DIR = os.path.join(app_constants.WORKSPACE_DIRECTORY, "tmp")
DB_DIR = os.path.join(app_constants.WORKSPACE_DIRECTORY, "db")

processed_files_record = os.path.join(app_constants.WORKSPACE_DIRECTORY, app_constants.PROCESSED_DOCS)
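
# Assumed workspace layout (derived from the constants above; names illustrative):
#   <WORKSPACE_DIRECTORY>/tmp              - throwaway indexes for one-off files
#   <WORKSPACE_DIRECTORY>/db               - persistent per-page indexes
#   <WORKSPACE_DIRECTORY>/<PROCESSED_DOCS> - CSV-style record of already-indexed files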


def load_documents_from_jsonl(file_path, loader_class):
    """Load a JSON Lines file with loader_class; return the documents or None on error."""
    try:
        loader = loader_class(file_path, json_lines=True, text_content=False, jq_schema='.')
        return loader.load()
    except Exception as e:
        app_logger.error(f"Error loading documents from JSONL file {file_path}: {e}")
        return None
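
# Usage sketch (hypothetical: assumes ".jsonl" maps to langchain_community's
# JSONLoader in app_constants.DOCUMENT_MAP; the path is illustrative):
#
#   from langchain_community.document_loaders import JSONLoader
#   docs = load_documents_from_jsonl("workspace/tmp/notes.jsonl", JSONLoader)
#   # -> list of Document objects, or None if loading failed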


def update_processed_files_record(file_md5, module, file_path):
    """Append a processed-file entry to the record as 'md5,module,path'."""
    try:
        with open(processed_files_record, 'a') as file:
            file.write(f"{file_md5},{module},{file_path}\n")
    except Exception as e:
        app_logger.error(f"Error updating processed files record: {e}")


def is_file_processed(file_md5):
    """Return True if file_md5 already appears in the processed-files record."""
    if os.path.exists(processed_files_record):
        with open(processed_files_record, 'r') as file:
            for line in file:
                # Take only the first field; avoids unpacking errors on blank lines.
                md5 = line.strip().split(',', 1)[0]
                if md5 == file_md5:
                    return True
    return False


def get_chroma_index(file_path, current_page="nav_playbooks", is_persistent=True):
    """Build a Chroma index for file_path; return True on success, False otherwise."""
    app_logger.info(f"Starting get_chroma_index for {file_path}")
    file_md5 = file_utils.compute_md5(file_path)
    if is_file_processed(file_md5):
        app_logger.info(f"File {file_path} has already been processed. Skipping.")
        return False

    _, file_extension = os.path.splitext(file_path)
    loader_class = app_constants.DOCUMENT_MAP.get(file_extension.lower(), None)

    if not loader_class:
        app_logger.error(f"No suitable loader found for file type {file_extension}")
        return False
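
    # DOCUMENT_MAP is assumed to map file extensions to LangChain loader classes,
    # e.g. (illustrative): {".pdf": PyPDFLoader, ".txt": TextLoader, ".jsonl": JSONLoader}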

    embedding_model = app_constants.EMBEDDING_MODEL_NAME
    chunk_size = app_constants.CHUNK_SIZE
    chunk_overlap = app_constants.CHUNK_OVERLAP

    # Persistent indexes are grouped per page; temporary ones are named after the source file.
    storage_dir = DB_DIR if is_persistent else TEMP_DIR
    base_filename = f"{current_page}_chroma_db" if is_persistent else f"{os.path.splitext(os.path.basename(file_path))[0]}_chroma_db"
    sanitized_base_filename = file_utils.sanitize_filename(base_filename)
    chroma_persist_directory = os.path.join(storage_dir, sanitized_base_filename)

    embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
    try:
        # JSONL files need the jq-based loader arguments; other loaders take just the path.
        if file_extension.lower() == '.jsonl':
            documents = load_documents_from_jsonl(file_path, loader_class)
        else:
            loader = loader_class(file_path)
            documents = loader.load()

        if not documents:
            app_logger.error(f"No documents loaded from {file_path}.")
            return False

        text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        docs = text_splitter.split_documents(documents)

        if not docs:
            app_logger.error(f"No documents to process after splitting from {file_path}.")
            return False

        db = Chroma.from_documents(docs, embeddings, persist_directory=chroma_persist_directory, client_settings=app_constants.CHROMA_SETTINGS)
        update_processed_files_record(file_md5, current_page, file_path)
        app_logger.info("Created index and saved to disk")
        db.persist()
    except Exception as e:
        app_logger.error(f"Error in get_chroma_index for {file_path}: {e}")
        return False

    app_logger.info("Completed get_chroma_index operation")
    return True
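
# Minimal usage sketch (paths and page name are illustrative; assumes the file
# type is registered in app_constants.DOCUMENT_MAP):
#
#   if get_chroma_index("workspace/tmp/guide.pdf", current_page="nav_playbooks"):
#       # Reload the persisted index later with the same embedding model:
#       #   db = Chroma(persist_directory="workspace/db/nav_playbooks_chroma_db",
#       #               embedding_function=HuggingFaceEmbeddings(model_name=app_constants.EMBEDDING_MODEL_NAME))
#       app_logger.info("Index ready")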