# search_agent/web_crawler.py
import io
import os
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote

import pdfplumber
import requests
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.faiss import FAISS
from langchain_core.documents.base import Document
from langsmith import traceable
from selenium.common.exceptions import TimeoutException
from trafilatura import extract


@traceable(run_type="tool", name="get_sources")
def get_sources(query, max_pages=10, domain=None):
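    """Query the Brave Search API and return a list of result dicts.

    Each dict carries 'title', 'link', 'snippet' (cleaned with trafilatura)
    and 'favicon'. An optional domain narrows the query with a site: filter.
    Returns an empty list on a non-200 response; other failures are re-raised.
    """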
search_query = query
if domain:
search_query += f" site:{domain}"
url = f"https://api.search.brave.com/res/v1/web/search?q={quote(search_query)}&count={max_pages}"
headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip',
'X-Subscription-Token': os.getenv("BRAVE_SEARCH_API_KEY")
}
try:
response = requests.get(url, headers=headers, timeout=30)
if response.status_code != 200:
return []
json_response = response.json()
if 'web' not in json_response or 'results' not in json_response['web']:
print(response.text)
            raise ValueError('Invalid API response format')
final_results = [{
'title': result['title'],
'link': result['url'],
'snippet': extract(result['description'], output_format='txt', include_tables=False, include_images=False, include_formatting=True),
'favicon': result.get('profile', {}).get('img', '')
} for result in json_response['web']['results']]
return final_results
except Exception as error:
print('Error fetching search results:', error)
raise


def fetch_with_selenium(url, driver, timeout=8):
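    """Render a page with Selenium, scrolling to the bottom to trigger lazy
    loading, and return the page source. Returns None on a load timeout.
    The driver is always quit before returning."""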
try:
driver.set_page_load_timeout(timeout)
driver.get(url)
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
html = driver.page_source
except TimeoutException:
print(f"Page load timed out after {timeout} seconds.")
html = None
finally:
driver.quit()
return html


def fetch_with_timeout(url, timeout=8):
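    """GET a URL with a timeout; return the Response, or None on any request error."""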
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
return response
    except requests.RequestException:
return None


def process_source(source):
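    """Fetch one search result and attach its extracted text as 'page_content'.

    PDFs are parsed with pdfplumber, HTML with trafilatura. Unsupported or
    missing content types fall back to the search snippet; a failed fetch
    leaves page_content as None so the caller can retry (e.g. with Selenium).
    """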
url = source['link']
response = fetch_with_timeout(url, 2)
if response:
content_type = response.headers.get('Content-Type')
if content_type:
if content_type.startswith('application/pdf'):
# The response is a PDF file
pdf_content = response.content
# Create a file-like object from the bytes
pdf_file = io.BytesIO(pdf_content)
# Extract text from PDF using pdfplumber
with pdfplumber.open(pdf_file) as pdf:
text = ""
for page in pdf.pages:
                        text += page.extract_text() or ""  # extract_text() may return None for image-only pages
return {**source, 'page_content': text}
elif content_type.startswith('text/html'):
# The response is an HTML file
html = response.text
main_content = extract(html, output_format='txt', include_links=True)
return {**source, 'page_content': main_content}
else:
print(f"Skipping {url}! Unsupported content type: {content_type}")
return {**source, 'page_content': source['snippet']}
else:
print(f"Skipping {url}! No content type")
return {**source, 'page_content': source['snippet']}
return {**source, 'page_content': None}


@traceable(run_type="tool", name="get_links_contents")
def get_links_contents(sources, get_driver_func=None, use_selenium=False):
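    """Fetch all sources concurrently via process_source.

    Without Selenium, only results with non-empty page content are returned.
    When use_selenium is set and a driver factory is given, failed fetches
    are retried in a browser and all results are returned.
    """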
with ThreadPoolExecutor() as executor:
results = list(executor.map(process_source, sources))
if get_driver_func is None or not use_selenium:
return [result for result in results if result is not None and result['page_content']]
for result in results:
if result['page_content'] is None:
url = result['link']
print(f"Fetching with selenium {url}")
driver = get_driver_func()
html = fetch_with_selenium(url, driver)
            main_content = extract(html, output_format='txt', include_links=True) if html else None
if main_content:
result['page_content'] = main_content
return results


@traceable(run_type="embedding")
def vectorize(contents, embedding_model):
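    """Chunk the fetched pages and index them in a FAISS vector store.

    Entries without page content are skipped. Documents are embedded in
    batches of 250 to stay under typical embedding-API batch limits.
    Returns the FAISS store, or None if there was nothing to index.
    """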
documents = []
for content in contents:
try:
page_content = content['page_content']
if page_content:
metadata = {'title': content['title'], 'source': content['link']}
doc = Document(page_content=content['page_content'], metadata=metadata)
documents.append(doc)
except Exception as e:
print(f"Error processing content for {content['link']}: {e}")
# Initialize recursive text splitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
# Split documents
split_documents = text_splitter.split_documents(documents)
# Create vector store
vector_store = None
batch_size = 250 # Slightly less than 256 to be safe
for i in range(0, len(split_documents), batch_size):
batch = split_documents[i:i+batch_size]
if vector_store is None:
vector_store = FAISS.from_documents(batch, embedding_model)
else:
texts = [doc.page_content for doc in batch]
metadatas = [doc.metadata for doc in batch]
embeddings = embedding_model.embed_documents(texts)
vector_store.add_embeddings(
list(zip(texts, embeddings)),
metadatas
)
return vector_store
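

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: chain the helpers
    # above into search -> fetch -> vectorize -> similarity search.
    # Assumptions: BRAVE_SEARCH_API_KEY and OPENAI_API_KEY are set, and the
    # langchain_openai package is installed; swap in whatever LangChain
    # embeddings implementation you actually use.
    from langchain_openai import OpenAIEmbeddings  # assumed dependency, not imported above

    query = "retrieval augmented generation"
    sources = get_sources(query, max_pages=5)
    contents = get_links_contents(sources)
    vector_store = vectorize(contents, OpenAIEmbeddings())
    if vector_store:
        for doc in vector_store.similarity_search(query, k=3):
            print(doc.metadata.get('source'), doc.page_content[:200])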