|
import os |
|
import tempfile |
|
import logging |
|
import gradio as gr |
|
import PyPDF2 |
|
from pdf2image import convert_from_path |
|
import docx |
|
from llama_index.core import VectorStoreIndex, Document |
|
from llama_index.embeddings.openai import OpenAIEmbedding |
|
from llama_index.llms.openai import OpenAI |
|
from llama_index.core import get_response_synthesizer |
|
from dotenv import load_dotenv |
|
from sentence_transformers import SentenceTransformer, util |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import numpy as np |
|
|
|
|
|
# Timestamped, level-tagged log lines at INFO and above.
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)

# Load environment variables (presumably from a local .env file) before
# anything else needs them.
load_dotenv()

# Index produced by process_upload() and consumed by query_app();
# stays None until an upload has been indexed successfully.
vector_index = None

# In-memory record of answered queries; query_app() appends one dict per query.
query_log = []

# Sentence-embedding model used by calculate_similarity(); loaded once at import.
sentence_model = SentenceTransformer("all-MiniLM-L6-v2")
|
|
|
def extract_text_from_pdf(pdf_path):
    """Extract the text of a PDF, flagging pages that yield no text.

    A page whose extracted text is empty or whitespace-only is counted as an
    image page and represented by an ``[Image detected on page N]`` marker.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A string, one of:
        * ``[Error processing PDF: ...]`` if opening or parsing fails;
        * a short summary (including the file path) when the number of
          textless pages equals the page count;
        * the extracted text preceded by a page/image-count header when only
          some pages are textless;
        * the plain extracted text otherwise.
    """
    pieces = []
    textless_pages = 0
    page_total = 0

    try:
        with open(pdf_path, 'rb') as handle:
            reader = PyPDF2.PdfReader(handle)
            page_total = len(reader.pages)

            for number, page in enumerate(reader.pages, start=1):
                content = page.extract_text()
                if not content.strip():
                    # Nothing extractable: treat the page as an image.
                    textless_pages += 1
                    pieces.append(f"[Image detected on page {number}]\n")
                    continue
                pieces.append(content)
    except Exception as exc:
        logging.error(f"Error processing PDF {pdf_path}: {str(exc)}")
        return f"[Error processing PDF: {str(exc)}]\n"

    if textless_pages == page_total:
        # Every page was textless: return a summary instead of the markers.
        return (
            f"This document consists of {page_total} page(s) of images.\n"
            "No text could be extracted. Consider manual review or image processing techniques.\n"
            f"File path: {pdf_path}\n"
        )

    body = "".join(pieces)
    if textless_pages > 0:
        header = (
            "This document contains both text and images.\n"
            f"Total pages: {page_total}\n"
            f"Pages with images: {textless_pages}\n"
            "Extracted text:\n\n"
        )
        return header + body

    return body
|
|
|
def load_docx_file(docx_path):
    """Return the paragraphs of a DOCX file joined by newlines.

    Args:
        docx_path: Path of the .docx file to read.

    Returns:
        The text of every paragraph joined with ``\\n``, or an
        ``[Error processing DOCX: ...]`` line if anything goes wrong
        (the failure is also logged).
    """
    try:
        paragraphs = docx.Document(docx_path).paragraphs
        lines = [paragraph.text for paragraph in paragraphs]
        return '\n'.join(lines)
    except Exception as exc:
        logging.error(f"Error processing DOCX {docx_path}: {str(exc)}")
        return f"[Error processing DOCX: {str(exc)}]\n"
|
|
|
def load_txt_file(txt_path):
    """Read a UTF-8 text file and return its contents.

    The file is decoded with ``utf-8-sig`` so that a leading byte-order mark
    (written by some editors) is dropped instead of ending up in the text as
    ``\\ufeff``; files without a BOM decode exactly as plain UTF-8.

    Args:
        txt_path: Path of the text file to read.

    Returns:
        The file contents, or an ``[Error processing TXT: ...]`` line if the
        file cannot be opened or decoded (the failure is also logged).
    """
    try:
        with open(txt_path, 'r', encoding='utf-8-sig') as f:
            return f.read()
    except Exception as e:
        # Best-effort contract: report the failure as text rather than raising.
        logging.error("Error processing TXT %s: %s", txt_path, e)
        return f"[Error processing TXT: {e}]\n"
