# Import libraries
import os
import re

import requests
import arxiv
import numpy as np
import torch  # used to pin the model to the CPU with float32 weights
import gradio as gr
from yt_dlp import YoutubeDL
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForCausalLM
from huggingface_hub import login
# Access the Hugging Face token from the environment variable
# (the name must match the secret configured for this Space)
HF_TOKEN = os.getenv("HF_Token")
login(token=HF_TOKEN)
# Initialize the embedding model
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
# Define paths for downloaded content and database
file_paths = {
    "video": "./Machine Learning.mp4",  # Replace with actual paths
    "paper": "./DeepSeek_v3.pdf",
}
download_path = "./downloads"
papers_path = "./papers"
os.makedirs(download_path, exist_ok=True)
os.makedirs(papers_path, exist_ok=True)
# Load Llama 3.2 (pinned to the CPU)
model_name = "meta-llama/Llama-3.2-1B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, token=HF_TOKEN)  # use_auth_token is deprecated
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float32)  # float32 for CPU
model.to("cpu")  # Explicitly run the model on the CPU
# Define utility functions
def compute_similarity(query_embedding, content_embeddings):
    """Compute cosine similarity between query and content embeddings."""
    similarities = cosine_similarity([query_embedding], content_embeddings).flatten()
    return similarities
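
# A minimal usage sketch for compute_similarity (values are illustrative,
# not actual outputs):
#   q = embedding_model.encode("machine learning basics")
#   docs = [embedding_model.encode(t) for t in ["Intro to ML (video)", "DeepSeek_v3.pdf (paper)"]]
#   compute_similarity(q, docs)  # -> e.g. array([0.71, 0.24]); higher = closer match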
def add_local_files(module):
    """Add local files from the database to the metadata."""
    if module not in file_paths:
        return []
    file_path = file_paths[module]
    if not os.path.exists(file_path):  # skip seeded entries whose file is not actually present
        return []
    # Both modules share the same metadata shape; the type matches the module name
    return [{"title": os.path.basename(file_path), "url": None, "file_path": file_path, "type": module}]
def download_youtube_video(video_url, output_dir, title=None):
    """Download a YouTube video using yt_dlp."""
    sanitized_title = re.sub(r'[\\/*?:"<>|]', '_', title) if title else "unknown_title"
    ydl_opts = {
        'quiet': True,
        'outtmpl': f"{output_dir}/{sanitized_title}.%(ext)s",
        'format': 'best',  # best pre-merged (video+audio) single file
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=True)
            downloaded_file = ydl.prepare_filename(info)
            return downloaded_file
    except Exception as e:
        print(f"Failed to download video {video_url}. Error: {e}")
        return None
def fetch_and_download_youtube_video(query, output_dir="./videos"):
    """Fetch and download a YouTube video based on a query."""
    print(f"Fetching YouTube video for query: '{query}'")
    ydl_opts = {
        'quiet': True,
        'format': 'best',
        'outtmpl': f"{output_dir}/%(title)s.%(ext)s",  # Default template
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            search_results = ydl.extract_info(f"ytsearch:{query}", download=False)
            if not search_results.get('entries'):
                print(f"No YouTube results found for query: '{query}'")
                return []
            video_info = search_results['entries'][0]
            video_title = video_info.get("title", "unknown_title")
            video_url = video_info.get("webpage_url")
            if not video_url:
                print("No URL found for the video.")
                return []
            local_path = download_youtube_video(video_url, output_dir, title=video_title)
            if not local_path:
                return []
            print(f"Successfully downloaded video: {video_title}")
            return [{"title": video_title, "url": video_url, "file_path": local_path, "type": "video"}]
    except Exception as e:
        print(f"Error fetching YouTube video for query '{query}': {e}")
        return []
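
# Note: yt-dlp treats "ytsearch:<query>" as a search URL returning a single
# entry; "ytsearch5:<query>" would return up to five entries to rank instead.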
def fetch_from_arxiv(query="machine learning", max_results=2, output_dir="./papers"):
    """Fetch papers from arXiv and download their PDFs."""
    print(f"Fetching papers for query: {query}")
    client = arxiv.Client()
    search = arxiv.Search(
        query=query,
        max_results=max_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
    metadata = []
    for i, result in enumerate(client.results(search)):
        pdf_url = result.pdf_url
        filename = f"{query.replace(' ', '_')}_arxiv_{i}.pdf"
        local_path = os.path.join(output_dir, filename)
        try:
            response = requests.get(pdf_url, timeout=60)  # avoid hanging on a stalled download
            if response.status_code == 200:
                with open(local_path, 'wb') as f:
                    f.write(response.content)
                print(f"Downloaded paper: {filename}")
                metadata.append({"title": result.title, "url": pdf_url, "file_path": local_path, "type": "paper"})
            else:
                print(f"Failed to download paper: {pdf_url}. Status code: {response.status_code}")
        except Exception as e:
            print(f"Error downloading paper: {e}")
    return metadata
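
# Usage sketch (the query here is illustrative; downloaded titles depend on
# what arXiv returns):
#   papers = fetch_from_arxiv("mixture of experts", max_results=1)
#   papers[0]["file_path"]  # -> "./papers/mixture_of_experts_arxiv_0.pdf"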
def generate_llama_response(query, context=None):
    """Generate a response using the loaded Llama model."""
    input_text = f"Query: {query}\n"
    if context:
        input_text += f"Context: {context}\n"
    input_text += "Answer:"
    inputs = tokenizer(input_text, return_tensors="pt")
    outputs = model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_new_tokens=100,  # max_length would count the prompt and truncate the answer
        do_sample=True,      # temperature only takes effect when sampling is enabled
        temperature=0.7,
        pad_token_id=tokenizer.eos_token_id,
    )
    # Decode only the newly generated tokens, not the echoed prompt
    response = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
    return response
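
# For query "transformers" with context "Best Video: Attention Is All You Need",
# the assembled prompt looks like:
#   Query: transformers
#   Context: Best Video: Attention Is All You Need
#   Answer: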
def hybrid_rag_system_with_llama(query):
    """Retrieve the best video and paper, then generate a final response with the Llama model."""
    modules = ["video", "paper"]
    final_results = {}
    query_embedding = embedding_model.encode(query)
    for module in modules:
        metadata = []
        metadata.extend(add_local_files(module))
        if module == "video":
            metadata.extend(fetch_and_download_youtube_video(query, output_dir=download_path))
        elif module == "paper":
            metadata.extend(fetch_from_arxiv(query, max_results=2, output_dir=papers_path))
        if metadata:
            descriptions = [f"{item['title']} ({item['type']})" for item in metadata]
            description_embeddings = embedding_model.encode(descriptions)  # batch-encode all descriptions
            similarities = compute_similarity(query_embedding, description_embeddings)
            for idx, item in enumerate(metadata):
                item["similarity"] = similarities[idx]
            best_match_idx = np.argmax(similarities)
            final_results[module] = {
                "best_match": metadata[best_match_idx],
                "similarity": similarities[best_match_idx],
                "all_metadata": metadata,
            }
        else:
            final_results[module] = {"best_match": None, "similarity": None, "all_metadata": []}
    video_context = f"Best Video: {final_results['video']['best_match']['title']}" if final_results['video']['best_match'] else "No relevant video found."
    paper_context = f"Best Paper: {final_results['paper']['best_match']['title']}" if final_results['paper']['best_match'] else "No relevant paper found."
    context = f"{video_context}\n{paper_context}"
    final_response = generate_llama_response(query, context)
    return final_results, final_response
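
# Direct-call sketch, bypassing the Gradio UI (query is illustrative):
#   results, answer = hybrid_rag_system_with_llama("intro to machine learning")
#   results["paper"]["best_match"]["title"]  # highest-scoring arXiv paper
#   print(answer)                            # the generated response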
# Define Gradio interface
def gradio_interface(query):
    """Gradio wrapper for the hybrid RAG system."""
    _, final_response = hybrid_rag_system_with_llama(query)
    return final_response

# Create Gradio app
interface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Textbox(label="Enter your query", placeholder="e.g., short easy machine learning"),
    outputs=gr.Textbox(label="Generated Response"),
    title="Hybrid RAG System with LLaMA",
    description="Enter a query to retrieve relevant resources and generate a response using LLaMA."
)
# Launch Gradio app
if __name__ == "__main__":
    interface.launch()