import os
import time
import asyncio
import unicodedata

import httpx
import requests
import html2text
import fitz  # PyMuPDF
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from googleapiclient.discovery import build

load_dotenv()

API_KEY = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_KEY = os.environ.get("GOOGLE_SEARCH_CSE_ID")

# Number of pages to scrape
NUM_PAGES = 10

# Load html2text and set up configs
h2t = html2text.HTML2Text()
h2t.body_width = 0  # No wrapping
h2t.ignore_links = True  # Ignore hyperlinks
h2t.ignore_images = True  # Ignore images
h2t.ignore_emphasis = True  # Ignore emphasis
h2t.ignore_tables = False  # Include tables
h2t.skip_internal_links = True  # Skip internal links
h2t.skip_external_links = True  # Skip external links
h2t.single_line_break = True  # Use single line breaks
h2t.protect_links = True  # Protect links from being split
h2t.default_image_alt = "[image]"  # Default alt text for images


def clean_html(text):
    text = h2t.handle(text)
    # Remove non-ASCII characters
    text = unicodedata.normalize("NFKD", text).encode("ASCII", "ignore").decode("ASCII")
    return text


def build_results_beautifulsoup(url_list):
    print("Starting to scrape URLs...")
    start_time = time.perf_counter()

    # Scrape all URLs in the list concurrently
    soups = asyncio.run(parallel_scrap(url_list))

    scraping_time = time.perf_counter() - start_time
    print(f"Scraping processing time: {scraping_time:.2f} seconds")

    result_content = {}
    count = 0

    print("Starting to process each URL...")
    for url, soup in zip(url_list, soups):
        if count >= NUM_PAGES:
            print(f"Reached the limit of {NUM_PAGES} pages. Stopping processing.")
            break

        if soup:
            print(f"Processing URL: {url}")
            text = clean_html(soup.text)
            if len(text) > 500:
                print(f"Adding content from URL: {url}, content length: {len(text)}")
                result_content[url] = text
                count += 1
            else:
                print(f"Skipped URL: {url}, content too short (length: {len(text)})")
        else:
            print(f"Skipped URL: {url}, no soup content available.")

    print("Finished processing URLs.")
    return result_content


def build_results_extractor(url_list):
    try:
        endpoint = "https://extractorapi.com/api/v1/extractor"
        result_content = {}
        count = 0
        for url in url_list:
            if count >= NUM_PAGES:
                break
            params = {"apikey": os.environ.get("EXTRACTOR_API_KEY"), "url": url}
            r = requests.get(endpoint, params=params)
            if r.status_code == 200:
                text = r.json()["text"]
                if len(text) > 500:
                    result_content[url] = text
                    count += 1
            if r.status_code == 403:
                raise Exception("Error with API; using default implementation instead")
        return result_content
    except Exception as e:
        print(e)
        return build_results_beautifulsoup(url_list)


months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}

domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]


def build_date(year=2024, month="March", day=1):
    # Zero-pad the day so the result is always in YYYYMMDD form
    return f"{year}{months[month]}{day:02d}"


async def get_url_data(url, client):
    try:
        r = await client.get(url)
        if r.status_code == 200:
            content_type = r.headers.get("Content-Type", "").lower()
            # Detect PDFs and extract their text instead of parsing HTML
            if "application/pdf" in content_type or url.lower().endswith(".pdf"):
                pdf_content = await extract_pdf_text(r.content)
                return BeautifulSoup(pdf_content, "html.parser")
            else:
                return BeautifulSoup(r.content, "html.parser")
    except Exception:
        return None


async def extract_pdf_text(content):
    try:
        with fitz.open(stream=content, filetype="pdf") as doc:
            text = ""
            for page in doc:
                text += page.get_text()
        return f"<div>{text}</div>"  # Wrap in a div to make it valid HTML
    except Exception as e:
        print(f"Error extracting PDF text: {str(e)}")
        return "<div>Error extracting PDF text</div>"


async def parallel_scrap(urls):
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = []
        for url in urls:
            tasks.append(get_url_data(url=url, client=client))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results


def scrap(urls):
    # Synchronous entry point: get_url_data is a coroutine, so delegate to the
    # async scraper and run it to completion instead of collecting un-awaited
    # coroutine objects.
    return asyncio.run(parallel_scrap(urls))


def google_search_urls(
    text,
    sorted_date,
    domains_to_include,
    api_key,
    cse_id,
    **kwargs,
):
    service = build("customsearch", "v1", developerKey=api_key)
    results = service.cse().list(q=text, cx=cse_id, sort=sorted_date, **kwargs).execute()
    url_list = []
    if "items" in results and len(results["items"]) > 0:
        for count, link in enumerate(results["items"]):
            # Keep only URLs from the user-selected domains (no filter means keep all)
            if domains_to_include and not any(
                ("." + domain) in link["link"] for domain in domains_to_include
            ):
                continue
            url = link["link"]
            if url not in url_list:
                url_list.append(url)
    return url_list


def google_search(
    topic,
    sorted_date,
    domains_to_include,
):
    api_key = os.environ.get("GOOGLE_SEARCH_API_KEY")
    cse_id = os.environ.get("GOOGLE_SEARCH_CSE_ID")
    start_time = time.perf_counter()
    url_list = google_search_urls(
        topic,
        sorted_date,
        domains_to_include,
        api_key,
        cse_id,
    )
    print("Google Search processing time: ", time.perf_counter() - start_time)
    result_content = build_results_beautifulsoup(url_list)
    return result_content