import copy
import json
import logging
import os
import time
import traceback
import urllib.parse as en
import warnings
from concurrent.futures import ThreadPoolExecutor
from itertools import zip_longest

import requests
from zenrows import ZenRowsClient

from llmsearch import utilityV2 as ut

logger = logging.getLogger("agent_logger")
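
# Configuration assumed by this module (inferred from how names are used below):
#   - a ZenRows API key in the `zenrows_api_key` environment variable
#   - Google Custom Search credentials in `ut.google_key` and `ut.google_cx`
#   - a site allow/deny map in `ut.sites`, plus the helpers `ut.extract_domain`,
#     `ut.extract_site`, and `ut.reform` from llmsearch.utilityV2
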
def search(msg, query_phrase):
    """Search Google for both phrasings of the query and print each retrieved page."""
    try:
        google_text = []
        try:
            print(f"asking google {msg}; rephrased: {query_phrase}")
            google_text = search_google(msg, query_phrase)
        except Exception:
            traceback.print_exc()

        print("\n\nFinal response: ")

        for item in google_text:
            print(
                f"\n##############################################################################################\nSource: {item['source']}"
            )
            print(f"{item['text']}")
            print(f"URL: {item['url']}")
        return google_text
    except KeyboardInterrupt:
        traceback.print_exc()
        raise
    except Exception:
        traceback.print_exc()
        return []


def process_url(url):
    """Fetch a single url through ZenRows; return a page dict, or {} on failure."""
    processed_page = {}
    start_time = time.time()
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            try:
                client = ZenRowsClient(os.getenv("zenrows_api_key"))
                response = client.get(url)
                print(f"got response, status: {response.status_code}")
                result = response.text
                # Discard empty bodies and common block / error pages.
                error_markers = (
                    "an error has occurred",
                    "permission to view this page",
                    "403 error",
                    "have been blocked",
                    "too many requests",
                )
                if len(result) > 0 and not any(marker in result.lower() for marker in error_markers):
                    processed_page = {
                        "source": ut.extract_domain(url),
                        "url": url,
                        "text": result,
                    }
                    print(f"Processed {url}: {len(result)} chars, {int((time.time() - start_time) * 1000)} ms")
                return processed_page
            except Exception:
                traceback.print_exc()
                return processed_page
    except Exception:
        traceback.print_exc()
        return processed_page


def process_urls(urls):
    """Fetch every url in parallel and return the pages that processed successfully."""
    print(f"entering process urls: {len(urls)} found. {urls}")
    start_time = time.time()
    results = []

    try:
        # ThreadPoolExecutor requires max_workers >= 1, so guard against an empty url list.
        with ThreadPoolExecutor(max_workers=max(1, len(urls))) as pool:
            for result in pool.map(process_url, urls):
                if result:  # skip urls that failed or were filtered out
                    results.append(result)
    except Exception:
        traceback.print_exc()

    print(
        f"\n*****processed all urls {len(results)} {int(time.time() - start_time)} secs"
    )
    return results


def extract_subtext(text):
    return ut.reform(text)


def request_google(query_phrase):
    """Query the Google Custom Search API and return a filtered list of result urls."""
    print(f"***** search {query_phrase}")
    # Prefer recent results; bias more strongly toward recency for time-sensitive queries.
    sort = "&sort=date-sdate:d:w"
    if "today" in query_phrase or "latest" in query_phrase:
        sort = "&sort=date-sdate:d:s"
    print(f"search for: {query_phrase}")
    google_query = en.quote(query_phrase)
    try:
        start_wall_time = time.time()
        url = (
            "https://www.googleapis.com/customsearch/v1?key="
            + ut.google_key
            + "&cx="
            + ut.google_cx
            + "&num=4"
            + sort
            + "&q="
            + google_query
        )
        response = requests.get(url, timeout=20)  # avoid hanging indefinitely on a slow response
        response_json = json.loads(response.text)
        print(f"***** google search {int((time.time() - start_wall_time) * 10) / 10} sec")
    except Exception:
        traceback.print_exc()
        return []

    # Only the "items" / "link" fields of the response are used below.
    if "items" not in response_json.keys():
        print("no return from google ...", response, response_json.keys())
        return []

    urls = []
    for i in range(len(response_json["items"])):
        url = response_json["items"][i]["link"].strip()
        site = ut.extract_site(url)
        if site not in ut.sites or ut.sites[site] == 1:
            # Skip domains that rarely yield useful page text.
            if "reddit" not in url and "youtube" not in url and "facebook" not in url:
                urls.append(url)
    return urls


def search_google(original_query, query_phrase):
    """Search Google with both the original query and the rephrased one, then fetch the pages."""
    full_text = []

    try:
        orig_phrase_urls = []
        if len(original_query) > 0:
            orig_phrase_urls = request_google(original_query[: min(len(original_query), 128)])
        gpt_phrase_urls = []
        if len(query_phrase) > 0:
            gpt_phrase_urls = request_google(query_phrase)
        if len(orig_phrase_urls) == 0 and len(gpt_phrase_urls) == 0:
            return []

        # Drop urls found by both queries so each page is fetched only once.
        for url in orig_phrase_urls:
            if url in gpt_phrase_urls:
                gpt_phrase_urls.remove(url)

        # Interleave the two result lists so both phrasings contribute to the top urls.
        urls = [
            val
            for tup in zip_longest(orig_phrase_urls, gpt_phrase_urls)
            for val in tup
            if val is not None
        ]
        all_urls = copy.deepcopy(urls)
        start_wall_time = time.time()
        full_text = process_urls(all_urls)
        print(f"***** urls_processed {int((time.time() - start_wall_time) * 10) / 10} sec")
    except Exception:
        traceback.print_exc()
    return full_text
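

# Minimal usage sketch, assuming the configuration noted near the top of the module is
# in place; both query strings below are illustrative values, not part of the original code.
if __name__ == "__main__":
    pages = search(
        "what is the latest python release",  # original user message (hypothetical)
        "latest Python release version",  # rephrased query phrase (hypothetical)
    )
    print(f"retrieved {len(pages)} pages")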