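"""Internet research helpers: generate search keywords with an LLM, query
DuckDuckGo, download and clean the resulting pages, and store the text as
JSONL note blocks for a given topic."""
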
import requests
from bs4 import BeautifulSoup
import html2text
import re
import os
from modules import app_constants, file_utils, app_logger
import json
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
import spacy
from duckduckgo_search import DDGS

nlp = spacy.load("en_core_web_sm")

app_logger = app_logger.app_logger

# os.path.join avoids depending on WORKSPACE_DIRECTORY ending with a separator.
TMP_DIRECTORY = os.path.join(app_constants.WORKSPACE_DIRECTORY, 'tmp')
DEFAULT_SEARCH_COUNT = app_constants.SEARCH_COUNT


def download_and_clean(url):
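    """Download a web page and return its visible text with markup, links and
    stray punctuation stripped; returns None when the request fails."""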
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:99.0) Gecko/20100101 Firefox/99.0"}
        # A timeout keeps one unresponsive site from stalling the whole crawl.
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()

        # Remove elements that contribute no useful body text.
        soup = BeautifulSoup(response.content, 'html.parser')
        for tag in soup(["script", "style", "img", "a"]):
            tag.extract()

        body_text = soup.get_text()
        h = html2text.HTML2Text()
        h.ignore_links = True
        h.ignore_images = True
        h.ignore_emphasis = True
        h.ignore_tables = True
        clean_text = h.handle(body_text)

        # Strip leftover punctuation and collapse runs of whitespace.
        clean_text = re.sub(r'[^\w\s\n<>/\.]+', '', clean_text)
        clean_text = re.sub(r'\s+', ' ', clean_text).strip()
        return clean_text

    except requests.exceptions.RequestException as e:
        app_logger.error(f"Error while downloading and cleaning URL {url}: {str(e)}")
        return None


def save_notes_to_file(topic, note, source_url):
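    """Split a note into sentence-aligned blocks of roughly 120-240 words and
    append them, with their source URL, to the topic's JSONL file in TMP_DIRECTORY."""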
    doc = nlp(note)

    if not os.path.exists(TMP_DIRECTORY):
        os.makedirs(TMP_DIRECTORY)

    sanitized_filename = file_utils.sanitize_filename(topic) + '.jsonl'
    full_path = os.path.join(TMP_DIRECTORY, sanitized_filename)

    text_block = ""
    word_count = 0

    with open(full_path, 'a') as file:
        for sent in doc.sents:
            sentence_word_count = len(sent.text.split())
            if word_count + sentence_word_count > 240:
                # Flush the current block once it is long enough to be useful.
                if word_count >= 120:
                    data = {
                        "note": text_block,
                        "source_url": source_url
                    }
                    file.write(json.dumps(data) + '\n')
                # Start a new block from the sentence that did not fit.
                text_block = sent.text
                word_count = sentence_word_count
            else:
                text_block += (' ' + sent.text) if text_block else sent.text
                word_count += sentence_word_count

        # Flush the final block. The original threshold of 300 words could never
        # be reached by a block capped at 240, so the last chunk was silently
        # dropped; 120 matches the minimum block size used above.
        if word_count >= 120:
            data = {
                "note": text_block,
                "source_url": source_url
            }
            file.write(json.dumps(data) + '\n')

    app_logger.info(f"Notes saved to file {full_path}")
    return full_path


def url_list_downloader(url_list, topic):
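    """Download every URL in url_list and append its cleaned text to the topic's
    notes file; returns the path of that file, or None if nothing was saved."""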
    notes_file = None
    for url in url_list:
        try:
            text = download_and_clean(url)
            if text:
                # Every URL for the topic appends to the same notes file.
                notes_file = save_notes_to_file(topic, text, url)
        except Exception as e:
            app_logger.error(f"Error during processing for URL {url}: {e}")
    return notes_file


def search_term_ddg(topic, count=DEFAULT_SEARCH_COUNT):
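    """Expand the topic into search keywords with the LLM, query DuckDuckGo for
    each keyword, and return a sorted, de-duplicated list of page URLs
    (direct document downloads excluded)."""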
    try:
        llm = ChatOpenAI(
            model_name=app_constants.MODEL_NAME,
            openai_api_key=app_constants.openai_api_key,
            base_url=app_constants.local_model_uri,
            streaming=True
        )
        prompt = [
            SystemMessage(content="Generate 5 plain keywords, comma separated, based on the user input. For example: cat, bat, monkey, donkey, eagle"),
            HumanMessage(content=topic),
        ]
        response = llm.invoke(prompt)

        if hasattr(response, 'content'):
            search_keywords = response.content
        else:
            raise ValueError("Invalid response format")

        search_keywords = [keyword.strip() for keyword in search_keywords.split(',')]

        # Cap the number of search terms in case the model returns more than asked for.
        search_keywords = search_keywords[:8]

        urls = []

        with DDGS(timeout=3) as ddgs:
            for term in search_keywords:
                results = ddgs.text(f"{topic} {term}", max_results=count)
                for result in results:
                    url = result['href']
                    # Skip document downloads; only HTML pages are cleaned later.
                    if not url.endswith(('.pdf', '.ppt', '.pptx', '.doc', '.docx')):
                        urls.append(url)
        return sorted(set(urls))

    except Exception as e:
        app_logger.error(f"An error occurred while searching for topic {topic}: {e}")
        return []


def explore_url_on_internet(topic, count=DEFAULT_SEARCH_COUNT):
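    """Top-level entry point: reuse an existing notes file for the topic if one
    exists, otherwise search the web, download the results, and build it."""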
    app_logger.info(f"Starting research on topic {topic}")

    sanitized_filename = file_utils.sanitize_filename(topic) + '.jsonl'
    full_path = os.path.join(TMP_DIRECTORY, sanitized_filename)

    if os.path.exists(full_path):
        app_logger.info(f"File already exists, skipping download: {full_path}")
        note_file = full_path
    else:
        url_list = search_term_ddg(topic, count)
        note_file = url_list_downloader(url_list, topic)
    app_logger.info(f"Research on Internet completed for {topic}, file: {note_file}")
    return note_file
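

# Illustrative usage sketch (not part of the original module): shows how the
# pipeline above is intended to be driven end to end. It assumes app_constants
# provides WORKSPACE_DIRECTORY, SEARCH_COUNT, MODEL_NAME, openai_api_key and
# local_model_uri, and that the configured model endpoint and network access
# are available at run time.
if __name__ == "__main__":
    sample_topic = "solar panel recycling"  # hypothetical example topic
    notes_path = explore_url_on_internet(sample_topic, count=DEFAULT_SEARCH_COUNT)
    if notes_path:
        # Each line of the JSONL file holds one note block and its source URL.
        with open(notes_path) as notes_file:
            for line in notes_file:
                record = json.loads(line)
                print(record["source_url"], "->", record["note"][:80])
    else:
        print(f"No notes could be collected for topic: {sample_topic}")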