import gradio as gr
import PyPDF2
import io
import os
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
import uuid
import re
import time

# Load environment variables from .env file
load_dotenv()

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pinecone configuration
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT")
INDEX_NAME = "ghana"
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIMENSION = 3072

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)

# Check if the index exists
if INDEX_NAME not in pc.list_indexes().names():
    # Create the index with updated dimensions
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(
            cloud=PINECONE_ENVIRONMENT.split('-')[0],  # Assuming environment is in format 'gcp-starter'
            region=PINECONE_ENVIRONMENT.split('-')[1]
        )
    )
else:
    # Optionally, verify the existing index's dimension matches
    existing_index = pc.describe_index(INDEX_NAME)
    if existing_index.dimension != EMBEDDING_DIMENSION:
        raise ValueError(
            f"Existing index '{INDEX_NAME}' has dimension {existing_index.dimension}, "
            f"expected {EMBEDDING_DIMENSION}. Please choose a different index name or adjust accordingly."
        )

# Connect to the Pinecone index
index = pc.Index(INDEX_NAME)
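
# --- Optional sketch (not wired into the app): quick connectivity check. ---
# The Pinecone client exposes describe_index_stats() on an Index object; printing
# its result is a simple way to confirm the connection and see how many vectors
# are currently stored before or after an upsert run.
def print_index_stats():
    stats = index.describe_index_stats()
    print(f"Index '{INDEX_NAME}' stats: {stats}")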
def transcribe_pdf(pdf_file):
    print("Starting PDF transcription...")

    # Read PDF and extract text
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
    text = ""
    for page in pdf_reader.pages:
        page_text = page.extract_text()
        if page_text:
            text += page_text + "\n"

    print(f"Extracted {len(text)} characters from PDF.")

    # Dynamic Chunking
    chunks = dynamic_chunking(text, max_tokens=500, overlap=50)
    print(f"Created {len(chunks)} chunks from the extracted text.")

    # Process chunks one by one
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}...")

        # Generate embedding for the chunk
        embedding = get_embedding(chunk)

        # Prepare upsert data
        upsert_data = [(str(uuid.uuid4()), embedding, {"text": chunk})]

        # Upsert to Pinecone
        print(f"Upserting vector to Pinecone index '{INDEX_NAME}'...")
        index.upsert(vectors=upsert_data)

        # Optional: Add a small delay to avoid potential rate limits
        time.sleep(0.5)

    return f"Successfully processed and upserted {len(chunks)} chunks to Pinecone index '{INDEX_NAME}'."
def dynamic_chunking(text, max_tokens=500, overlap=50):
    # Note: "tokens" here are whitespace-separated words, so max_tokens and
    # overlap are approximate budgets rather than exact model-token counts.
    print(f"Starting dynamic chunking with max_tokens={max_tokens} and overlap={overlap}...")
    tokens = re.findall(r'\S+', text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + max_tokens
        chunk = ' '.join(tokens[start:end])
        chunks.append(chunk)
        start += max_tokens - overlap
    print(f"Dynamic chunking complete. Created {len(chunks)} chunks.")
    return chunks
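
# --- Optional sketch (assumes the `tiktoken` package is installed): chunking on
# actual model tokens instead of whitespace-separated words, so the max_tokens
# budget matches what the embedding model counts. "cl100k_base" is assumed here
# as the encoding for the text-embedding-3 models.
def token_chunking(text, max_tokens=500, overlap=50):
    import tiktoken
    enc = tiktoken.get_encoding("cl100k_base")
    token_ids = enc.encode(text)
    chunks = []
    start = 0
    while start < len(token_ids):
        chunks.append(enc.decode(token_ids[start:start + max_tokens]))
        start += max_tokens - overlap
    return chunks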
def get_embedding(chunk):
    print("Generating embedding for chunk...")
    try:
        response = client.embeddings.create(
            input=chunk,  # a single string is accepted as input
            model=EMBEDDING_MODEL
        )
        embedding = response.data[0].embedding
        print("Successfully generated embedding.")
        return embedding
    except Exception as e:
        print(f"Error during embedding generation: {str(e)}")
        raise e
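
# --- Optional sketch (not called by the app): batched embeddings. ---
# The embeddings endpoint also accepts a list of strings and returns one
# embedding per input, in order, which reduces the number of API calls when
# many chunks are processed at once.
def get_embeddings_batch(chunks):
    response = client.embeddings.create(input=chunks, model=EMBEDDING_MODEL)
    return [item.embedding for item in response.data]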
def clear_database():
    print("Clearing the Pinecone index...")
    try:
        index.delete(delete_all=True)
        return "Successfully cleared all vectors from the Pinecone index."
    except Exception as e:
        print(f"Error clearing the Pinecone index: {str(e)}")
        return f"Error clearing the Pinecone index: {str(e)}"
# Create the Gradio app using Blocks
with gr.Blocks() as app:
    gr.Markdown("# PDF Transcription and Pinecone Database Management")

    with gr.Tab("Transcribe PDF"):
        gr.Markdown("Upload a PDF file to extract its text content, chunk it dynamically, and upsert the chunks to a Pinecone index named 'ghana'.")
        pdf_input = gr.File(label="Upload PDF", type="binary")
        transcribe_button = gr.Button("Transcribe and Upsert")
        transcription_output = gr.Textbox(label="Transcription Result")
        transcribe_button.click(fn=transcribe_pdf, inputs=pdf_input, outputs=transcription_output)

    with gr.Tab("Clear Database"):
        gr.Markdown("Click the button to clear all vectors from the Pinecone index.")
        clear_button = gr.Button("Clear Database")
        clear_output = gr.Textbox(label="Clear Database Result")
        clear_button.click(fn=clear_database, outputs=clear_output)

if __name__ == "__main__":
    app.launch()