from setup import *
import tempfile  # only needed by the commented-out PDF helpers below
import requests
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader  # PyPDFLoader is only used by the commented-out PDF helpers
# from langchain_text_splitters import RecursiveCharacterTextSplitter
from urllib.parse import urlparse
from langchain.docstore.document import Document
def extract_urls(agentstate_result):
    """
    Extracts URLs and their associated content from an agent state result.
    Expects a 'link_list' entry whose items each contain 'url' and 'content' keys.
    Returns two parallel lists: urls and content.
    """
    urls = []
    content = []
    for item in agentstate_result['link_list']:
        urls.append(item['url'])
        content.append(item['content'])
    return urls, content
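# Illustrative sketch (not called anywhere): shows the input structure that
# extract_urls assumes. The sample dictionary and URLs are hypothetical.
def _example_extract_urls():
    sample_state = {
        "link_list": [
            {"url": "https://example.com/report.pdf", "content": "Quarterly report"},
            {"url": "https://example.com/index.html", "content": "Landing page"},
        ]
    }
    urls, contents = extract_urls(sample_state)
    # urls     -> ["https://example.com/report.pdf", "https://example.com/index.html"]
    # contents -> ["Quarterly report", "Landing page"]
    return urls, contents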
# Function to classify URL based on file extension
def classify_url_by_extension(url):
"""
Classifies a URL based on its file extension.
Focuses only on pdf and html, classifying others as unknown.
"""
if not isinstance(url, str):
raise ValueError(f"Expected a string, but got {type(url)}")
# Extract the file extension from the URL
try:
file_extension = urlparse(url).path.split('.')[-1].lower()
if file_extension == 'pdf':
return 'pdf'
elif file_extension in ['html', 'htm']:
return 'html'
else:
return 'unknown'
except Exception as e:
print(f"Error while parsing URL: {url} - {e}")
return 'unknown'
# Function to classify based on HTTP Content-Type header (optional, for extra accuracy)
def classify_url_by_header(url):
"""
Classifies a URL based on the HTTP Content-Type header.
Focuses only on pdf and html, classifying others as unknown.
"""
try:
response = requests.head(url, timeout=5) # Use HEAD request to fetch headers
content_type = response.headers.get('Content-Type', '').lower()
if 'pdf' in content_type:
return 'pdf'
elif 'html' in content_type:
return 'html'
else:
return 'unknown'
except requests.RequestException as e:
print(f"Error while making HEAD request: {url} - {e}")
return 'unknown'
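# Illustrative sketch (not called anywhere): the two single-URL classifiers used
# directly. The URLs are hypothetical; classify_url_by_header issues a real HEAD
# request, so run this manually if at all.
def _example_classify_single_url():
    print(classify_url_by_extension("https://example.com/manual.pdf"))  # 'pdf'
    print(classify_url_by_extension("https://example.com/index.htm"))   # 'html'
    print(classify_url_by_header("https://example.com/download"))       # depends on the Content-Type header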
# Function to classify a list of URLs
def urls_classify_list(urls: list):
    """
    Classifies a list of URLs into pdf and html groups.
    Extension-based classification is tried first; if it returns 'unknown',
    the HTTP Content-Type header is checked. URLs that are still unknown are
    routed to the html list so they are not silently dropped.
    Returns two lists: pdf_urls and html_urls.
    """
    if not isinstance(urls, list):
        raise ValueError("Expected a list of URLs")
    pdf_urls = []
    html_urls = []
    # Classify each URL
    for url in urls:
        file_type = classify_url_by_extension(url)  # First, try classifying by extension
        if file_type == 'unknown':
            # If extension-based classification failed, fall back to HTTP header classification
            file_type = classify_url_by_header(url)
        if file_type == 'pdf':
            pdf_urls.append(url)
        elif file_type in ('html', 'unknown'):
            html_urls.append(url)
    return pdf_urls, html_urls
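# Illustrative sketch (not called anywhere): how the classifiers are meant to be
# combined. The URLs are hypothetical and the fallback check needs network access.
def _example_classify_urls():
    sample_urls = [
        "https://example.com/paper.pdf",
        "https://example.com/article.html",
        "https://example.com/download",  # no extension; falls back to the header check
    ]
    pdf_urls, html_urls = urls_classify_list(sample_urls)
    # pdf_urls  -> URLs identified as PDF
    # html_urls -> URLs identified as HTML, plus anything still unknown
    return pdf_urls, html_urls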
def clean_and_extract_html_data(html_urls, chunk_size=100, chunk_overlap=25):
"""
Loads HTML content from URLs, cleans the data, and splits it into smaller chunks.
Args:
html_urls (list): List of HTML URLs to process.
chunk_size (int): Maximum size of each chunk.
chunk_overlap (int): Overlap between chunks.
Returns:
list: List of document chunks.
"""
def clean_content(content):
"""
Cleans the content by removing unwanted patterns and short lines.
"""
cleaned_content = content.strip() # Remove leading/trailing whitespace
lines = cleaned_content.split('\n') # Split by newlines
meaningful_lines = [line.strip() for line in lines if len(line.strip()) > 3] # Keep meaningful lines
return '\n'.join(meaningful_lines)
def split_document(doc_content, chunk_size, chunk_overlap):
"""
Splits a document into smaller chunks with overlap.
"""
chunks = []
start = 0
while start < len(doc_content):
end = start + chunk_size
chunk = doc_content[start:end]
chunks.append(chunk)
start = end - chunk_overlap if end < len(doc_content) else len(doc_content)
return chunks
# Step 1: Load documents from URLs
docs = []
for url in html_urls:
try:
loader = WebBaseLoader(url)
data = loader.load()
docs.extend(data)
except Exception as e:
print(f"Error loading URL {url}: {e}")
# Step 2: Clean the content to remove unwanted data
cleaned_docs = []
for doc in docs:
cleaned_content = clean_content(doc.page_content)
if cleaned_content: # Exclude empty documents
doc.page_content = cleaned_content
cleaned_docs.append(doc)
# Step 3: Split the cleaned documents into chunks
doc_splits = []
for doc in cleaned_docs:
chunks = split_document(doc.page_content, chunk_size, chunk_overlap)
for chunk in chunks:
doc_splits.append(Document(page_content=chunk, metadata=doc.metadata))
return doc_splits
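# Illustrative sketch (not called anywhere): end-to-end use of the HTML pipeline
# above. The URL is hypothetical and loading it requires network access.
def _example_clean_and_extract_html():
    html_urls = ["https://example.com/article.html"]
    doc_splits = clean_and_extract_html_data(html_urls, chunk_size=100, chunk_overlap=25)
    for split in doc_splits[:3]:
        print(split.metadata.get("source"), len(split.page_content))
    return doc_splits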
# def extract_pdf_from_url(url):
# """
# Extract text from a PDF available at a URL.
# Args:
# url (str): The URL of the PDF file.
# Returns:
# str: Extracted text from the PDF.
# """
# # Step 1: Download the PDF from the URL
# response = requests.get(url)
# if response.status_code == 200:
# pdf_content = response.content
# else:
# raise ValueError(f"Failed to fetch the PDF. HTTP Status Code: {response.status_code}")
# # Step 2: Save PDF content to a temporary file
# with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
# temp_pdf.write(pdf_content)
# temp_pdf_path = temp_pdf.name # Get the file path
# # Step 3: Load the PDF using PyPDFLoader
# loader = PyPDFLoader(temp_pdf_path)
# documents = loader.load()
# # Step 4: Extract text from all pages
# extracted_text = "\n".join(doc.page_content for doc in documents)
# return extracted_text
# def clean_and_split_pdf_text(pdf_text, chunk_size=100, chunk_overlap=25):
# """
# Cleans and splits the extracted PDF text into smaller chunks.
# Args:
# pdf_text (str): Extracted text from a PDF.
# chunk_size (int): Maximum size of each chunk.
# chunk_overlap (int): Overlap between chunks.
# Returns:
# list: List of document chunks.
# """
# def clean_content(content):
# """
# Cleans the text by removing unwanted patterns and short lines.
# """
# content = content.strip() # Remove leading/trailing whitespace
# lines = content.split('\n') # Split into lines
# meaningful_lines = [line.strip() for line in lines if len(line.strip()) > 3] # Exclude short lines
# return '\n'.join(meaningful_lines)
# def split_text(content, chunk_size, chunk_overlap):
# """
# Splits cleaned text into smaller chunks with overlap.
# """
# chunks = []
# start = 0
# while start < len(content):
# end = start + chunk_size
# chunks.append(content[start:end])
# start = end - chunk_overlap if end < len(content) else len(content)
# return chunks
# # Step 1: Clean the text
# cleaned_text = clean_content(pdf_text)
# # Step 2: Split the cleaned text
# return split_text(cleaned_text, chunk_size, chunk_overlap)
# def pdf_extraction(pdf_urls, chunk_size=100, chunk_overlap=25):
# """
# Extracts and processes text from a list of PDF URLs.
# Args:
# pdf_urls (list): List of PDF URLs.
# chunk_size (int): Maximum size of each chunk.
# chunk_overlap (int): Overlap between chunks.
# Returns:
# list: List of Document objects containing split text.
# """
# all_chunks = []
# for pdf_url in pdf_urls:
# try:
# # Extract text from the PDF
# extracted_text = extract_pdf_from_url(pdf_url)
# # Clean and split the text
# chunks = clean_and_split_pdf_text(extracted_text, chunk_size, chunk_overlap)
# # Convert chunks into Document objects
# for chunk in chunks:
# all_chunks.append(Document(page_content=chunk, metadata={"source": pdf_url}))
# except Exception as e:
# print(f"Error processing PDF URL {pdf_url}: {e}")
# return all_chunks
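# Illustrative sketch (not called anywhere): how the active pieces fit together,
# from an agent state result to chunked HTML documents. The input structure is
# assumed to match extract_urls above, and fetching the URLs needs network access.
def _example_pipeline(agentstate_result):
    urls, _content = extract_urls(agentstate_result)
    pdf_urls, html_urls = urls_classify_list(urls)
    # PDF handling is currently commented out above, so only the HTML path runs here.
    return clean_and_extract_html_data(html_urls, chunk_size=100, chunk_overlap=25)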