|
|
|
def load_file_based_on_extension(file_path):
    """Dispatch a file to the loader matching its extension.

    The extension check is case-insensitive. ``.pdf``, ``.docx`` and ``.txt``
    are supported.

    Args:
        file_path: Path of the file to load.

    Returns:
        Whatever the selected loader returns for ``file_path``.

    Raises:
        ValueError: If the path ends in none of the supported extensions.
    """
    lowered = file_path.lower()

    if lowered.endswith('.pdf'):
        return extract_text_from_pdf(file_path)
    if lowered.endswith('.docx'):
        return load_docx_file(file_path)
    if lowered.endswith('.txt'):
        return load_txt_file(file_path)

    raise ValueError(f"Unsupported file format: {file_path}")
|
|
|
def process_upload(api_key, files):
    """Load the uploaded files and build the global vector index from them.

    Each file is loaded via ``load_file_based_on_extension``. The loaders
    report failures by *returning* text that starts with
    ``[Error processing `` rather than raising, so such results are treated
    as errors here instead of being indexed as if they were document content.

    Args:
        api_key: OpenAI API key used for the embedding model.
        files: Iterable of file paths to index.

    Returns:
        A ``(status_message, index)`` tuple. ``index`` is the newly built
        ``VectorStoreIndex`` on success and ``None`` otherwise. On success
        the module-level ``vector_index`` is replaced as a side effect.
    """
    global vector_index

    if not api_key:
        return "Please provide a valid OpenAI API Key.", None

    if not files:
        return "No files uploaded.", None

    loader_error_prefix = "[Error processing "

    documents = []
    error_messages = []
    image_heavy_docs = []

    for file_path in files:
        try:
            text = load_file_based_on_extension(file_path)

            if text.startswith(loader_error_prefix):
                # The loader already logged the failure; record it as an
                # error instead of indexing the error text.
                error_messages.append(f"Error processing file {file_path}: {text.strip()}")
                continue

            if "This document consists of" in text and "page(s) of images" in text:
                image_heavy_docs.append(os.path.basename(file_path))

            documents.append(Document(text=text))
        except Exception as e:
            error_message = f"Error processing file {file_path}: {e}"
            logging.error(error_message)
            error_messages.append(error_message)

    if not documents:
        return f"No valid documents were indexed. Errors: {'; '.join(error_messages)}", None

    try:
        embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key)
        vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
    except Exception as e:
        logging.error("Error creating index: %s", e)
        return f"Error creating index: {e}", None

    success_message = f"Successfully indexed {len(documents)} files."
    if image_heavy_docs:
        success_message += (
            "\nNote: The following documents consist mainly of images and may "
            f"require manual review: {', '.join(image_heavy_docs)}"
        )
    if error_messages:
        success_message += f"\nErrors: {'; '.join(error_messages)}"

    return success_message, vector_index
|
|
|
def calculate_similarity(response, ground_truth):
    """Return the cosine similarity of two texts as a percentage.

    Both texts are embedded with the module-level ``sentence_model``. The
    embeddings are requested as NumPy arrays so the arithmetic below does not
    depend on which device the model runs on.

    Args:
        response: Generated answer text.
        ground_truth: Reference text to compare against.

    Returns:
        Cosine similarity multiplied by 100, as a plain Python ``float``.
        ``0.0`` is returned if either embedding has zero norm.
    """
    response_vec = np.asarray(
        sentence_model.encode(response, convert_to_numpy=True), dtype=np.float64
    ).ravel()
    truth_vec = np.asarray(
        sentence_model.encode(ground_truth, convert_to_numpy=True), dtype=np.float64
    ).ravel()

    norm_product = np.linalg.norm(response_vec) * np.linalg.norm(truth_vec)
    if norm_product == 0:
        # Cosine similarity is undefined for a zero vector; avoid a NaN result.
        return 0.0

    similarity = float(np.dot(response_vec, truth_vec) / norm_product)
    return similarity * 100
