from selenium.common.exceptions import ElementClickInterceptedException
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import json
import time


def setup_driver():
    """Configure and return a headless Chrome driver."""
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--lang=de")
    return webdriver.Chrome(options=options)


def click_and_scrape(driver, url):
    """Click each li element and scrape data."""
    result_dict = {}
    try:
        driver.get(url)

        # Open the category dropdown; retry a few times if the click is intercepted.
        for attempt in range(4):
            try:
                button = WebDriverWait(driver, 20).until(
                    EC.element_to_be_clickable((
                        By.XPATH,
                        "//button[@aria-label='Alle Kategorien, Kategorie auswählen']"
                    ))
                )
                print("Button located.")
                driver.execute_script("arguments[0].scrollIntoView();", button)
                print(button.get_attribute("outerHTML"))
                button.click()
                print("Button clicked successfully.")
                break
            except ElementClickInterceptedException:
                print(f"Attempt {attempt + 1}: Click intercepted. Retrying...")

        # Locate the category list; default to an empty list so a failure here
        # does not raise a NameError below.
        li_elements = []
        try:
            ul_element = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((
                    By.XPATH,
                    "//ul[@aria-label='Kategorie']"
                ))
            )
            li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        except Exception as e:
            print(f"Error locating ul_element: {e}")

        # Keep the third entry and everything from the fifth entry on.
        selected_elements = [li_elements[2]] + li_elements[4:]
        for index, li in enumerate(selected_elements):
            try:
                driver.execute_script("arguments[0].scrollIntoView();", li)
                driver.execute_script("arguments[0].click();", li)
                print(f"Clicked LI {index} using JavaScript.")
                time.sleep(2)

                try:
                    span = li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS")
                    span_content = span.get_attribute("innerText")
                    print(f"Extracted span content for LI {index}: {span_content}")
                    data = scrape_google_trends(driver)
                    result_dict[span_content] = data
                except Exception as e:
                    print(f"Could not find or extract span content in LI {index}: {e}")
                    span_content = f"iteration_{index}"
                    result_dict[span_content] = []
            except Exception as e:
                print(f"Error interacting with LI {index}: {e}")
    except Exception as e:
        print(f"Error during click and scrape: {e}")
    finally:
        driver.quit()
    return result_dict


def process_selenium_row(index, rows, driver):
    """Extract dynamic data using Selenium by clicking on the row."""
    max_retries = 3
    last_error = None
    for attempt in range(max_retries):
        try:
            driver.execute_script("arguments[0].click();", rows[index])
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
            )
            # Keep only the first three related articles and return them.
            articles = driver.find_elements(By.CLASS_NAME, "xZCHj")[:3]
            return {
                "article": [
                    {
                        "href": article.get_attribute("href"),
                        "title": article.text
                    }
                    for article in articles
                ]
            }
        except Exception as e:
            last_error = e
    print(f"Failed to process row {index} after {max_retries} attempts: {last_error}")
    return {"article": []}


def scrape_google_trends(driver):
    """Scrape data dynamically from the current page."""
    all_data = []
    try:
        WebDriverWait(driver, 2).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
        )
        soup = BeautifulSoup(driver.page_source, "html.parser")
        selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')

        tables = soup.select('[jsname="cC57zf"]')
        for table in tables:
            rows_bs = table.find_all("tr")
            for index, row_bs in enumerate(rows_bs):
                # Static data comes from the parsed HTML; dynamic data comes
                # from clicking the matching live row.
                static_data = [
                    [div.get_text(strip=True) for div in cell.find_all("div")]
                    for cell in row_bs.find_all("td")[1:4]
                ]
                dynamic_data = process_selenium_row(index, selenium_rows, driver)
                all_data.append({
                    "static_data": static_data,
                    "dynamic_data": dynamic_data
                })
        return all_data
    except Exception as e:
        # Dump the page source to help debug selector failures.
        with open("page_source_debug.html", "w", encoding="utf-8") as f:
            f.write(driver.page_source)
        print(f"An error occurred during scraping: {e}")
        return []


def process_li_element(index, li_data, url):
    """Process a single li element in its own driver instance."""
    driver = setup_driver()
    try:
        driver.get(url)
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((
                By.XPATH,
                "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
            ))
        )
        ul_element = driver.find_element(
            By.XPATH,
            "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
        )
        li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        selected_li = li_elements[li_data["index"]]
        driver.execute_script("arguments[0].scrollIntoView();", selected_li)
        driver.execute_script("arguments[0].click();", selected_li)
        time.sleep(2)
        span_content = selected_li.find_element(
            By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS"
        ).get_attribute("innerText")
        print(f"LI {li_data['index']} clicked: {span_content}")
        data = scrape_google_trends(driver)
        return {span_content: data}
    except Exception as e:
        print(f"Error processing LI {index}: {e}")
        return {}
    finally:
        driver.quit()


def crawl_url(url):
    """Click each li element and scrape data in parallel."""
    driver = setup_driver()
    result_dict = {}
    try:
        driver.get(url)
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((
                By.XPATH,
                "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
            ))
        )
        ul_element = driver.find_element(
            By.XPATH,
            "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
        )
        li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        # Skip the first two entries and the separator at index 3; each remaining
        # category is handled by its own worker driver.
        selected_elements = [{"index": i} for i in range(2, len(li_elements)) if i != 3]

        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(process_li_element, idx, li_data, url)
                for idx, li_data in enumerate(selected_elements)
            ]
            for future in as_completed(futures):
                result_dict.update(future.result())
    except Exception as e:
        print(f"Error during click and scrape: {e}")
    finally:
        driver.quit()
    return result_dict
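

# Example entry point (a sketch, not part of the original script): shows how
# crawl_url might be invoked and its results persisted with the json module.
# The Google Trends URL below is an assumption; adjust the geo/hl parameters
# and the output path to match your setup.
if __name__ == "__main__":
    trends_url = "https://trends.google.com/trending?geo=DE"  # hypothetical URL
    results = crawl_url(trends_url)
    with open("trends_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"Scraped {len(results)} categories.")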