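# Source: Hugging Face file view of app.py (raw / history / blame page chrome removed)
# Uploader: capradeepgujaran
# Commit message: "Update app.py"
# Commit: 3303167 (verified)
# File size: 9.46 kB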
import os
import tempfile
import logging
import gradio as gr
import PyPDF2
from pdf2image import convert_from_path
import docx
from llama_index.core import VectorStoreIndex, Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.core import get_response_synthesizer
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
# Set up logging configuration: INFO level, "timestamp | level | message" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')
# Load environment variables from .env file
load_dotenv()
# Initialize global variables
# vector_index: assigned by process_upload() after a successful indexing run and
# read by query_app(); None means nothing has been indexed yet.
vector_index = None
# query_log: in-memory list that query_app() appends one record to per answered query.
query_log = []
# sentence_model: embedding model used by calculate_similarity(); it is
# constructed once here, at import time, and shared by all calls.
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
def extract_text_from_pdf(pdf_path):
    """Extract the text of a PDF, flagging pages that yield no text.

    Each page is run through PyPDF2's text extraction. A page whose extracted
    text is empty or whitespace-only is counted as an image page and replaced
    by an ``[Image detected on page N]`` placeholder line.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A string, never an exception:
        * the extracted text when every page produced text;
        * the extracted text prefixed by a short summary header when some
          pages produced no text;
        * a summary starting with "This document consists of ..." when no
          page produced text (callers match on this wording);
        * a note that the document contains no pages for a zero-page PDF;
        * ``"[Error processing PDF: ...]\\n"`` when reading or parsing fails.
    """
    page_chunks = []
    image_count = 0
    total_pages = 0
    try:
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            total_pages = len(pdf_reader.pages)
            for page_num, page in enumerate(pdf_reader.pages, 1):
                # Treat a missing (None) extraction result like empty text
                # instead of failing the whole document on `.strip()`.
                page_text = page.extract_text() or ""
                if page_text.strip():
                    # Terminate every page with a newline so the last word of
                    # one page is not fused with the first word of the next.
                    if not page_text.endswith("\n"):
                        page_text += "\n"
                    page_chunks.append(page_text)
                else:
                    image_count += 1
                    page_chunks.append(f"[Image detected on page {page_num}]\n")
    except Exception as e:
        logging.error(f"Error processing PDF {pdf_path}: {str(e)}")
        return f"[Error processing PDF: {str(e)}]\n"

    if total_pages == 0:
        # Without this guard 0 == 0 would report an empty PDF as image-only.
        logging.warning(f"PDF {pdf_path} contains no pages.")
        return f"This document contains no pages.\nFile path: {pdf_path}\n"

    if image_count == total_pages:
        summary = f"This document consists of {total_pages} page(s) of images.\n"
        summary += "No text could be extracted. Consider manual review or image processing techniques.\n"
        summary += f"File path: {pdf_path}\n"
        return summary

    text = "".join(page_chunks)
    if image_count > 0:
        header = (
            "This document contains both text and images.\n"
            f"Total pages: {total_pages}\n"
            f"Pages with images: {image_count}\n"
            "Extracted text:\n\n"
        )
        return header + text
    return text
def load_docx_file(docx_path):
    """Return the paragraphs of a DOCX file joined by newlines.

    Any failure while opening or reading the document is logged and reported
    in-band as a ``"[Error processing DOCX: ...]\\n"`` string rather than
    raised.
    """
    try:
        document = docx.Document(docx_path)
        paragraph_texts = (paragraph.text for paragraph in document.paragraphs)
        return '\n'.join(paragraph_texts)
    except Exception as exc:
        reason = str(exc)
        logging.error(f"Error processing DOCX {docx_path}: {reason}")
        return f"[Error processing DOCX: {reason}]\n"
def load_txt_file(txt_path):
    """Read a UTF-8 text file and return its full contents.

    Any failure (missing file, decoding error, ...) is logged and reported
    in-band as a ``"[Error processing TXT: ...]\\n"`` string rather than
    raised.
    """
    contents = None
    try:
        with open(txt_path, mode='r', encoding='utf-8') as handle:
            contents = handle.read()
    except Exception as exc:
        reason = str(exc)
        logging.error(f"Error processing TXT {txt_path}: {reason}")
        return f"[Error processing TXT: {reason}]\n"
    return contents
def load_file_based_on_extension(file_path):
    """Dispatch to the loader matching the file's extension.

    The extension check is case-insensitive. ``.pdf``, ``.docx`` and ``.txt``
    are supported; anything else raises ``ValueError``.
    """
    lowered = file_path.lower()
    if lowered.endswith('.pdf'):
        return extract_text_from_pdf(file_path)
    if lowered.endswith('.docx'):
        return load_docx_file(file_path)
    if lowered.endswith('.txt'):
        return load_txt_file(file_path)
    raise ValueError(f"Unsupported file format: {file_path}")
def process_upload(api_key, files):
    """Load the uploaded files and build the global vector index from them.

    Args:
        api_key: OpenAI API key used for the embedding model.
        files: Iterable of file paths to load (PDF, DOCX or TXT).

    Returns:
        A ``(status_message, index)`` tuple. ``index`` is the newly built
        ``VectorStoreIndex`` on success and ``None`` on any failure. On
        success the module-level ``vector_index`` is replaced as well.
    """
    global vector_index
    if not api_key:
        return "Please provide a valid OpenAI API Key.", None
    if not files:
        return "No files uploaded.", None

    documents = []
    error_messages = []
    image_heavy_docs = []
    for file_path in files:
        try:
            text = load_file_based_on_extension(file_path)
            # The per-format loaders do not raise; they report failures
            # in-band as "[Error processing <TYPE>: ...]" strings (and log
            # them). Record those as errors instead of indexing the
            # placeholder text as if it were document content.
            if text.startswith("[Error processing "):
                error_messages.append(f"Error processing file {file_path}: {text.strip()}")
                continue
            if "This document consists of" in text and "page(s) of images" in text:
                image_heavy_docs.append(os.path.basename(file_path))
            documents.append(Document(text=text))
        except Exception as e:
            error_message = f"Error processing file {file_path}: {str(e)}"
            logging.error(error_message)
            error_messages.append(error_message)

    if not documents:
        return f"No valid documents were indexed. Errors: {'; '.join(error_messages)}", None

    try:
        embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key)
        # The global is only rebound if index creation succeeds, so a failed
        # upload leaves any previously built index in place.
        vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
    except Exception as e:
        logging.error(f"Error creating index: {str(e)}")
        return f"Error creating index: {str(e)}", None

    success_message = f"Successfully indexed {len(documents)} files."
    if image_heavy_docs:
        success_message += f"\nNote: The following documents consist mainly of images and may require manual review: {', '.join(image_heavy_docs)}"
    if error_messages:
        success_message += f"\nErrors: {'; '.join(error_messages)}"
    return success_message, vector_index
def calculate_similarity(response, ground_truth):
    """Return the cosine similarity of two texts as a percentage.

    Both texts are embedded with the module-level ``sentence_model`` and
    compared with scikit-learn's ``cosine_similarity``.

    Args:
        response: Generated answer text.
        ground_truth: Reference text to compare against.

    Returns:
        The cosine similarity scaled by 100, as a plain Python ``float`` so
        it can be placed directly in JSON-serialized metrics.
    """
    # Request NumPy output explicitly: tensor output stays on the model's
    # device, and scikit-learn cannot consume tensors that live on a GPU.
    response_embedding = sentence_model.encode(response, convert_to_numpy=True)
    truth_embedding = sentence_model.encode(ground_truth, convert_to_numpy=True)
    # No manual L2 normalization is needed: cosine similarity is invariant to
    # the length of its inputs.
    similarity = cosine_similarity(
        response_embedding.reshape(1, -1),
        truth_embedding.reshape(1, -1),
    )[0][0]
    return float(similarity) * 100  # Convert to percentage
def query_app(query, model_name, use_similarity_check, openai_api_key):
    """Answer a question against the indexed documents.

    Args:
        query: The user's question.
        model_name: Name of the OpenAI chat model to use.
        use_similarity_check: When truthy, also compute a similarity metric
            between the answer and a placeholder ground-truth string.
        openai_api_key: OpenAI API key for the LLM.

    Returns:
        ``(answer_or_error_message, metrics)``. ``metrics`` is a dict when
        the similarity check is enabled and the query succeeded, otherwise
        ``None``.
    """
    global vector_index, query_log

    placeholder_truth = "Placeholder ground truth answer"

    # Preconditions: an index must exist and a key must be supplied.
    if vector_index is None:
        no_index_message = "No documents indexed yet. Please upload documents first."
        logging.error(no_index_message)
        return no_index_message, None
    if not openai_api_key:
        logging.error("No OpenAI API Key provided.")
        return "Please provide a valid OpenAI API Key.", None

    try:
        llm = OpenAI(model=model_name, api_key=openai_api_key)
    except Exception as exc:
        failure = f"Error initializing the OpenAI model: {exc}"
        logging.error(failure)
        return failure, None

    synthesizer = get_response_synthesizer(llm=llm)
    engine = vector_index.as_query_engine(llm=llm, response_synthesizer=synthesizer)

    try:
        response = engine.query(query)
    except Exception as exc:
        failure = f"Error during query processing: {exc}"
        logging.error(failure)
        return failure, None

    generated_response = response.response

    # Record the exchange, including the text of every retrieved source node.
    log_entry = {
        "query_id": str(len(query_log) + 1),
        "query": query,
        "gt_answer": placeholder_truth,
        "response": generated_response,
        "retrieved_context": [{"text": node.text} for node in response.source_nodes],
    }
    query_log.append(log_entry)

    if not use_similarity_check:
        return generated_response, None

    metrics = {}
    try:
        logging.info("Similarity check is enabled. Calculating similarity.")
        similarity = calculate_similarity(generated_response, placeholder_truth)
        metrics['similarity'] = similarity
        logging.info(f"Similarity calculated: {similarity}")
    except Exception as exc:
        failure = f"Error during similarity calculation: {exc}"
        logging.error(failure)
        metrics['error'] = failure
    return generated_response, metrics
def main():
    """Build the two-tab Gradio interface (upload / ask) and launch it."""

    def upload_and_report(api_key, files):
        # process_upload returns (status, index), but the click handler below
        # feeds a single status Textbox. Pass on only the status so the number
        # of returned values matches the number of output components.
        status, _index = process_upload(api_key, files)
        return status

    with gr.Blocks(title="Document Processing App") as demo:
        gr.Markdown("# 📄 Document Processing and Querying App")
        with gr.Tab("📤 Upload Documents"):
            gr.Markdown("### Enter your OpenAI API Key and Upload PDF, DOCX, or TXT files to index")
            # Mask the key on screen; the same component also supplies the
            # key to the query tab.
            api_key_input = gr.Textbox(
                label="Enter OpenAI API Key",
                placeholder="Paste your OpenAI API Key here",
                type="password",
            )
            with gr.Row():
                file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath")
                upload_button = gr.Button("Upload and Index")
            upload_status = gr.Textbox(label="Status", interactive=False)
            upload_button.click(
                fn=upload_and_report,
                inputs=[api_key_input, file_upload],
                outputs=[upload_status]
            )
        with gr.Tab("❓ Ask a Question"):
            gr.Markdown("### Query the indexed documents")
            with gr.Column():
                query_input = gr.Textbox(label="Enter your question", placeholder="Type your question here...")
                model_dropdown = gr.Dropdown(
                    choices=["gpt-4o", "gpt-4o-mini"],
                    value="gpt-4o",
                    label="Select Model"
                )
                similarity_checkbox = gr.Checkbox(label="Use Similarity Check", value=False)
                query_button = gr.Button("Ask")
            with gr.Column():
                answer_output = gr.Textbox(label="Answer", interactive=False)
                metrics_output = gr.JSON(label="Metrics")
            query_button.click(
                fn=query_app,
                inputs=[query_input, model_dropdown, similarity_checkbox, api_key_input],
                outputs=[answer_output, metrics_output]
            )
        gr.Markdown(
            "\n---\n"
            "**Note:** Ensure you upload documents before attempting to query. "
            "Enter a valid OpenAI API Key to interact with the models.\n"
        )
    demo.launch()


# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()