|
|
|
def query_app(query, model_name, use_similarity_check, openai_api_key):
    """Answer a question against the indexed documents.

    Args:
        query: The user's question.
        model_name: Name of the OpenAI chat model to use.
        use_similarity_check: When truthy, also compute a similarity score
            between the answer and the placeholder ground-truth string.
        openai_api_key: OpenAI API key used for the LLM.

    Returns:
        A ``(answer, metrics)`` tuple. ``answer`` is the generated response,
        or a human-readable error message if a precondition fails or the
        query raises. ``metrics`` is a dict (``similarity`` or ``error`` key)
        when ``use_similarity_check`` is truthy and the query succeeded,
        otherwise ``None``.

    Side effects:
        Appends one entry per successful query to the module-level
        ``query_log``.
    """
    if vector_index is None:
        logging.error("No documents indexed yet. Please upload documents first.")
        return "No documents indexed yet. Please upload documents first.", None

    if not openai_api_key:
        logging.error("No OpenAI API Key provided.")
        return "Please provide a valid OpenAI API Key.", None

    if not query or not query.strip():
        # Do not spend an API call on an empty question.
        logging.error("Empty query submitted.")
        return "Please enter a question.", None

    try:
        llm = OpenAI(model=model_name, api_key=openai_api_key)
    except Exception as e:
        logging.error(f"Error initializing the OpenAI model: {e}")
        return f"Error initializing the OpenAI model: {e}", None

    try:
        # Engine construction is inside the try so a failure here is reported
        # to the user the same way as a failing query.
        response_synthesizer = get_response_synthesizer(llm=llm)
        query_engine = vector_index.as_query_engine(llm=llm, response_synthesizer=response_synthesizer)
        response = query_engine.query(query)
    except Exception as e:
        logging.error(f"Error during query processing: {e}")
        return f"Error during query processing: {e}", None

    generated_response = response.response
    query_log.append({
        "query_id": str(len(query_log) + 1),
        "query": query,
        "gt_answer": "Placeholder ground truth answer",
        "response": generated_response,
        "retrieved_context": [{"text": node.text} for node in response.source_nodes],
    })

    if not use_similarity_check:
        return generated_response, None

    metrics = {}
    try:
        logging.info("Similarity check is enabled. Calculating similarity.")
        similarity = calculate_similarity(generated_response, "Placeholder ground truth answer")
        metrics['similarity'] = similarity
        logging.info(f"Similarity calculated: {similarity}")
    except Exception as e:
        logging.error(f"Error during similarity calculation: {e}")
        metrics['error'] = f"Error during similarity calculation: {e}"

    return generated_response, metrics
|
|
|
def main():
    """Build the Gradio UI (upload tab and question tab) and launch it."""

    def upload_and_report(api_key, files):
        # process_upload returns (status, index) but the click handler has a
        # single output component, so hand Gradio only the status text.
        status, _index = process_upload(api_key, files)
        return status

    with gr.Blocks(title="Document Processing App") as demo:
        gr.Markdown("# 📄 Document Processing and Querying App")

        with gr.Tab("📤 Upload Documents"):
            gr.Markdown("### Enter your OpenAI API Key and Upload PDF, DOCX, or TXT files to index")

            # Masked input so the key is not shown in plain text on screen.
            api_key_input = gr.Textbox(
                label="Enter OpenAI API Key",
                placeholder="Paste your OpenAI API Key here",
                type="password",
            )

            # NOTE(review): the original nesting of the status box relative to
            # this row could not be determined; it is kept inside the row.
            with gr.Row():
                file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath")
                upload_button = gr.Button("Upload and Index")
                upload_status = gr.Textbox(label="Status", interactive=False)

            upload_button.click(
                fn=upload_and_report,
                inputs=[api_key_input, file_upload],
                outputs=[upload_status],
            )

        with gr.Tab("❓ Ask a Question"):
            gr.Markdown("### Query the indexed documents")
            with gr.Column():
                query_input = gr.Textbox(label="Enter your question", placeholder="Type your question here...")
                model_dropdown = gr.Dropdown(
                    choices=["gpt-4o", "gpt-4o-mini"],
                    value="gpt-4o",
                    label="Select Model",
                )
                similarity_checkbox = gr.Checkbox(label="Use Similarity Check", value=False)
                query_button = gr.Button("Ask")
            with gr.Column():
                answer_output = gr.Textbox(label="Answer", interactive=False)
                metrics_output = gr.JSON(label="Metrics")

            # The API key entered on the upload tab is reused for querying.
            query_button.click(
                fn=query_app,
                inputs=[query_input, model_dropdown, similarity_checkbox, api_key_input],
                outputs=[answer_output, metrics_output],
            )

        gr.Markdown("""
        ---
        **Note:** Ensure you upload documents before attempting to query. Enter a valid OpenAI API Key to interact with the models.
        """)

    demo.launch()


# Start the app only when run as a script, not when imported.
if __name__ == "__main__":
    main()
|
|