from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from webdriver_manager.chrome import ChromeDriverManager
import os
import time

# Configure Chrome options (note: setup_driver() below builds its own options,
# so these module-level settings are not applied to the driver it returns)
chrome_options = Options()
chrome_options.add_argument("--headless")  # Run in headless mode
chrome_options.add_argument("--disable-gpu")

complete_starttime = time.time()

# Optional: resolve a local chromedriver next to this script (currently unused)
#script_dir = os.path.dirname(os.path.abspath(__file__))
#driver_path = os.path.join(script_dir, 'chromedriver')


def setup_driver():
    """Create a headless Chrome WebDriver suited to containerized environments."""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')             # Required in many container/CI sandboxes
    options.add_argument('--disable-dev-shm-usage')  # Avoid /dev/shm exhaustion in containers
    wd = webdriver.Chrome(options=options)
    return wd
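

# A quick standalone smoke test of setup_driver() might look like this
# (assumes Chrome is installed; Selenium 4.6+ resolves a matching driver
# automatically via Selenium Manager):
#
#   wd = setup_driver()
#   wd.get("https://example.com")
#   print(wd.title)
#   wd.quit()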


def process_selenium_row(index, selenium_rows, driver):
    """Extract dynamic data using Selenium by clicking on the row."""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # Refresh the rows before processing
            #selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
            row = selenium_rows[index]
            driver.execute_script("arguments[0].click();", row)  # Use JavaScript click for stability

            # Wait for the article links to load dynamically
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
            )

            # Fetch the article links currently loaded on the page (this may
            # also include links from rows that were expanded earlier)
            articles = driver.find_elements(By.CLASS_NAME, "xZCHj")
            print(f"Row {index}: found {len(articles)} article links")

            # Extract href/title pairs from the loaded article links
            dynamic_data = {
                "article": [
                    {
                        "href": article.get_attribute("href"),
                        "title": article.text
                    }
                    for article in articles
                ]
            }
            return dynamic_data
        except Exception as e:
            print(f"Error processing row {index} (Attempt {attempt + 1}): {e}")
            time.sleep(1)  # Brief delay before retrying

    print(f"Failed to process row {index} after {max_retries} attempts.")
    return {"article": []}


def scrape_google_trends(driver, url):
    """Scrape Google Trends data and return it as a list of combined row dicts."""
    all_data = []
    try:
        driver.get(url)
        # Wait for the trend rows to be present before parsing
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
        )

        # Parse the static page content with BeautifulSoup
        soup = BeautifulSoup(driver.page_source, "html.parser")
        tables = soup.select('[jsname="cC57zf"]')

        for table in tables:
            rows_bs = table.find_all("tr")
            selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
            for index, row_bs in enumerate(rows_bs):
                # Static cell text from the 2nd-4th columns of the row
                static_data = [
                    [div.get_text(strip=True) for div in cell.find_all("div")]
                    for cell in row_bs.find_all("td")[1:4]
                ]
                # Dynamic article links revealed by clicking the row
                dynamic_data = process_selenium_row(index, selenium_rows, driver)
                combined_row = {
                    "static_data": static_data,
                    "dynamic_data": dynamic_data
                }
                all_data.append(combined_row)

        return all_data
    except Exception as e:
        print(f"An error occurred: {e}")
        return all_data  # Return whatever was collected before the failure
    finally:
        driver.quit()


def crawl_url(url):
    """Main function to be called from another script."""
    driver = setup_driver()
    return scrape_google_trends(driver, url)
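

# Example usage from another module (the file name google_trends_scraper.py
# is assumed here for illustration; adjust the import to the actual module):
#
#   from google_trends_scraper import crawl_url
#   data = crawl_url("https://trends.google.com/trends/trendingsearches/daily?geo=AT&category=2")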


if __name__ == "__main__":
    # Sample crawl of the Austrian "Entertainment" trending-searches feed
    data = crawl_url(url="https://trends.google.com/trends/trendingsearches/daily?geo=AT&category=2")
    print(data)
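
# For reference, crawl_url() returns a list with one entry per trend row,
# shaped like:
#   [{"static_data": [[...], [...], [...]],
#     "dynamic_data": {"article": [{"href": "...", "title": "..."}, ...]}},
#    ...]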