import os
import logging

import gradio as gr
import PyPDF2
from pdf2image import convert_from_path
import pytesseract
import docx
from llama_index.core import VectorStoreIndex, Document, get_response_synthesizer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer, util

# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')

# Load environment variables from .env file
load_dotenv()

# Tesseract language options: the first line of `tesseract --list-langs` is a
# header, so skip it. Fall back to English if the CLI is unavailable.
langs = os.popen('tesseract --list-langs').read().split('\n')[1:-1] or ['eng']

# Initialize global variables
vector_index = None
query_log = []
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')


def extract_text_from_pdf(pdf_path, lang=None):
    """Extract text from a PDF, falling back to OCR for pages without a text layer."""
    text = ""
    image_count = 0
    total_pages = 0
    try:
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            total_pages = len(pdf_reader.pages)
            for page_num, page in enumerate(pdf_reader.pages, 1):
                page_text = page.extract_text()
                # If no text is found, treat the page as an image and use OCR
                if not page_text.strip():
                    images = convert_from_path(pdf_path, first_page=page_num, last_page=page_num)
                    for image in images:
                        ocr_text = pytesseract.image_to_string(image, lang=lang or None)
                        text += ocr_text
                    image_count += 1
                    text += f"\n[OCR applied on image detected on page {page_num}]\n"
                else:
                    text += page_text
    except Exception as e:
        logging.error(f"Error processing PDF {pdf_path}: {str(e)}")
        return f"[Error processing PDF: {str(e)}]\n"

    if image_count == total_pages:
        summary = f"This document consists of {total_pages} page(s) of images.\n"
        summary += "No text could be extracted directly. OCR was applied to images.\n"
        summary += f"File path: {pdf_path}\n"
        return summary
    elif image_count > 0:
        text = ("This document contains both text and images.\n"
                f"Total pages: {total_pages}\n"
                f"Pages with images: {image_count}\n"
                "Extracted text (including OCR):\n\n") + text
    return text
def load_docx_file(docx_path):
    """Extract text from a DOCX file, one line per paragraph."""
    try:
        doc = docx.Document(docx_path)
        return '\n'.join(para.text for para in doc.paragraphs)
    except Exception as e:
        logging.error(f"Error processing DOCX {docx_path}: {str(e)}")
        return f"[Error processing DOCX: {str(e)}]\n"


def load_txt_file(txt_path):
    """Read a plain-text file as UTF-8."""
    try:
        with open(txt_path, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        logging.error(f"Error processing TXT {txt_path}: {str(e)}")
        return f"[Error processing TXT: {str(e)}]\n"


def load_file_based_on_extension(file_path, lang=None):
    """Dispatch to the correct loader based on the file extension."""
    if file_path.lower().endswith('.pdf'):
        return extract_text_from_pdf(file_path, lang)
    elif file_path.lower().endswith('.docx'):
        return load_docx_file(file_path)
    elif file_path.lower().endswith('.txt'):
        return load_txt_file(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_path}")


def process_upload(api_key, files, lang):
    """Index the uploaded files into the global vector index.

    Returns a single status string, matching the one output component
    wired up in main().
    """
    global vector_index

    if not api_key:
        return "Please provide a valid OpenAI API Key."
    if not files:
        return "No files uploaded."

    documents = []
    error_messages = []
    image_heavy_docs = []

    for file_path in files:
        try:
            text = load_file_based_on_extension(file_path, lang)
            if "This document consists of" in text and "page(s) of images" in text:
                image_heavy_docs.append(os.path.basename(file_path))
            documents.append(Document(text=text))
        except Exception as e:
            error_message = f"Error processing file {file_path}: {str(e)}"
            logging.error(error_message)
            error_messages.append(error_message)

    if not documents:
        return f"No valid documents were indexed. Errors: {'; '.join(error_messages)}"

    try:
        embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key)
        vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
    except Exception as e:
        return f"Error creating index: {str(e)}"

    success_message = f"Successfully indexed {len(documents)} files."
    if image_heavy_docs:
        success_message += ("\nNote: The following documents consist mainly of images "
                            f"and may require manual review: {', '.join(image_heavy_docs)}")
    if error_messages:
        success_message += f"\nErrors: {'; '.join(error_messages)}"
    return success_message
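
# query_app() below calls calculate_similarity(), which the original listing
# referenced but never defined. This is a minimal sketch under the assumption
# that the intent was cosine similarity between SentenceTransformer embeddings
# (the module already loads 'all-MiniLM-L6-v2' and imports `util`).
def calculate_similarity(text_a, text_b):
    """Return the cosine similarity of two texts, in [-1.0, 1.0]."""
    embeddings = sentence_model.encode([text_a, text_b], convert_to_tensor=True)
    # util.cos_sim returns a 1x1 tensor for a single pair of embeddings
    return float(util.cos_sim(embeddings[0], embeddings[1]).item())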

def query_app(query, model_name, use_similarity_check, openai_api_key):
    """Answer a query against the indexed documents and log the exchange."""
    global vector_index, query_log

    if vector_index is None:
        logging.error("No documents indexed yet. Please upload documents first.")
        return "No documents indexed yet. Please upload documents first.", None

    if not openai_api_key:
        logging.error("No OpenAI API Key provided.")
        return "Please provide a valid OpenAI API Key.", None

    try:
        llm = OpenAI(model=model_name, api_key=openai_api_key)
    except Exception as e:
        logging.error(f"Error initializing the OpenAI model: {e}")
        return f"Error initializing the OpenAI model: {e}", None

    response_synthesizer = get_response_synthesizer(llm=llm)
    query_engine = vector_index.as_query_engine(llm=llm, response_synthesizer=response_synthesizer)

    try:
        response = query_engine.query(query)
    except Exception as e:
        logging.error(f"Error during query processing: {e}")
        return f"Error during query processing: {e}", None

    generated_response = response.response
    query_log.append({
        "query_id": str(len(query_log) + 1),
        "query": query,
        "gt_answer": "Placeholder ground truth answer",
        "response": generated_response,
        "retrieved_context": [{"text": node.node.get_content()} for node in response.source_nodes],
    })

    metrics = {}
    if use_similarity_check:
        try:
            logging.info("Similarity check is enabled. Calculating similarity.")
            # Compared against a placeholder until real ground-truth answers are wired in
            similarity = calculate_similarity(generated_response, "Placeholder ground truth answer")
            metrics['similarity'] = similarity
            logging.info(f"Similarity calculated: {similarity}")
        except Exception as e:
            logging.error(f"Error during similarity calculation: {e}")
            metrics['error'] = f"Error during similarity calculation: {e}"

    return generated_response, metrics if use_similarity_check else None


def main():
    with gr.Blocks(title="Document Processing App") as demo:
        gr.Markdown("# 📄 Document Processing and Querying App")

        with gr.Tab("📤 Upload Documents"):
            gr.Markdown("### Enter your OpenAI API Key and upload PDF, DOCX, or TXT files to index")
            api_key_input = gr.Textbox(label="Enter OpenAI API Key", placeholder="Paste your OpenAI API Key here")
            with gr.Row():
                file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath")
                lang_dropdown = gr.Dropdown(choices=langs, label="Select OCR Language", value='eng')
            upload_button = gr.Button("Upload and Index")
            upload_status = gr.Textbox(label="Status", interactive=False)

            # process_upload returns a single status string to match this single output
            upload_button.click(
                fn=process_upload,
                inputs=[api_key_input, file_upload, lang_dropdown],
                outputs=[upload_status]
            )

        with gr.Tab("❓ Ask a Question"):
            gr.Markdown("### Query the indexed documents")
            with gr.Column():
                query_input = gr.Textbox(label="Enter your question", placeholder="Type your question here...")
                model_dropdown = gr.Dropdown(
                    choices=["gpt-4o", "gpt-4o-mini"],
                    value="gpt-4o",
                    label="Select Model"
                )
                similarity_checkbox = gr.Checkbox(label="Use Similarity Check", value=False)
                query_button = gr.Button("Ask")
            with gr.Column():
                answer_output = gr.Textbox(label="Answer", interactive=False)
                metrics_output = gr.JSON(label="Metrics")

            query_button.click(
                fn=query_app,
                inputs=[query_input, model_dropdown, similarity_checkbox, api_key_input],
                outputs=[answer_output, metrics_output]
            )

        gr.Markdown("""
        ---
        **Note:** Ensure you upload documents before attempting to query.
        Enter a valid OpenAI API Key to interact with the models.
        """)

    demo.launch()


if __name__ == "__main__":
    main()
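
# To run locally (a sketch of the assumed environment; the package names are the
# usual PyPI distributions, and the script filename `app.py` is an assumption):
#   pip install gradio PyPDF2 pdf2image pytesseract python-docx python-dotenv \
#       sentence-transformers llama-index llama-index-embeddings-openai \
#       llama-index-llms-openai
#   # pdf2image and pytesseract also require the Poppler and Tesseract system binaries
#   python app.py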