# NOTE: removed non-source extraction artifacts (file-size banner, git blame
# hashes, and a line-number gutter) that were not part of the Python module.
import os
import tempfile
import gradio as gr
import fitz # PyMuPDF for reading PDF files
import pytesseract
from PIL import Image
import docx # for reading .docx files
from ragchecker import RAGResults, RAGChecker
from ragchecker.metrics import all_metrics
from llama_index.core import VectorStoreIndex, Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.core import get_response_synthesizer
from dotenv import load_dotenv
from bert_score import score as bert_score
# Load environment variables from .env file
load_dotenv()
# Set the path for Tesseract OCR (only needed on Windows)
# On Linux-based systems (like Hugging Face Spaces), Tesseract is usually available via apt
# Uncomment and adjust if necessary
# pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
# Initialize global variables
# Module-level state shared by process_upload() and query_app().
# vector_index holds the most recently built index, or None before any upload.
vector_index = None
query_log = [] # Store queries and results for RAGChecker; grows one entry per query
# Function to handle PDF and OCR for scanned PDFs
def load_pdf_manually(pdf_path):
    """Extract text from a PDF, using OCR for pages without a text layer.

    Args:
        pdf_path: Filesystem path to the PDF file.

    Returns:
        The concatenated text of every page in document order.
    """
    pages = []
    # Context manager guarantees the document handle is closed even if a
    # page raises (the original leaked the handle by never calling close()).
    with fitz.open(pdf_path) as doc:
        for page_num in range(doc.page_count):
            page = doc[page_num]
            page_text = page.get_text()
            # An empty text layer usually means a scanned page -> fall back to OCR.
            if not page_text.strip():
                pix = page.get_pixmap()
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                page_text = pytesseract.image_to_string(img)
            pages.append(page_text)
    # join() avoids quadratic string concatenation on large documents.
    return "".join(pages)
# Function to handle .docx files
def load_docx_file(docx_path):
    """Return the text of a .docx file, one paragraph per line."""
    document = docx.Document(docx_path)
    return '\n'.join(paragraph.text for paragraph in document.paragraphs)
