meraKB / loaders /common.py
codelion's picture
Update loaders/common.py
7b99e28 verified
import tempfile
import time
import os
from utils import compute_sha1_from_file
from langchain.schema import Document
import streamlit as st
from langchain.text_splitter import RecursiveCharacterTextSplitter
from stats import add_usage
# Upload cap (bytes) enforced when st.secrets.self_hosted == "false".
_MAX_HOSTED_FILE_BYTES = 1000000
# Chunks whose stripped text is shorter than this are not embedded.
_MIN_CHUNK_CHARS = 10
# Number of chunks handed to the vector store per add_documents call.
_EMBED_BATCH_SIZE = 50


def _load_documents_and_hash(file, loader_class, file_suffix):
    """Spool the upload to a temp file, load it, and return (documents, sha1).

    The temporary file is always removed, even when the loader or the hash
    computation raises.
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_suffix) as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(file.getvalue())
        # The handle is closed (and therefore flushed) before the loader
        # reopens the path by name.
        documents = loader_class(tmp_path).load()
        file_sha1 = compute_sha1_from_file(tmp_path)
        return documents, file_sha1
    finally:
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except FileNotFoundError:
                # Already gone; nothing left to clean up.
                pass


def process_file(vector_store, file, loader_class, file_suffix, stats_db=None):
    """Load an uploaded file, split it into chunks, and add them to the vector store.

    Args:
        vector_store: Object exposing ``add_documents(list_of_documents)``.
        file: Uploaded file object; ``name``, ``size`` and ``getvalue()`` are read.
        loader_class: Callable taking a file path and returning an object with
            a ``load()`` method that yields the documents to split.
        file_suffix: Suffix for the temporary file; also recorded as
            ``file_type`` in the usage entry.
        stats_db: Optional handle passed to ``add_usage``; when falsy no usage
            entry is recorded.

    Returns:
        None. When the app is not self-hosted and the file exceeds the size
        cap, an error is shown via ``st.error`` and nothing is processed.

    Raises:
        Exception: Any error raised while loading, splitting or storing is
            logged and re-raised unchanged. A missing ``username``,
            ``chunk_size`` or ``chunk_overlap`` in ``st.session_state`` also
            propagates instead of being swallowed per chunk.
    """
    try:
        print("=== Starting file processing ===")
        file_name = file.name
        file_size = file.size

        if st.secrets.self_hosted == "false" and file_size > _MAX_HOSTED_FILE_BYTES:
            st.error("File size is too large. Please upload a file smaller than 1MB or self host.")
            return

        dateshort = time.strftime("%Y%m%d")

        # Load documents
        documents, file_sha1 = _load_documents_and_hash(file, loader_class, file_suffix)

        chunk_size = st.session_state['chunk_size']
        chunk_overlap = st.session_state['chunk_overlap']
        text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        documents = text_splitter.split_documents(documents)

        # Read once, outside the per-chunk try/except: a missing username must
        # fail the upload rather than silently skip every chunk.
        username = st.session_state["username"]

        # Create documents with metadata and validate content
        docs_with_metadata = []
        for i, doc in enumerate(documents):
            try:
                # Validate content is string and not empty
                if not isinstance(doc.page_content, str):
                    print(f"Skipping document {i}: Invalid content type {type(doc.page_content)}")
                    continue

                content = doc.page_content.strip()
                if not content:
                    print(f"Skipping document {i}: Empty content")
                    continue

                # Skip very short contents
                if len(content) < _MIN_CHUNK_CHARS:
                    print(f"Skipping document {i}: Content too short ({len(content)} chars)")
                    continue

                docs_with_metadata.append(
                    Document(
                        page_content=content,
                        metadata={
                            "file_sha1": file_sha1,
                            "file_size": file_size,
                            "file_name": file_name,
                            "chunk_size": chunk_size,
                            "chunk_overlap": chunk_overlap,
                            "date": dateshort,
                            "user": username,
                        },
                    )
                )
            except Exception as e:
                # Best effort per chunk: one malformed chunk must not abort the file.
                print(f"Error processing document {i}: {str(e)}")
                continue

        print(f"Processed {len(docs_with_metadata)} valid documents")

        # Process in smaller batches
        total_batches = (len(docs_with_metadata) + _EMBED_BATCH_SIZE - 1) // _EMBED_BATCH_SIZE
        for start in range(0, len(docs_with_metadata), _EMBED_BATCH_SIZE):
            batch = docs_with_metadata[start:start + _EMBED_BATCH_SIZE]
            batch_number = start // _EMBED_BATCH_SIZE + 1
            try:
                print(f"Processing batch {batch_number} of {total_batches}")
                print(f"Sample text from batch (first 200 chars): {batch[0].page_content[:200]}")
                # add_documents is the only embedding call: embedding the batch
                # separately beforehand would embed (and bill) every chunk twice.
                vector_store.add_documents(batch)
                print("Successfully added batch to vector store")
            except Exception as e:
                print(f"Error processing batch {batch_number}: {str(e)}")
                # _embedding is a private attribute that not every store has,
                # so look it up defensively and only for diagnostics.
                embedding = getattr(vector_store, "_embedding", None)
                if embedding is not None:
                    print(f"Embedding type: {type(embedding).__name__}")
                print("First document in failed batch (truncated):")
                print(batch[0].page_content[:200])
                raise

        if stats_db:
            add_usage(stats_db, "embedding", "file", metadata={
                "file_name": file_name,
                "file_type": file_suffix,
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
            })
    except Exception as e:
        print("\n=== General Processing Error ===")
        print(f"Exception occurred during file processing: {str(e)}")
        print(f"Exception type: {type(e).__name__}")
        raise