"""Streamlit dashboard listing Google Trends searches and their news articles.

The script is password protected. After login it loads today's and realtime
trending searches via pytrends for every configured country, and can also
read the daily-trends RSS feed. Articles can be ticked via checkboxes and
opened together through a link built from the ``url`` environment variable.
"""
import hmac
import os
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from urllib.parse import quote

import pandas as pd
import pytz
import requests
import streamlit as st
from pytrends.request import TrendReq

# Maximum number of simultaneously ticked article checkboxes.
MAX_CHECKED = 3

# (connect, read) timeout in seconds; same values the pytrends client uses.
REQUEST_TIMEOUT = (10, 50)


def parse_url(url):
    """Download *url* and return the root element of the parsed XML body.

    Raises:
        requests.RequestException: on network errors, timeouts or a
            non-2xx HTTP status.
        xml.etree.ElementTree.ParseError: if the body is not valid XML.
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    # Fail with a clear HTTP error instead of trying to parse an error page.
    response.raise_for_status()
    return ET.fromstring(response.content)


def convert_into_pd(req_json):
    """Return a DataFrame holding only ``entityNames`` and ``title`` per trend.

    ``req_json`` is iterated as a sequence of dict-like trend records.
    """
    wanted_keys = ("entityNames", "title")
    trimmed = [
        {key: value for key, value in trend.items() if key in wanted_keys}
        for trend in req_json
    ]
    return pd.DataFrame(trimmed)


def find_details(req_json, gewünschter_titel):
    """Collect the article details of every trend titled *gewünschter_titel*.

    Returns a list of dicts with the keys ``url``, ``snippet``,
    ``articleTitle``, ``time`` and ``source``.
    """
    return [
        {
            'url': article['url'],
            'snippet': article['snippet'],
            'articleTitle': article['articleTitle'],
            'time': article['time'],
            'source': article['source'],
        }
        for trend_info in req_json
        if trend_info['title'] == gewünschter_titel
        for article in trend_info['articles']
    ]


def find_details2(req_json):
    """Normalise a list of articles, exposing ``title`` as ``articleTitle``.

    Returns a list of dicts with the keys ``url``, ``snippet``,
    ``articleTitle`` and ``source``.
    """
    return [
        {
            'url': article['url'],
            'snippet': article['snippet'],
            'articleTitle': article['title'],
            'source': article['source'],
        }
        for article in req_json
    ]


def _child_text(element, path, namespaces=None):
    """Return the text of the first child matching *path*, or None if absent."""
    child = element.find(path, namespaces)
    return child.text if child is not None else None


def _initial_checkbox_state(key):
    """Return the stored state of checkbox *key*, clearing it during a reset."""
    if st.session_state.reset:
        st.session_state[key] = False
    return st.session_state.get(key, False)


def _build_open_all_url(urls):
    """Return the "open all" link for *urls*, or None if no base URL is set.

    The base is read from the ``url`` environment variable. Each article URL
    is percent-encoded so that its own ``?``, ``&`` and ``#`` characters
    cannot break the surrounding query string.
    """
    base_url = os.getenv("url")
    if not base_url:
        return None
    encoded_links = (quote(link, safe="") for link in urls)
    query_params = "&".join(f"article-links[]={link}" for link in encoded_links)
    return f"{base_url}{query_params}"


def _render_open_all_button(urls):
    """Render the "Open All Links" button for the currently selected *urls*."""
    full_url = _build_open_all_url(urls)
    if full_url is None:
        # Without a base URL the link would start with the text "None".
        st.link_button(
            "Open All Links",
            url="",
            disabled=True,
            help="Environment variable 'url' is not set.",
        )
        return
    st.link_button("Open All Links", url=full_url)


if 'reset' not in st.session_state:
    st.session_state.reset = False


def display_trends_from_yesterday():
    """Render the RSS daily trends whose publication date is not today.

    Reads the module-level ``feed_url1``. Every trend gets an expander with
    one checkbox per news item and an "Open All Links" button.
    """
    checkbox_statuses = {}
    # NOTE(review): this list is shared by all expanders, so each button also
    # contains links ticked in earlier expanders. Kept as in the original.
    urls = []
    today = datetime.now(pytz.timezone('Europe/Vienna')).date()
    feed = parse_url(feed_url1)
    ns = {'ht': 'https://trends.google.de/trends/trendingsearches/daily'}

    entries = []
    for item in feed.findall('.//item'):
        pub_date_text = _child_text(item, 'pubDate')
        if pub_date_text is None:
            # An item without a date cannot be filtered; skip it.
            continue
        pub_date = datetime.strptime(
            pub_date_text, '%a, %d %b %Y %H:%M:%S %z'
        ).date()
        # Filter: skip entries published today.
        if pub_date == today:
            continue
        entries.append({
            'title': _child_text(item, 'title'),
            'pubDate': pub_date_text,
            'approx_traffic': _child_text(item, 'ht:approx_traffic', ns),
            'news_items': [
                {
                    'title': _child_text(news_item, 'ht:news_item_title', ns),
                    'snippet': _child_text(news_item, 'ht:news_item_snippet', ns),
                    'url': _child_text(news_item, 'ht:news_item_url', ns),
                    'source': _child_text(news_item, 'ht:news_item_source', ns),
                }
                for news_item in item.findall('ht:news_item', ns)
            ],
        })

    for count, entry in enumerate(entries, start=1):
        with st.expander(f"{count}• {entry['title']} | Generated Traffic: {entry['approx_traffic']}"):
            st.write(f"Veröffentlichungsdatum : {entry['pubDate']}")
            for count2, link in enumerate(entry['news_items'], start=1):
                checkbox_label = f"yesterday_{count}_{count2}"
                checkbox_statuses[checkbox_label] = _initial_checkbox_state(checkbox_label)
                checkbox_statuses[checkbox_label] = st.checkbox(
                    f"{count2}• {link['title']} | {link['source']} | [Go To →]({link['url']})",
                    value=checkbox_statuses[checkbox_label],
                    key=checkbox_label,
                )
                if checkbox_statuses[checkbox_label] and link['url']:
                    urls.append(link['url'])
            # Button at the end of the expander.
            _render_open_all_button(urls)


def display_articles_for_category(category):
    """Render the realtime trends of *category* for the selected country.

    Reads the module-level ``selected_country`` and ``MAX_CHECKED``. At most
    ``MAX_CHECKED`` checkboxes can be ticked across the whole category;
    further unticked boxes are disabled.
    """
    checkbox_statuses = {}
    urls = []
    trends = st.session_state["real_trending_searches"][selected_country][category]
    raw_trends = st.session_state["base_data"][selected_country][category]

    # First pass: resolve the articles once per row and load every checkbox
    # state, so the global ticked count is known before any widget renders.
    rows = []
    for index, row in trends.iterrows():
        articles = find_details(raw_trends, row['title'])
        rows.append((index, row['title'], articles))
        for count2, _article in enumerate(articles, start=1):
            checkbox_label = f"{category}_{index}_{count2}"
            checkbox_statuses[checkbox_label] = _initial_checkbox_state(checkbox_label)

    # Second pass: render the widgets.
    for index, title, articles in rows:
        count = index + 1
        with st.expander(f"{count}• {title} "):
            for count2, article in enumerate(articles, start=1):
                checkbox_label = f"{category}_{index}_{count2}"
                disabled = (
                    not checkbox_statuses[checkbox_label]
                    and sum(checkbox_statuses.values()) >= MAX_CHECKED
                )
                checkbox_statuses[checkbox_label] = st.checkbox(
                    f"{count2}• {article['articleTitle']} | {article['source']} | [Go To →]({article['url']})",
                    value=checkbox_statuses[checkbox_label],
                    key=checkbox_label,
                    disabled=disabled,
                )
                if checkbox_statuses[checkbox_label]:
                    urls.append(article['url'])
            # NOTE(review): button placed at the end of each expander, like
            # the other two views; the original nesting was not recoverable.
            _render_open_all_button(urls)


# Renders the articles of one of today's trends.
def display_articles_for_today(count, index):
    """Render one of today's trends with its article checkboxes.

    Args:
        count: zero-based position of the trend; used in widget keys and,
            plus one, in the expander title.
        index: trend record; the keys ``articles``, ``title`` (with a
            ``query`` entry) and ``formattedTraffic`` are read.
    """
    checkbox_statuses = {}
    urls = []
    articles = find_details2(index['articles'])

    # Load all states first so the ticked count is known while rendering.
    for count2, _article in enumerate(articles, start=1):
        checkbox_label = f"today_{count}_{count2}"
        checkbox_statuses[checkbox_label] = _initial_checkbox_state(checkbox_label)

    with st.expander(f"{count+1}• {index['title']['query']} | Generated Traffic: {index['formattedTraffic']}"):
        for count2, article in enumerate(articles, start=1):
            checkbox_label = f"today_{count}_{count2}"
            disabled = (
                not checkbox_statuses[checkbox_label]
                and sum(checkbox_statuses.values()) >= MAX_CHECKED
            )
            checkbox_statuses[checkbox_label] = st.checkbox(
                f"{count2}• {article['articleTitle']} | {article['source']} | [Go To →]({article['url']})",
                value=checkbox_statuses[checkbox_label],
                key=checkbox_label,
                disabled=disabled,
            )
            if checkbox_statuses[checkbox_label]:
                urls.append(article['url'])
        # Button at the end of the expander.
        _render_open_all_button(urls)


categories = {
    "Alle": "all",
    "Gesundheit": "m",
    "Business": "b",
    "Headlines": "h",
    "Sport": "s",
    "Entertainment": "e",
    "Technik": "t",
}

country_list = {
    "Germamy": "DE",
    "Austria": "AT",
}


def check_password():
    """Returns `True` if the user had the correct password."""

    def password_entered():
        """Checks whether a password entered by the user is correct."""
        expected = os.environ.get("PASSWORD")
        entered = st.session_state.get("password", "")
        # compare_digest rejects None and non-ASCII str, so compare UTF-8
        # bytes and deny access when no password is configured.
        if expected is not None and hmac.compare_digest(
            entered.encode("utf-8"), expected.encode("utf-8")
        ):
            st.session_state["password_correct"] = True
            del st.session_state["password"]  # Don't store the password.
        else:
            st.session_state["password_correct"] = False

    # Return True if the password is validated.
    if st.session_state.get("password_correct", False):
        return True

    # Show input for password.
    st.text_input(
        "Password", type="password", on_change=password_entered, key="password"
    )
    if "password_correct" in st.session_state:
        st.error("😕 Password incorrect")
    return False


# Authenticate before creating the pytrends client or loading any trends, so
# unauthenticated visitors trigger no fetches and nothing is loaded twice.
if not check_password():
    st.stop()  # Do not continue if check_password is not True.

pytrend = TrendReq(hl='de-AT', tz=360, timeout=(10, 50))

if 'base_load_finished' not in st.session_state:
    st.session_state["real_trending_searches"] = {}
    st.session_state["base_data"] = {}
    st.session_state["pn"] = "AT"

if 'base_load_finished' not in st.session_state or st.session_state.reset:
    with st.spinner("Loading Trends"):
        st.session_state["today"] = {}
        for pn_option in country_list.values():
            st.session_state["base_data"][pn_option] = {}
            st.session_state["real_trending_searches"][pn_option] = {}
            st.session_state["today"][pn_option] = pytrend.today_searches(pn=pn_option)
            for category_name, category_code in categories.items():
                # NOTE(review): the result is used as a list of dicts; confirm
                # which pytrends build is installed.
                st.session_state["base"] = pytrend.realtime_trending_searches(
                    pn=pn_option, cat=category_code, count=75
                )
                st.session_state["base_data"][pn_option][category_name] = st.session_state["base"]
                st.session_state["real_trending_searches"][pn_option][category_name] = convert_into_pd(
                    st.session_state["base"]
                )

if 'selected_option' not in st.session_state:
    st.session_state['selected_option'] = "default_value"  # You can set a default value as needed

# st.image accepts a file path directly; the original called Image.open
# without ever importing Image.
st.sidebar.image(r"heute_tensora.png")

# Selectbox to choose a country
selected_country = st.sidebar.selectbox("Choose a Country", ["AT", "DE"])
feed_url1 = f'https://trends.google.de/trends/trendingsearches/daily/rss?geo={selected_country}'

# Button to trigger actions
if st.sidebar.button("Change Country"):
    # The selectbox only offers "AT" and "DE"; both are stored the same way.
    st.session_state["pn"] = selected_country

selected_option = st.sidebar.radio(
    "Choose an option",
    ["Realzeit Anfragen", "Tagesaktuelle Anfragen", "Trends von Gestern"],
)
st.warning("Die aufgelisteten Keywörter für erhöhte Reichweite in den Überschriften verwenden")

if selected_option == "Tagesaktuelle Anfragen":
    for count, index in enumerate(st.session_state["today"][selected_country], start=0):
        try:
            display_articles_for_today(count, index)
        except Exception as e:
            # UI boundary: show the problem for this trend and keep going.
            st.code(str(e))
elif selected_option == "Realzeit Anfragen":
    choices_list = list(st.session_state["real_trending_searches"][selected_country].keys())
    # Mark the initial load as done once every category has been fetched.
    if len(categories) == len(choices_list):
        st.session_state["base_load_finished"] = True
    auswahl = st.selectbox("Select Ressort", choices_list)
    display_articles_for_category(auswahl)
elif selected_option == "Trends von Gestern":
    display_trends_from_yesterday()

# A reset only lasts for one script run.
if st.session_state.reset:
    st.session_state["reset"] = False