from flask import Flask, jsonify, request
import requests
from bs4 import BeautifulSoup
import os
import re
import urllib.parse
import time
import random
import base64
from io import BytesIO
from googlesearch import search
import json

app = Flask(__name__)
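
# Runtime note (not part of the original file): the research flow below reads
# OPENROUTER_API_KEY from the environment, and the imports above rely on the
# third-party packages flask, requests, beautifulsoup4, and (judging by the
# num_results keyword used later) googlesearch-python.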


def search_images(query, num_images=5):
    # Headers to mimic a browser request
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
    }

    # Format the query for URL
    formatted_query = urllib.parse.quote(query + " high quality")

    # Google Images URL
    url = f"https://www.google.com/search?q={formatted_query}&tbm=isch&safe=active"

    try:
        # Get the HTML content
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        # Find all image URLs using regex
        image_urls = re.findall(r'https?://[^"\']*?(?:jpg|jpeg|png|gif)', response.text)

        # Remove duplicates while preserving order
        image_urls = list(dict.fromkeys(image_urls))

        # Filter and clean results
        results = []
        for img_url in image_urls:
            if len(results) >= num_images:
                break

            # Skip small thumbnails, icons, and low-quality images
            if ('gstatic.com' in img_url or
                    'google.com' in img_url or
                    'icon' in img_url.lower() or
                    'thumb' in img_url.lower() or
                    'small' in img_url.lower()):
                continue

            try:
                # Verify the image URL is valid
                img_response = requests.head(img_url, headers=headers, timeout=5)
                if img_response.status_code == 200:
                    content_type = img_response.headers.get('Content-Type', '')
                    if content_type.startswith('image/'):
                        results.append({
                            'url': img_url,
                            'content_type': content_type
                        })
            except Exception as e:
                print(f"Error checking image URL: {str(e)}")
                continue

            # Add a small delay between checks
            time.sleep(random.uniform(0.2, 0.5))

        return results
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []
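
# Quick local smoke test for search_images (an illustration, not in the original
# source); call the function directly from a Python shell, bypassing Flask:
#
#   >>> search_images("golden gate bridge at sunset", num_images=2)
#   [{'url': 'https://.../photo.jpg', 'content_type': 'image/jpeg'}, ...]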


def get_cover_image(query):
    """Get a high-quality cover image URL for a given query"""
    try:
        # Search for images
        images = search_images(query, num_images=3)  # Get top 3 images to choose from
        if not images:
            return None

        # Return the first valid image URL
        return images[0]['url']
    except Exception as e:
        print(f"Error getting cover image: {str(e)}")
        return None


# Route path assumed; the decorator is not present in the source file.
@app.route('/api/search-images', methods=['GET'])
def api_search_images():
    try:
        # Get query parameters
        query = request.args.get('query', '')
        num_images = int(request.args.get('num_images', 5))

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400
        if num_images < 1 or num_images > 20:
            return jsonify({'error': 'Number of images must be between 1 and 20'}), 400

        # Search for images
        results = search_images(query, num_images)

        return jsonify({
            'success': True,
            'query': query,
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500


def scrape_site_content(query, num_sites=5):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
    }

    results = []
    scraped = 0
    retries = 2  # Number of retries per URL
    timeout = 5  # Reduced timeout to 5 seconds

    try:
        # Get more URLs than needed to account for failures
        search_results = list(search(query, num_results=num_sites * 2))

        # Process each found URL
        for url in search_results:
            if scraped >= num_sites:
                break

            success = False
            for attempt in range(retries):
                try:
                    # Get the HTML content
                    print(f"Trying {url} (attempt {attempt + 1}/{retries})")
                    response = requests.get(
                        url,
                        headers=headers,
                        timeout=timeout,
                        verify=False  # Skip SSL verification
                    )
                    response.raise_for_status()

                    # Verify it's HTML content
                    content_type = response.headers.get('Content-Type', '').lower()
                    if 'text/html' not in content_type:
                        print(f"Skipping {url} - not HTML content")
                        break

                    # Parse the HTML content
                    soup = BeautifulSoup(response.text, 'html.parser')

                    # Remove script and style elements
                    for script in soup(["script", "style"]):
                        script.decompose()

                    # Extract text content (limit to first 10000 characters)
                    text_content = soup.get_text(separator='\n', strip=True)[:10000]

                    # Skip if not enough content
                    if len(text_content.split()) < 100:  # Skip if less than 100 words
                        print(f"Skipping {url} - not enough content")
                        break

                    # Extract all links (limit to first 10)
                    links = []
                    for link in soup.find_all('a', href=True)[:10]:
                        href = link['href']
                        if href.startswith('http'):
                            links.append({
                                'text': link.get_text(strip=True),
                                'url': href
                            })

                    # Extract meta information
                    title = soup.title.string if soup.title else ''
                    meta_description = ''
                    meta_keywords = ''

                    meta_desc_tag = soup.find('meta', attrs={'name': 'description'})
                    if meta_desc_tag:
                        meta_description = meta_desc_tag.get('content', '')

                    meta_keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
                    if meta_keywords_tag:
                        meta_keywords = meta_keywords_tag.get('content', '')

                    results.append({
                        'url': url,
                        'title': title,
                        'meta_description': meta_description,
                        'meta_keywords': meta_keywords,
                        'text_content': text_content,
                        'links': links
                    })

                    scraped += 1
                    success = True

                    # Add a random delay between scrapes
                    time.sleep(random.uniform(0.5, 1))
                    break  # Break retry loop on success

                except requests.Timeout:
                    print(f"Timeout on {url} (attempt {attempt + 1}/{retries})")
                    if attempt == retries - 1:  # Last attempt
                        print(f"Skipping {url} after {retries} timeout attempts")
                except requests.RequestException as e:
                    print(f"Error scraping {url} (attempt {attempt + 1}/{retries}): {str(e)}")
                    if attempt == retries - 1:  # Last attempt
                        print(f"Skipping {url} after {retries} failed attempts")

                # Add a longer delay between retries
                if not success and attempt < retries - 1:
                    time.sleep(random.uniform(1, 2))

            # If we haven't found enough valid content and have more URLs, continue
            if scraped < num_sites and len(results) < len(search_results):
                continue

        return results
    except Exception as e:
        print(f"Error in search/scraping process: {str(e)}")
        # Return whatever results we've managed to gather
        return results
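
# For reference (a sketch, not in the original source): each entry appended to
# `results` above has this shape:
#
#   {
#       'url': 'https://example.com/article',
#       'title': 'Page title',
#       'meta_description': '...',
#       'meta_keywords': '...',
#       'text_content': 'first 10000 characters of visible text',
#       'links': [{'text': 'link text', 'url': 'https://...'}, ...]
#   }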


# Route path assumed; the decorator is not present in the source file.
@app.route('/api/scrape-sites', methods=['GET'])
def api_scrape_sites():
    try:
        # Get query parameters
        query = request.args.get('query', '')
        num_sites = int(request.args.get('num_sites', 10))

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400
        if num_sites < 1 or num_sites > 20:
            return jsonify({'error': 'Number of sites must be between 1 and 20'}), 400

        # Scrape the websites
        results = scrape_site_content(query, num_sites)

        return jsonify({
            'success': True,
            'query': query,
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500


def analyze_with_gpt(scraped_content, research_query):
    """Analyze scraped content using OpenRouter's Gemini model"""
    try:
        headers = {
            'Authorization': f'Bearer {os.getenv("OPENROUTER_API_KEY")}',
            'HTTP-Referer': 'http://localhost:5001',
            'X-Title': 'Research Assistant'
        }

        # Prepare the prompt
        prompt = f"""You are a research assistant analyzing web content to provide comprehensive research.

Research Query: {research_query}

Below is content scraped from various web sources. Analyze this content and provide a detailed, well-structured research response.
Make sure to cite sources when making specific claims.

Scraped Content:
{json.dumps(scraped_content, indent=2)}

Please provide:
1. A comprehensive analysis of the topic
2. Key findings and insights
3. Supporting evidence from the sources
4. Any additional considerations or caveats

Format your response in markdown with proper headings and citations."""

        response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=headers,
            json={
                'model': 'google/gemini-2.0-flash-thinking-exp:free',
                'messages': [{
                    'role': 'user',
                    'content': prompt
                }]
            },
            timeout=60
        )

        if response.status_code != 200:
            raise Exception(f"OpenRouter API error: {response.text}")

        return response.json()['choices'][0]['message']['content']
    except Exception as e:
        print(f"Error in analyze_with_gpt: {str(e)}")
        return f"Error analyzing content: {str(e)}"


def research_topic(query, num_sites=5):
    """Research a topic using web scraping and GPT analysis"""
    try:
        # First get web content using existing scrape_site_content function
        scraped_results = scrape_site_content(query, num_sites)

        # Format scraped content for analysis
        formatted_content = []
        for result in scraped_results:
            formatted_content.append({
                'source': result['url'],
                'title': result['title'],
                'content': result['text_content'][:2000],  # Limit content length for GPT
                'meta_info': {
                    'description': result['meta_description'],
                    'keywords': result['meta_keywords']
                }
            })

        # Get AI analysis of the scraped content
        analysis = analyze_with_gpt(formatted_content, query)

        return {
            'success': True,
            'query': query,
            'analysis': analysis,
            'sources': formatted_content
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }


# Route path assumed; the decorator is not present in the source file.
@app.route('/api/research', methods=['GET'])
def api_research():
    try:
        query = request.args.get('query', '')
        # Always use 5 sites for consistency
        num_sites = 5

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400

        results = research_topic(query, num_sites)
        return jsonify(results)
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
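
# Example requests (assuming the route paths added above, which are themselves
# assumptions, and that OPENROUTER_API_KEY is set for the research endpoint):
#
#   curl "http://localhost:5000/api/search-images?query=aurora%20borealis&num_images=3"
#   curl "http://localhost:5000/api/scrape-sites?query=solid%20state%20batteries&num_sites=5"
#   curl "http://localhost:5000/api/research?query=solid%20state%20batteries"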