|
import tempfile |
|
import time |
|
import os |
|
from utils import compute_sha1_from_file |
|
from langchain.schema import Document |
|
import streamlit as st |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from stats import add_usage |
|
|
|
def _load_uploaded_file(file, loader_class, file_suffix):
    """Write the upload to a temporary file, then load and hash it.

    Args:
        file: Uploaded file object exposing ``getvalue()``.
        loader_class: Callable taking a file path and returning an object
            with a ``load()`` method.
        file_suffix: Suffix given to the temporary file.

    Returns:
        A ``(documents, file_sha1)`` tuple: the result of ``loader.load()``
        and the value returned by ``compute_sha1_from_file``.
    """
    # delete=False because the loader and the hasher reopen the file by path.
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_suffix)
    try:
        # Close the handle before reopening by path so the whole payload is
        # flushed to disk first.
        with tmp_file:
            tmp_file.write(file.getvalue())
        documents = loader_class(tmp_file.name).load()
        file_sha1 = compute_sha1_from_file(tmp_file.name)
    finally:
        # Always clean up, including when the loader or the hasher raises.
        os.remove(tmp_file.name)
    return documents, file_sha1


def _build_chunk_documents(documents, base_metadata, min_length=10):
    """Turn split chunks into fresh ``Document`` objects, skipping unusable ones.

    A chunk is skipped (with a printed reason) when its content is not a
    string, is empty after stripping, or is shorter than ``min_length``
    characters after stripping. Any exception raised while handling a single
    chunk is printed and that chunk is skipped (best effort).

    Args:
        documents: Iterable of objects with a ``page_content`` attribute.
        base_metadata: Metadata dict; each produced document gets its own copy.
        min_length: Minimum stripped content length for a chunk to be kept.

    Returns:
        List of ``Document`` objects with stripped content and the metadata.
    """
    kept = []
    for i, doc in enumerate(documents):
        try:
            raw = doc.page_content
            if not isinstance(raw, str):
                print(f"Skipping document {i}: Invalid content type {type(raw)}")
                continue

            content = raw.strip()
            if not content:
                print(f"Skipping document {i}: Empty content")
                continue
            if len(content) < min_length:
                print(f"Skipping document {i}: Content too short ({len(content)} chars)")
                continue

            kept.append(Document(page_content=content, metadata=dict(base_metadata)))
        except Exception as e:
            print(f"Error processing document {i}: {str(e)}")
    return kept


def _add_documents_in_batches(vector_store, docs, batch_size=50):
    """Add ``docs`` to ``vector_store`` in slices of ``batch_size``.

    Each batch is passed to ``vector_store.add_documents`` exactly once; there
    is no separate pre-flight embedding call, so texts are not embedded twice.
    On failure the batch number and a preview of the batch's first document
    are printed and the exception is re-raised.
    """
    total_batches = (len(docs) + batch_size - 1) // batch_size
    for batch_number, start in enumerate(range(0, len(docs), batch_size), start=1):
        # Never empty: ``start`` is always a valid index into ``docs``.
        batch = docs[start:start + batch_size]
        preview = batch[0].page_content[:200]
        try:
            print(f"Processing batch {batch_number} of {total_batches}")
            print(f"Sample text from batch (first 200 chars): {preview}")
            vector_store.add_documents(batch)
            print("Successfully added batch to vector store")
        except Exception as e:
            print(f"Error processing batch {batch_number}: {str(e)}")
            print("First document in failed batch (truncated):")
            print(preview)
            raise


def process_file(vector_store, file, loader_class, file_suffix, stats_db=None):
    """Load an uploaded file, split it into chunks and index them.

    Steps: enforce the upload size limit when not self hosted, load the file
    through ``loader_class`` via a temporary file, split it with a tiktoken
    based ``RecursiveCharacterTextSplitter`` using the ``chunk_size`` and
    ``chunk_overlap`` values from ``st.session_state``, attach metadata to
    every usable chunk, add the chunks to ``vector_store`` in batches, and
    finally record usage when ``stats_db`` is truthy.

    Args:
        vector_store: Object exposing ``add_documents(list_of_documents)``.
        file: Uploaded file exposing ``name``, ``size`` and ``getvalue()``.
        loader_class: Callable taking a path and returning a loader with
            ``load()``.
        file_suffix: Suffix for the temporary file; also recorded as
            ``file_type`` in the usage entry.
        stats_db: Optional handle passed to ``add_usage``; skipped when falsy.

    Returns:
        None. When the file exceeds the size limit on a non self-hosted
        deployment, an error is shown with ``st.error`` and nothing is indexed.

    Raises:
        Exception: Any error from loading, splitting, reading session state or
            storing is printed and re-raised unchanged.
    """
    max_hosted_file_size = 1000000  # bytes; only enforced when not self hosted

    try:
        print("=== Starting file processing ===")
        file_name = file.name
        file_size = file.size

        if st.secrets.self_hosted == "false" and file_size > max_hosted_file_size:
            st.error("File size is too large. Please upload a file smaller than 1MB or self host.")
            return

        dateshort = time.strftime("%Y%m%d")

        documents, file_sha1 = _load_uploaded_file(file, loader_class, file_suffix)

        chunk_size = st.session_state['chunk_size']
        chunk_overlap = st.session_state['chunk_overlap']
        text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
        documents = text_splitter.split_documents(documents)

        # Read the user once, outside the per-chunk best-effort handler: a
        # missing key must fail loudly instead of silently dropping every chunk.
        base_metadata = {
            "file_sha1": file_sha1,
            "file_size": file_size,
            "file_name": file_name,
            "chunk_size": chunk_size,
            "chunk_overlap": chunk_overlap,
            "date": dateshort,
            "user": st.session_state["username"],
        }
        docs_with_metadata = _build_chunk_documents(documents, base_metadata)
        print(f"Processed {len(docs_with_metadata)} valid documents")

        _add_documents_in_batches(vector_store, docs_with_metadata)

        if stats_db:
            add_usage(stats_db, "embedding", "file", metadata={
                "file_name": file_name,
                "file_type": file_suffix,
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
            })

    except Exception as e:
        print("\n=== General Processing Error ===")
        print(f"Exception occurred during file processing: {str(e)}")
        print(f"Exception type: {type(e).__name__}")
        raise