# services/pdf_service.py
from pathlib import Path
from typing import List, Dict, Any, Optional

from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import faiss
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime

from config.config import settings

logger = logging.getLogger(__name__)


class PDFService:
    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )
        self.index = None
        self.chunks = []
        self.last_update = None
        self.pdf_metadata = {}

    def process_pdf(self, pdf_path: Path) -> List[Dict[str, Any]]:
        """Process a single PDF file (synchronous; called from a thread pool)."""
        try:
            reader = PdfReader(str(pdf_path))
            chunks = []

            # Extract document metadata; reader.metadata can be None for some PDFs
            info = reader.metadata or {}
            metadata = {
                'title': info.get('/Title', ''),
                'author': info.get('/Author', ''),
                'creation_date': info.get('/CreationDate', ''),
                'pages': len(reader.pages),
                'filename': pdf_path.name
            }
            self.pdf_metadata[pdf_path.name] = metadata

            # Split each page's text into overlapping chunks
            for page_num, page in enumerate(reader.pages):
                text = page.extract_text()
                if not text:
                    continue

                page_chunks = self.text_splitter.split_text(text)
                for i, chunk in enumerate(page_chunks):
                    chunks.append({
                        'text': chunk,
                        'source': pdf_path.name,
                        'page': page_num + 1,
                        'chunk_index': i,
                        'metadata': metadata,
                        'timestamp': datetime.now().isoformat()
                    })

            logger.debug("Extracted %d chunks from %s", len(chunks), pdf_path.name)
            return chunks

        except Exception as e:
            logger.error(f"Error processing PDF {pdf_path}: {e}")
            return []

    async def index_pdfs(self, pdf_folder: Path = settings.PDF_FOLDER) -> None:
        """Index all PDFs in the specified folder."""
        try:
            pdf_files = list(pdf_folder.glob('*.pdf'))
            if not pdf_files:
                logger.warning(f"No PDF files found in {pdf_folder}")
                return

            # Process PDFs in a worker thread so the event loop stays responsive
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                chunk_lists = await loop.run_in_executor(
                    executor,
                    lambda: [self.process_pdf(pdf_file) for pdf_file in pdf_files]
                )

            # Combine all chunks
            self.chunks = []
            for chunk_list in chunk_lists:
                self.chunks.extend(chunk_list)

            if not self.chunks:
                logger.warning("No text chunks extracted from PDFs")
                return

            # Embed all chunks off the event loop, then build the FAISS index
            texts = [chunk['text'] for chunk in self.chunks]
            embeddings = await loop.run_in_executor(
                None,
                lambda: self.embedder.encode(
                    texts,
                    convert_to_tensor=True,
                    show_progress_bar=True
                ).cpu().detach().numpy()
            )

            # FAISS expects contiguous float32 vectors
            embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatL2(dimension)
            self.index.add(embeddings)

            self.last_update = datetime.now()
            logger.info(f"Indexed {len(self.chunks)} chunks from {len(pdf_files)} PDFs")

        except Exception as e:
            logger.error(f"Error indexing PDFs: {e}")
            raise

    async def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Search indexed PDFs for chunks relevant to the query."""
        if self.index is None or not self.chunks:
            await self.index_pdfs()

        try:
            # Embed the query with the same model used for indexing
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = np.ascontiguousarray(
                query_embedding.cpu().detach().numpy(), dtype=np.float32
            )
            logger.debug("Query embedding shape: %s", query_embedding_np.shape)

            # Search the FAISS index
            distances, indices = self.index.search(query_embedding_np, top_k)
            logger.debug("FAISS distances: %s, indices: %s", distances, indices)

            # Process results
            results = []
            for i, idx in enumerate(indices[0]):
                if idx < 0 or idx >= len(self.chunks):
                    continue  # Skip padding/invalid indices returned by FAISS

                # Convert L2 distance to a similarity score (higher is better)
                score = 1 - distances[0][i]
                if score < min_score:
                    continue  # Skip low-scoring chunks

                chunk = self.chunks[idx].copy()
                chunk['score'] = score
                results.append(chunk)

            # Sort by score and take top_k
            results.sort(key=lambda x: x['score'], reverse=True)
            return results[:top_k]

        except Exception as e:
            logger.error(f"Error searching PDFs: {e}")
            raise
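

# A minimal usage sketch, not part of the service itself. It assumes a
# hypothetical model service whose `.embedder` attribute is a
# SentenceTransformer-style model exposing `.encode(...)`, which matches how
# PDFService uses it above; the model name below is an assumption.
if __name__ == "__main__":
    from sentence_transformers import SentenceTransformer

    class _StubModelService:
        """Hypothetical stand-in for the project's real model service."""
        def __init__(self):
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def _demo():
        service = PDFService(_StubModelService())
        # Indexes settings.PDF_FOLDER by default
        await service.index_pdfs()
        hits = await service.search("What does the report conclude?", top_k=3)
        for hit in hits:
            print(hit['source'], hit['page'], round(hit['score'], 3))

    asyncio.run(_demo())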