# Function to handle .txt files
def load_txt_file(txt_path):
    """Read an entire UTF-8 text file and return its contents as one string."""
    with open(txt_path, 'r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents
# General function to load a file based on its extension
def load_file_based_on_extension(file_path):
    """Dispatch to the appropriate loader based on the file's extension.

    Matching is case-insensitive, so e.g. ``report.PDF`` is handled
    (the original ``endswith`` checks rejected upper-case extensions).

    Args:
        file_path: Path whose extension selects the loader.

    Returns:
        The extracted text of the file.

    Raises:
        ValueError: If the extension is not .pdf, .docx, or .txt.
    """
    suffix = os.path.splitext(file_path)[1].lower()
    if suffix == '.pdf':
        return load_pdf_manually(file_path)
    elif suffix == '.docx':
        return load_docx_file(file_path)
    elif suffix == '.txt':
        return load_txt_file(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_path}")
# Function to process uploaded files and create/update the vector index
def process_upload(files):
    """Index uploaded files into the global vector store.

    Bug fix: the original returned immediately on the first unsupported or
    failing file (despite its "Skipping" message), aborting the whole batch.
    Problem files are now genuinely skipped and reported in the status.

    Args:
        files: List of file paths from the Gradio File component (may be
            None or empty when nothing was uploaded).

    Returns:
        Tuple of (status message, vector index or None).
    """
    global vector_index
    if not files:
        return "No files uploaded.", None
    documents = []
    problems = []
    for file_path in files:
        try:
            text = load_file_based_on_extension(file_path)
            documents.append(Document(text=text))
        except ValueError as e:
            problems.append(f"Skipping unsupported file: {file_path} ({e})")
        except Exception as e:
            problems.append(f"Error processing file {file_path}: {e}")
    if not documents:
        return "No valid documents were indexed.", None
    embed_model = OpenAIEmbedding(model="text-embedding-3-large")
    vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
    status = f"Successfully indexed {len(documents)} files."
    if problems:
        # Surface skipped/failed files without hiding the success message.
        status += " " + " | ".join(problems)
    return status, vector_index
# Function to handle queries
def query_app(query, model_name, use_rag_checker):
    """Answer a query against the indexed documents, optionally scoring it.

    Args:
        query: The user's question.
        model_name: Name of the OpenAI chat model used for synthesis.
        use_rag_checker: When True, evaluate the answer with RAGChecker and
            BERTScore and return the resulting metrics.

    Returns:
        Tuple of (answer text, metrics dict or None).
    """
    global vector_index, query_log
    # Guard: nothing to query until documents have been indexed.
    if vector_index is None:
        return "No documents indexed yet. Please upload documents first.", None

    # Build a query engine around the selected model.
    llm = OpenAI(model=model_name)
    synthesizer = get_response_synthesizer(llm=llm)
    engine = vector_index.as_query_engine(llm=llm, response_synthesizer=synthesizer)

    try:
        result = engine.query(query)
    except Exception as e:
        return f"Error during query processing: {e}", None

    answer = result.response
    # Record the exchange so RAGChecker can evaluate the full history.
    query_log.append({
        "query_id": str(len(query_log) + 1),
        "query": query,
        "gt_answer": "Placeholder ground truth answer",  # Replace with actual ground truth if available
        "response": answer,
        "retrieved_context": [{"text": node.text} for node in result.source_nodes],
    })

    # Fast path: no evaluation requested.
    if not use_rag_checker:
        return answer, None

    metrics = {}
    try:
        rag_results = RAGResults.from_dict({"results": query_log})
        evaluator = RAGChecker(
            extractor_name="openai/gpt-4o-mini",
            checker_name="openai/gpt-4o-mini",
            batch_size_extractor=32,
            batch_size_checker=32
        )
        evaluator.evaluate(rag_results, all_metrics)
        metrics = rag_results.metrics
        # BERTScore as an additional semantic-similarity metric.
        gt_answer = ["Placeholder ground truth answer"]  # Replace with actual ground truth
        candidate = [answer]
        P, R, F1 = bert_score(candidate, gt_answer, lang="en", verbose=False)
        metrics['bertscore'] = {
            "precision": P.mean().item() * 100,
            "recall": R.mean().item() * 100,
            "f1": F1.mean().item() * 100
        }
    except Exception as e:
        metrics['error'] = f"Error calculating metrics: {e}"
    return answer, metrics
# Define the Gradio interface
def main():
    """Build and launch the Gradio UI for uploading and querying documents.

    Fixes: the upload handler has a single output component, but
    process_upload returns a (status, index) tuple — Gradio would render the
    whole tuple in the status textbox, so only the status string is passed
    through. Also repaired mojibake emoji in the UI markdown.
    """
    with gr.Blocks(title="Document Processing App") as demo:
        gr.Markdown("# 📄 Document Processing and Querying App")
        with gr.Tab("📤 Upload Documents"):
            gr.Markdown("### Upload PDF, DOCX, or TXT files to index")
            with gr.Row():
                file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath")
                upload_button = gr.Button("Upload and Index")
            upload_status = gr.Textbox(label="Status", interactive=False)
            upload_button.click(
                # Only the status message feeds the single output component.
                fn=lambda files: process_upload(files)[0],
                inputs=[file_upload],
                outputs=[upload_status]
            )
        with gr.Tab("❓ Ask a Question"):
            gr.Markdown("### Query the indexed documents")
            with gr.Column():
                query_input = gr.Textbox(label="Enter your question", placeholder="Type your question here...")
                model_dropdown = gr.Dropdown(
                    choices=["gpt-3.5-turbo", "gpt-4"],
                    value="gpt-3.5-turbo",
                    label="Select Model"
                )
                rag_checkbox = gr.Checkbox(label="Use RAG Checker", value=True)
                query_button = gr.Button("Ask")
            with gr.Column():
                answer_output = gr.Textbox(label="Answer", interactive=False)
                metrics_output = gr.JSON(label="Metrics")
            query_button.click(
                fn=query_app,
                inputs=[query_input, model_dropdown, rag_checkbox],
                outputs=[answer_output, metrics_output]
            )
        gr.Markdown("""
        ---
        **Note:** Ensure you upload documents before attempting to query. Metrics are calculated only if RAG Checker is enabled.
        """)
    demo.launch()


if __name__ == "__main__":
    main()