from setup import *
import tempfile
import requests
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
# from langchain_text_splitters import RecursiveCharacterTextSplitter
from urllib.parse import urlparse
from langchain.docstore.document import Document
def extract_urls(agentstate_result):
    """Extracts the URLs and their associated content from the agent state's link list."""
    urls = []
    content = []
    for item in agentstate_result['link_list']:
        urls.append(item['url'])
        content.append(item['content'])
    return urls, content
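
# Illustrative usage (the 'link_list' shape below is an assumption inferred from
# the loop above, not a documented schema):
#
#   agentstate_result = {
#       "link_list": [
#           {"url": "https://example.com/report.pdf", "content": "short summary"},
#           {"url": "https://example.com/article", "content": "page text"},
#       ]
#   }
#   urls, contents = extract_urls(agentstate_result)
#   # urls     -> ["https://example.com/report.pdf", "https://example.com/article"]
#   # contents -> ["short summary", "page text"]
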
# Function to classify URL based on file extension
def classify_url_by_extension(url):
    """
    Classifies a URL based on its file extension.
    Focuses only on pdf and html, classifying others as unknown.
    """
    if not isinstance(url, str):
        raise ValueError(f"Expected a string, but got {type(url)}")
    # Extract the file extension from the URL path
    try:
        file_extension = urlparse(url).path.split('.')[-1].lower()
        if file_extension == 'pdf':
            return 'pdf'
        elif file_extension in ['html', 'htm']:
            return 'html'
        else:
            return 'unknown'
    except Exception as e:
        print(f"Error while parsing URL: {url} - {e}")
        return 'unknown'
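
# Quick sanity check (illustrative URLs only):
#   classify_url_by_extension("https://example.com/doc.pdf")    -> 'pdf'
#   classify_url_by_extension("https://example.com/index.html") -> 'html'
#   classify_url_by_extension("https://example.com/blog/post")  -> 'unknown' (no extension)
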
# Function to classify based on HTTP Content-Type header (optional, for extra accuracy)
def classify_url_by_header(url):
    """
    Classifies a URL based on the HTTP Content-Type header.
    Focuses only on pdf and html, classifying others as unknown.
    """
    try:
        response = requests.head(url, timeout=5)  # Use a HEAD request to fetch only the headers
        content_type = response.headers.get('Content-Type', '').lower()
        if 'pdf' in content_type:
            return 'pdf'
        elif 'html' in content_type:
            return 'html'
        else:
            return 'unknown'
    except requests.RequestException as e:
        print(f"Error while making HEAD request: {url} - {e}")
        return 'unknown'
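
# Note: requests.head() does not follow redirects by default, so a PDF served
# behind a redirect may come back as 'unknown' here and fall through to the
# HTML bucket later. A possible tweak (a sketch, not wired into the code above):
#   response = requests.head(url, timeout=5, allow_redirects=True)
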
# Function to classify a list of URLs
def urls_classify_list(urls: list):
    """
    Classifies a list of URLs into pdf and html groups.
    Returns two separate lists: one for pdf URLs and one for html URLs.
    URLs that cannot be classified are treated as html so they are not dropped.
    """
    if not isinstance(urls, list):
        raise ValueError("Expected a list of URLs")
    pdf_urls = []
    html_urls = []
    # Classify each URL
    for url in urls:
        file_type = classify_url_by_extension(url)  # First, try classifying by extension
        if file_type == 'unknown':
            # If extension-based classification failed, fall back to HTTP header classification
            file_type = classify_url_by_header(url)
        if file_type == 'pdf':
            pdf_urls.append(url)
        elif file_type in ('html', 'unknown'):
            html_urls.append(url)
    return pdf_urls, html_urls
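
# Illustrative usage (hypothetical URLs):
#   pdf_urls, html_urls = urls_classify_list([
#       "https://example.com/paper.pdf",
#       "https://example.com/blog/post",
#   ])
#   # pdf_urls  -> ["https://example.com/paper.pdf"]
#   # html_urls -> ["https://example.com/blog/post"]  (unknown types fall back to html)
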
def clean_and_extract_html_data(html_urls, chunk_size=100, chunk_overlap=25):
    """
    Loads HTML content from URLs, cleans the data, and splits it into smaller chunks.
    Args:
        html_urls (list): List of HTML URLs to process.
        chunk_size (int): Maximum size of each chunk.
        chunk_overlap (int): Overlap between chunks.
    Returns:
        list: List of document chunks.
    """
    def clean_content(content):
        """
        Cleans the content by removing unwanted patterns and short lines.
        """
        cleaned_content = content.strip()  # Remove leading/trailing whitespace
        lines = cleaned_content.split('\n')  # Split by newlines
        meaningful_lines = [line.strip() for line in lines if len(line.strip()) > 3]  # Keep meaningful lines
        return '\n'.join(meaningful_lines)

    def split_document(doc_content, chunk_size, chunk_overlap):
        """
        Splits a document into smaller chunks with overlap.
        """
        chunks = []
        start = 0
        while start < len(doc_content):
            end = start + chunk_size
            chunk = doc_content[start:end]
            chunks.append(chunk)
            start = end - chunk_overlap if end < len(doc_content) else len(doc_content)
        return chunks

    # Step 1: Load documents from URLs
    docs = []
    for url in html_urls:
        try:
            loader = WebBaseLoader(url)
            data = loader.load()
            docs.extend(data)
        except Exception as e:
            print(f"Error loading URL {url}: {e}")

    # Step 2: Clean the content to remove unwanted data
    cleaned_docs = []
    for doc in docs:
        cleaned_content = clean_content(doc.page_content)
        if cleaned_content:  # Exclude empty documents
            doc.page_content = cleaned_content
            cleaned_docs.append(doc)

    # Step 3: Split the cleaned documents into chunks
    doc_splits = []
    for doc in cleaned_docs:
        chunks = split_document(doc.page_content, chunk_size, chunk_overlap)
        for chunk in chunks:
            doc_splits.append(Document(page_content=chunk, metadata=doc.metadata))
    return doc_splits
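
# Illustrative usage (a sketch; WebBaseLoader fetches pages over the network,
# so the exact output depends on the live HTML):
#   doc_splits = clean_and_extract_html_data(html_urls, chunk_size=100, chunk_overlap=25)
#   for split in doc_splits[:3]:
#       print(split.metadata.get("source"), len(split.page_content))
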
# def extract_pdf_from_url(url):
#     """
#     Extract text from a PDF available at a URL.
#     Args:
#         url (str): The URL of the PDF file.
#     Returns:
#         str: Extracted text from the PDF.
#     """
#     # Step 1: Download the PDF from the URL
#     response = requests.get(url)
#     if response.status_code == 200:
#         pdf_content = response.content
#     else:
#         raise ValueError(f"Failed to fetch the PDF. HTTP Status Code: {response.status_code}")
#     # Step 2: Save PDF content to a temporary file
#     with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
#         temp_pdf.write(pdf_content)
#         temp_pdf_path = temp_pdf.name  # Get the file path
#     # Step 3: Load the PDF using PyPDFLoader
#     loader = PyPDFLoader(temp_pdf_path)
#     documents = loader.load()
#     # Step 4: Extract text from all pages
#     extracted_text = "\n".join(doc.page_content for doc in documents)
#     return extracted_text

# def clean_and_split_pdf_text(pdf_text, chunk_size=100, chunk_overlap=25):
#     """
#     Cleans and splits the extracted PDF text into smaller chunks.
#     Args:
#         pdf_text (str): Extracted text from a PDF.
#         chunk_size (int): Maximum size of each chunk.
#         chunk_overlap (int): Overlap between chunks.
#     Returns:
#         list: List of document chunks.
#     """
#     def clean_content(content):
#         """
#         Cleans the text by removing unwanted patterns and short lines.
#         """
#         content = content.strip()  # Remove leading/trailing whitespace
#         lines = content.split('\n')  # Split into lines
#         meaningful_lines = [line.strip() for line in lines if len(line.strip()) > 3]  # Exclude short lines
#         return '\n'.join(meaningful_lines)
#     def split_text(content, chunk_size, chunk_overlap):
#         """
#         Splits cleaned text into smaller chunks with overlap.
#         """
#         chunks = []
#         start = 0
#         while start < len(content):
#             end = start + chunk_size
#             chunks.append(content[start:end])
#             start = end - chunk_overlap if end < len(content) else len(content)
#         return chunks
#     # Step 1: Clean the text
#     cleaned_text = clean_content(pdf_text)
#     # Step 2: Split the cleaned text
#     return split_text(cleaned_text, chunk_size, chunk_overlap)

# def pdf_extraction(pdf_urls, chunk_size=100, chunk_overlap=25):
#     """
#     Extracts and processes text from a list of PDF URLs.
#     Args:
#         pdf_urls (list): List of PDF URLs.
#         chunk_size (int): Maximum size of each chunk.
#         chunk_overlap (int): Overlap between chunks.
#     Returns:
#         list: List of Document objects containing split text.
#     """
#     all_chunks = []
#     for pdf_url in pdf_urls:
#         try:
#             # Extract text from the PDF
#             extracted_text = extract_pdf_from_url(pdf_url)
#             # Clean and split the text
#             chunks = clean_and_split_pdf_text(extracted_text, chunk_size, chunk_overlap)
#             # Convert chunks into Document objects
#             for chunk in chunks:
#                 all_chunks.append(Document(page_content=chunk, metadata={"source": pdf_url}))
#         except Exception as e:
#             print(f"Error processing PDF URL {pdf_url}: {e}")
#     return all_chunks
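
# If the PDF helpers above are re-enabled, the intended flow appears to be
# (a sketch based on the commented code, not an active code path):
#   pdf_urls, html_urls = urls_classify_list(urls)
#   pdf_chunks = pdf_extraction(pdf_urls, chunk_size=100, chunk_overlap=25)
#   html_chunks = clean_and_extract_html_data(html_urls, chunk_size=100, chunk_overlap=25)
#   all_chunks = pdf_chunks + html_chunks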