from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from webdriver_manager.chrome import ChromeDriverManager
import os
import time

# Module-level Chrome options (note: setup_driver() below builds its own
# options, so these and the start timestamp are currently unused).
chrome_options = Options()
chrome_options.add_argument("--headless")  # Run in headless mode
chrome_options.add_argument("--disable-gpu")
complete_starttime = time.time()

# Optional: pin a chromedriver binary shipped next to this script
#script_dir = os.path.dirname(os.path.abspath(__file__))
#driver_path = os.path.join(script_dir, 'chromedriver')

def setup_driver():
    """Create a headless Chrome WebDriver suitable for containerized environments."""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    wd = webdriver.Chrome(options=options)
    return wd
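
# The ChromeService and ChromeDriverManager imports above are otherwise unused;
# a minimal sketch of the driver setup they suggest, assuming webdriver_manager
# can download a matching chromedriver (the name setup_driver_managed is
# illustrative and not called elsewhere):
def setup_driver_managed():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    service = ChromeService(ChromeDriverManager().install())
    return webdriver.Chrome(service=service, options=options)
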
def process_selenium_row(index, selenium_rows, driver):
    """Extract dynamic data for one row by clicking it with Selenium."""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # Refresh the rows before processing if they go stale:
            # selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
            row = selenium_rows[index]
            driver.execute_script("arguments[0].click();", row)  # JavaScript click for stability

            # Wait for the articles to load dynamically
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
            )

            # Fetch the newly loaded articles
            articles = driver.find_elements(By.CLASS_NAME, "xZCHj")

            # Extract data from the current row only and return it
            dynamic_data = {
                "article": [
                    {
                        "href": article.get_attribute("href"),
                        "title": article.text
                    }
                    for article in articles
                ]
            }
            return dynamic_data
        except Exception as e:
            print(f"Error processing row {index} (Attempt {attempt + 1}): {e}")
            time.sleep(1)  # Brief delay before retrying

    print(f"Failed to process row {index} after {max_retries} attempts.")
    return {"article": []}
def scrape_google_trends(driver, url):
    """Scrape Google Trends data and return it as a list of row dicts."""
    all_data = []
    try:
        driver.get(url)
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
        )
        soup = BeautifulSoup(driver.page_source, "html.parser")
        tables = soup.select('[jsname="cC57zf"]')
        for table in tables:
            rows_bs = table.find_all("tr")
            selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
            for index, row_bs in enumerate(rows_bs):
                # Static cell text (columns 2-4) parsed from the page source
                static_data = [
                    [div.get_text(strip=True) for div in cell.find_all("div")]
                    for cell in row_bs.find_all("td")[1:4]
                ]
                # Dynamic article data loaded by clicking the row
                dynamic_data = process_selenium_row(index, selenium_rows, driver)
                combined_row = {
                    "static_data": static_data,
                    "dynamic_data": dynamic_data
                }
                all_data.append(combined_row)
        return all_data
    except Exception as e:
        print(f"An error occurred: {e}")
        return all_data  # Return whatever was collected before the error
    finally:
        driver.quit()

def crawl_url(url):
    """Main function to be called from another script."""
    driver = setup_driver()
    return scrape_google_trends(driver, url)
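
# Persisting the scraped rows to JSON is a natural follow-up; a minimal sketch
# (the helper name crawl_url_to_json and the default output path are illustrative):
def crawl_url_to_json(url, path="trends.json"):
    """Crawl the given URL and write the combined rows to a JSON file."""
    import json
    data = crawl_url(url)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return data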

if __name__ == "__main__":
    # Example run: daily trending searches for geo=AT, category=2.
    results = crawl_url(url="https://trends.google.com/trends/trendingsearches/daily?geo=AT&category=2")
    print(results)