from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Request, Query, status
from fastapi.responses import StreamingResponse
import os
import logging
import uuid
from datetime import datetime
from pydantic import BaseModel, Field
from typing import Optional, List, Any
from urllib.parse import urlparse
import shutil
# from app.wrapper.llm_wrapper import *
from app.crud.process_file import load_file_with_markitdown, process_uploaded_file

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def is_url(path: str) -> bool:
    """
    Determine whether the given path is a URL.

    Args:
        path (str): The path or URL to check.

    Returns:
        bool: True if it is a URL, False otherwise.
    """
    try:
        result = urlparse(path)
        return all([result.scheme, result.netloc])
    except Exception:
        return False
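# Illustrative examples: is_url("https://example.com/doc.pdf") returns True
# (scheme and netloc are both present), while is_url("/tmp/doc.pdf") returns False.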

file_router = APIRouter()

# Configure logging to file with date-based filenames
log_filename = f"document_logs_{datetime.now().strftime('%Y-%m-%d')}.txt"
file_handler = logging.FileHandler(log_filename)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)

# Create a dedicated logger for document processing
doc_logger = logging.getLogger('document_logger')
doc_logger.setLevel(logging.INFO)
doc_logger.addHandler(file_handler)

from app.search.rag_pipeline import RAGSystem
from sentence_transformers import SentenceTransformer

async def load_file_with_markdown(request: Request, filepaths: List[str]):
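    """
    Process a list of local file paths and/or URLs through the shared RAG system.

    Each path is assigned a generated UUID and handed to process_uploaded_file;
    the resulting title and text content are collected in `pages`. Paths that
    fail or do not exist are reported under `errors` in the returned summary.
    """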
    try:
        # Ensure the RAG system has been initialized in application state
        rag_system = getattr(request.app.state, "rag_system", None)
        if rag_system is None:
            logger.error("RAG system is not initialized in app state")
            raise HTTPException(status_code=500, detail="RAG system not initialized in app state")

        processed_files = []
        pages = []

        # Process each file path or URL
        for path in filepaths:
            if is_url(path):
                logger.info(f"Processing URL: {path}")
                try:
                    # Generate a unique UUID for the document
                    doc_id = str(uuid.uuid4())
                    # Process the URL
                    document = await process_uploaded_file(id=doc_id, file_path=path, rag_system=rag_system)
                    # Append the document details to pages
                    pages.append({
                        "metadata": {"title": document.title},
                        "page_content": document.text_content,
                    })
                    logger.info(f"Successfully processed URL: {path} with ID: {doc_id}")
                    # Log the ID and a 100-character snippet of the document
                    snippet = document.text_content[:100].replace('\n', ' ').replace('\r', ' ')
                    doc_logger.info(f"ID: {doc_id}_{document.title}, Snippet: {snippet}")
                except Exception as e:
                    logger.error(f"Error processing URL {path}: {str(e)}")
                    processed_files.append({"path": path, "status": "error", "message": str(e)})
            else:
                logger.info(f"Processing local file: {path}")
                if os.path.exists(path):
                    try:
                        # Generate a unique UUID for the document
                        doc_id = str(uuid.uuid4())
                        # Process the local file
                        document = await process_uploaded_file(id=doc_id, file_path=path, rag_system=rag_system)
                        # Append the document details to pages
                        pages.append({
                            "metadata": {"title": document.title},
                            "page_content": document.text_content,
                        })
                        logger.info(f"Successfully processed file: {path} with ID: {doc_id}")
                        # Log the ID and a 100-character snippet of the document
                        snippet = document.text_content[:100].replace('\n', ' ').replace('\r', ' ')
                        doc_logger.info(f"ID: {doc_id}, Snippet: {snippet}")
                    except Exception as e:
                        logger.error(f"Error processing file {path}: {str(e)}")
                        processed_files.append({"path": path, "status": "error", "message": str(e)})
                else:
                    logger.error(f"File path does not exist: {path}")
                    processed_files.append({"path": path, "status": "not found"})

        # Get total tokens from the RAG system, if it reports them
        total_tokens = rag_system.get_total_tokens() if hasattr(rag_system, "get_total_tokens") else 0

        return {
            "message": "File processing completed",
            "total_tokens": total_tokens,
            "document_count": len(filepaths),
            "pages": pages,
            "errors": processed_files,  # Details about files that couldn't be processed
        }
    except HTTPException:
        # Re-raise HTTP errors unchanged so their status code and detail are preserved
        raise
    except Exception as e:
        logger.exception("Unexpected error during file processing")
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")

async def load_file_with_markdown_function(filepaths: List[str], rag_system: Any):
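    """
    Same processing logic as load_file_with_markdown, but the RAG system is
    passed in directly instead of being read from request.app.state (useful,
    for example, when no Request object is available).
    """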
    try:
        # Ensure a RAG system instance was provided
        if rag_system is None:
            logger.error("RAG system is not initialized")
            raise HTTPException(status_code=500, detail="RAG system not initialized")

        processed_files = []
        pages = []

        # Process each file path or URL
        for path in filepaths:
            if is_url(path):
                logger.info(f"Processing URL: {path}")
                try:
                    # Generate a unique UUID for the document
                    doc_id = str(uuid.uuid4())
                    # Process the URL
                    document = await process_uploaded_file(id=doc_id, file_path=path, rag_system=rag_system)
                    # Append the document details to pages
                    pages.append({
                        "metadata": {"title": document.title},
                        "page_content": document.text_content,
                    })
                    logger.info(f"Successfully processed URL: {path} with ID: {doc_id}")
                    # Log the ID and a 100-character snippet of the document
                    snippet = document.text_content[:100].replace('\n', ' ').replace('\r', ' ')
                    doc_logger.info(f"ID: {doc_id}, Snippet: {snippet}")
                    logger.info(f"ID: {doc_id}, Snippet: {snippet}")
                except Exception as e:
                    logger.error(f"Error processing URL {path}: {str(e)}")
                    processed_files.append({"path": path, "status": "error", "message": str(e)})
            else:
                logger.info(f"Processing local file: {path}")
                if os.path.exists(path):
                    try:
                        # Generate a unique UUID for the document
                        doc_id = str(uuid.uuid4())
                        # Process the local file
                        document = await process_uploaded_file(id=doc_id, file_path=path, rag_system=rag_system)
                        # Append the document details to pages
                        pages.append({
                            "metadata": {"title": document.title},
                            "page_content": document.text_content,
                        })
                        logger.info(f"Successfully processed file: {path} with ID: {doc_id}")
                        # Log the ID and a 100-character snippet of the document
                        snippet = document.text_content[:100].replace('\n', ' ').replace('\r', ' ')
                        doc_logger.info(f"ID: {doc_id}, Snippet: {snippet}")
                    except Exception as e:
                        logger.error(f"Error processing file {path}: {str(e)}")
                        processed_files.append({"path": path, "status": "error", "message": str(e)})
                else:
                    logger.error(f"File path does not exist: {path}")
                    processed_files.append({"path": path, "status": "not found"})

        # Get total tokens from the RAG system, if it reports them
        total_tokens = rag_system.get_total_tokens() if hasattr(rag_system, "get_total_tokens") else 0

        return {
            "message": "File processing completed",
            "total_tokens": total_tokens,
            "document_count": len(filepaths),
            "pages": pages,
            "errors": processed_files,  # Details about files that couldn't be processed
        }
    except HTTPException:
        # Re-raise HTTP errors unchanged so their status code and detail are preserved
        raise
    except Exception as e:
        logger.exception("Unexpected error during file processing")
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")

async def document_exists(request: Request, doc_id: str):
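    """
    Check whether a document with the given ID is present in the RAG system.
    """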
    try:
        rag_system = request.app.state.rag_system
    except AttributeError:
        logger.error("RAG system is not initialized in app state")
        raise HTTPException(status_code=500, detail="RAG system not initialized in app state")

    exists = doc_id in rag_system.doc_ids
    return {"document_id": doc_id, "exists": exists}

async def delete_document(request: Request, doc_id: str):
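    """
    Delete the document with the given ID from the RAG system.
    """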
    try:
        rag_system = request.app.state.rag_system
    except AttributeError:
        logger.error("RAG system is not initialized in app state")
        raise HTTPException(status_code=500, detail="RAG system not initialized in app state")

    try:
        rag_system.delete_document(doc_id)
        logger.info(f"Deleted document with ID: {doc_id}")
        return {"message": f"Document with ID {doc_id} has been deleted."}
    except Exception as e:
        logger.error(f"Error deleting document with ID {doc_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to delete document: {str(e)}")