# Spaces: Running
import hmac
import os
import re
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from urllib.parse import quote

import pandas as pd
import pytz
import requests
import streamlit as st
from PIL import Image
from pytrends.request import TrendReq

from trend_crawl2 import crawl_url
def parse_url(url, timeout=30):
    """Fetch *url* over HTTP and parse the response body as XML.

    Args:
        url: Address of the XML document (e.g. an RSS feed).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The root ``xml.etree.ElementTree.Element`` of the parsed document.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-2xx HTTP status.
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    response = requests.get(url, timeout=timeout)
    # Fail early on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    return ET.fromstring(response.content)
def convert_into_dict(req_json):
    """Group crawled trend entries by category and entity name.

    Args:
        req_json: Mapping of category name to a list of entry dicts. Each
            entry may carry ``"static_data"`` (a list whose first element is
            a sequence with the entity name at index 0 and the search-query
            figure at index 3) and ``"dynamic_data"`` (a dict with an
            ``"article"`` list of dicts holding ``"href"`` and ``"title"``).

    Returns:
        ``{category: {entity_name: {"searchQueries": ..., "articles": [...]}}}``.
        Entries without a usable entity name are skipped. When an entity
        occurs more than once in a category, the first ``searchQueries``
        value is kept and the articles are concatenated.
    """
    result = {}
    for category, entries in req_json.items():
        by_entity = result.setdefault(category, {})
        # A category whose entry list is None is treated like an empty one.
        for entry in entries or []:
            static_data = entry.get("static_data") or []
            if not static_data or len(static_data[0]) < 4:
                continue
            entity_name = static_data[0][0]
            if not entity_name:
                continue
            bucket = by_entity.setdefault(
                entity_name,
                {"searchQueries": static_data[0][3], "articles": []},
            )
            # `dynamic_data` or its `article` list may be present but None.
            dynamic_data = entry.get("dynamic_data") or {}
            for article in dynamic_data.get("article") or []:
                bucket["articles"].append(
                    {"href": article.get("href"), "title": article.get("title")}
                )
    return result
def find_details(req_json, gewünschter_titel):
    """Collect article details for every trend whose title matches.

    Args:
        req_json: Iterable of trend dicts, each with a ``'title'`` and an
            ``'articles'`` list of article dicts.
        gewünschter_titel: The trend title to look for.

    Returns:
        A list of dicts with the keys ``url``, ``snippet``, ``articleTitle``,
        ``time`` and ``source``, one per article of each matching trend.

    Raises:
        KeyError: If a trend or a matching article lacks an expected key.
    """
    wanted_fields = ('url', 'snippet', 'articleTitle', 'time', 'source')
    return [
        {field: article[field] for field in wanted_fields}
        for trend in req_json
        if trend['title'] == gewünschter_titel
        for article in trend['articles']
    ]
def find_details2(req_json):
    """Project each article dict onto the fields used for display.

    Args:
        req_json: Iterable of article dicts with the keys ``'url'``,
            ``'snippet'``, ``'title'`` and ``'source'``.

    Returns:
        A list of dicts with the keys ``url``, ``snippet``, ``articleTitle``
        (taken from the input's ``'title'``) and ``source``.

    Raises:
        KeyError: If an article lacks one of the expected keys.
    """
    return [
        {
            'url': item['url'],
            'snippet': item['snippet'],
            'articleTitle': item['title'],
            'source': item['source'],
        }
        for item in req_json
    ]
# Make sure the per-session "reset" flag exists before any view reads it.
if "reset" not in st.session_state:
    st.session_state["reset"] = False
def display_trends_from_yesterday():
    """Render the daily-trends RSS feed, leaving out items published today.

    Fetches the feed at the module-level ``feed_url1`` via ``parse_url``,
    skips every item whose ``pubDate`` equals today's date in the
    Europe/Vienna timezone, and renders one expander per remaining item with
    a checkbox per linked news article and an "Open All Links" button.

    The button target is the ``url`` environment variable followed by one
    ``article-links[]=<url>`` parameter per checked article. Article URLs are
    percent-encoded so that ``&``, ``?`` or ``#`` inside them cannot corrupt
    the query string.
    """
    checkbox_statuses = {}
    # Shared across all expanders: each button links to every URL checked so far.
    urls = []
    today = datetime.now(pytz.timezone('Europe/Vienna')).date()
    feed = parse_url(feed_url1)
    ns = {'ht': 'https://trends.google.de/trends/trendingsearches/daily'}

    def _text(node, path):
        """Return the text of the first child matching *path*, or None."""
        child = node.find(path, ns)
        return child.text if child is not None else None

    entries = []
    for item in feed.findall('.//item'):
        pub_date_raw = item.find('pubDate').text
        pub_date = datetime.strptime(pub_date_raw, '%a, %d %b %Y %H:%M:%S %z').date()
        # Filter: skip items published today.
        if pub_date == today:
            continue
        entries.append({
            'title': item.find('title').text,
            'pubDate': pub_date_raw,
            'approx_traffic': _text(item, 'ht:approx_traffic'),
            'news_items': [
                {
                    'title': _text(news_item, 'ht:news_item_title'),
                    'snippet': _text(news_item, 'ht:news_item_snippet'),
                    'url': _text(news_item, 'ht:news_item_url'),
                    'source': _text(news_item, 'ht:news_item_source'),
                }
                for news_item in item.findall('ht:news_item', ns)
            ],
        })

    for count, entry in enumerate(entries, start=1):
        with st.expander(f"{count}• {entry['title']} | Generated Traffic: {entry['approx_traffic']}"):
            st.write(f"Veröffentlichungsdatum : {entry['pubDate']}")
            for count2, link in enumerate(entry['news_items'], start=1):
                checkbox_label = f"yesterday_{count}_{count2}"
                if st.session_state.reset:
                    st.session_state[checkbox_label] = False
                checkbox_statuses[checkbox_label] = st.checkbox(
                    f"{count2}• {link['title']} | {link['source']} | [Go To →]({link['url']})",
                    value=st.session_state.get(checkbox_label, False),
                    key=checkbox_label
                )
                # A news item without a URL element cannot be linked to.
                if checkbox_statuses[checkbox_label] and link['url']:
                    urls.append(link['url'])
            # Button at the end of the expander.
            # Same fallback as the category view, so an unset variable does
            # not produce a link starting with the literal text "None".
            base_url = os.getenv("url", "https://example.com/?")
            query_params = "&".join(
                f"article-links[]={quote(url, safe='')}" for url in urls
            )
            st.link_button("Open All Links", url=f"{base_url}{query_params}")
# Function to display articles for a specific category
def display_articles_for_category(category):
    """Render the real-time trends of one category as expanders.

    Reads ``st.session_state["real_trending_searches"]`` for the module-level
    ``selected_country`` and shows, per topic, one checkbox per article plus
    an "Open All Links" button. A checkbox is disabled when it is unchecked
    and the boxes already processed in this render contain at least
    ``MAX_CHECKED`` checked ones.

    Args:
        category: Key of the category inside the selected country's trends,
            exactly as stored in session state.
    """
    checkbox_statuses = {}
    # Shared across all expanders: each button links to every URL checked so far.
    urls = []
    trending_data = st.session_state["real_trending_searches"][selected_country][category]

    # On reset, clear every checkbox of this category before any is drawn.
    if st.session_state.get("reset", False):
        for idx, data in enumerate(trending_data.values()):
            for article_index, _ in enumerate(data["articles"], start=1):
                st.session_state[f"{category}_{idx}_{article_index}"] = False

    for idx, (topic, data) in enumerate(trending_data.items()):
        with st.expander(f"{idx + 1}• {topic} | Generated Traffic: {data['searchQueries']}"):
            for article_index, article in enumerate(data["articles"], start=1):
                checkbox_label = f"{category}_{idx}_{article_index}"
                current_value = st.session_state.get(checkbox_label, False)
                checkbox_statuses[checkbox_label] = current_value
                disabled = (not current_value) and (sum(checkbox_statuses.values()) >= MAX_CHECKED)
                checkbox_statuses[checkbox_label] = st.checkbox(
                    f"{article_index}• {article['title']} | [Go To →]({article['href']})",
                    value=current_value,
                    key=checkbox_label,
                    disabled=disabled
                )
                # An article without an href cannot be linked to.
                if checkbox_statuses[checkbox_label] and article["href"]:
                    urls.append(article["href"])
            # NOTE(review): the original indentation was lost; the button is
            # placed at the end of each expander to match the sibling
            # views — confirm this is the intended position.
            base_url = os.getenv("url", "https://example.com/?")
            # Percent-encode each URL so '&', '?' or '#' inside an article
            # link cannot break the query string.
            query_params = "&".join(
                f"article-links[]={quote(u, safe='')}" for u in urls
            )
            st.link_button("Open All Links", url=f"{base_url}{query_params}")
# Function for rendering today's articles
def display_articles_for_today(count, index):
    """Render one of today's trends as an expander with article checkboxes.

    A checkbox is disabled when it is unchecked and at least ``MAX_CHECKED``
    boxes of this trend are already checked. The "Open All Links" button
    targets the ``url`` environment variable followed by one
    ``article-links[]=<url>`` parameter per checked article.

    Args:
        count: Zero-based position of the trend; used in widget keys and,
            plus one, in the expander title.
        index: Trend dict providing ``['title']['query']``,
            ``['formattedTraffic']`` and an ``['articles']`` list accepted by
            ``find_details2``.
    """
    # Tracks the checked state of every checkbox of this trend.
    checkbox_statuses = {}
    urls = []

    # Load (or reset) all states first so the cap sees the whole trend.
    for count2, _ in enumerate(index['articles'], start=1):
        checkbox_label = f"today_{count}_{count2}"
        if st.session_state.reset:
            st.session_state[checkbox_label] = False
        checkbox_statuses[checkbox_label] = st.session_state.get(checkbox_label, False)

    with st.expander(f"{count+1}• {index['title']['query']} | Generated Traffic: {index['formattedTraffic']}"):
        articles = find_details2(index['articles'])
        for count2, article in enumerate(articles, start=1):
            checkbox_label = f"today_{count}_{count2}"
            disabled = not checkbox_statuses[checkbox_label] and sum(checkbox_statuses.values()) >= MAX_CHECKED
            checkbox_statuses[checkbox_label] = st.checkbox(
                f"{count2}• {article['articleTitle']} | {article['source']} | [Go To →]({article['url']})",
                value=checkbox_statuses[checkbox_label],
                key=checkbox_label,
                disabled=disabled
            )
            # An article without a URL cannot be linked to.
            if checkbox_statuses[checkbox_label] and article['url']:
                urls.append(article['url'])
        # Button at the end of the expander.
        # Same fallback as the category view, so an unset variable does not
        # produce a link starting with the literal text "None".
        base_url = os.getenv("url", "https://example.com/?")
        # Percent-encode each URL so '&', '?' or '#' inside an article link
        # cannot break the query string.
        query_params = "&".join(
            f"article-links[]={quote(u, safe='')}" for u in urls
        )
        st.link_button("Open All Links", url=f"{base_url}{query_params}")
# Display name -> Google Trends geo code of each supported country.
# NOTE(review): "Germamy" looks like a typo for "Germany"; only the values
# are used below, so the key is left untouched.
country_list = {
    "Germamy" : "DE",
    "Austria" : "AT"
}
pytrend = TrendReq(hl='de-AT', tz=360, timeout=(10,50))

# First run of this session: create the containers that the loader fills.
if 'base_load_finished' not in st.session_state:
    st.session_state["real_trending_searches"] = {}
    st.session_state["base_data"] = {}
    st.session_state["pn"] = "AT"

print(st.session_state.reset)

# Load on the first run, and again whenever a reset was requested.
needs_load = 'base_load_finished' not in st.session_state or st.session_state.reset
if needs_load:
    with st.spinner("Loading Trends"):
        st.session_state["today"] = {}
        st.session_state["base"] = {}
        for pn_option in country_list.values():
            st.session_state["base_data"][pn_option] = {}
            st.session_state["real_trending_searches"][pn_option] = {}
            st.session_state["today"][pn_option] = pytrend.today_searches(pn=pn_option)
            st.session_state["base"][pn_option] = crawl_url(url=f"https://trends.google.com/trends/trendingsearches/daily?geo={pn_option}&category=2")
            st.session_state["real_trending_searches"][pn_option] = convert_into_dict(st.session_state["base"][pn_option])
    st.session_state["base_load_finished"] = True

# Checkbox cap used by the real-time and today views.
MAX_CHECKED = 3
def check_password():
    """Returns `True` if the user had the correct password.

    The expected password is read from the ``PASSWORD`` environment variable.
    While the user is not authenticated, a password input is rendered (plus
    an error message after a failed attempt) and ``False`` is returned.
    """

    def password_entered():
        """Checks whether a password entered by the user is correct."""
        expected = os.environ.get("PASSWORD")
        entered = st.session_state["password"]
        # Compare as bytes: compare_digest rejects non-ASCII str values and
        # None. A missing PASSWORD variable therefore denies access instead
        # of raising TypeError.
        if expected is not None and hmac.compare_digest(
            entered.encode("utf-8"), expected.encode("utf-8")
        ):
            st.session_state["password_correct"] = True
            del st.session_state["password"]  # Don't store the password.
        else:
            st.session_state["password_correct"] = False

    # Return True if the password is validated.
    if st.session_state.get("password_correct", False):
        return True

    # Show input for password.
    st.text_input(
        "Password", type="password", on_change=password_entered, key="password"
    )
    # The key only exists after at least one attempt, so this marks a failure.
    if "password_correct" in st.session_state:
        st.error("😕 Password incorrect")
    return False
# Nothing below is rendered until the password check has succeeded.
authenticated = check_password()
if not authenticated:
    st.stop()  # Do not continue if check_password is not True.

# Display order of the categories in the real-time view.
fixed_order = [
    "All categories",
    "Autos and Vehicles",
    "Beauty and Fashion",
    "Business and Finance",
    "Climate",
    "Entertainment",
    "Food and Drink",
    "Games",
    "Health",
    "Hobbies and Leisure",
    "Jobs and Education",
    "Law and Government",
    "Other",
    "Pets and Animals",
    "Politics",
    "Science",
    "Shopping",
    "Sports",
    "Technology",
    "Travel and Transportation",
]

# Seed the session value with a placeholder on the first run.
if "selected_option" not in st.session_state:
    st.session_state["selected_option"] = "default_value"

# Logo shown at the top of the sidebar.
img = Image.open(r"heute_tensora.png")
st.sidebar.image(img)
# Sidebar controls: country picker, confirm button and view selector.
selected_country = st.sidebar.selectbox("Choose a Country", ["AT", "DE"])
# RSS feed of the daily trends for the chosen country.
feed_url1 = f'https://trends.google.de/trends/trendingsearches/daily/rss?geo={selected_country}'

# Pressing the button stores the chosen country code in the session.
if st.sidebar.button("Change Country") and selected_country in ("AT", "DE"):
    st.session_state["pn"] = selected_country

selected_option = st.sidebar.radio(
    "Choose an option",
    ["Realzeit Anfragen", "Tagesaktuelle Anfragen", "Trends von Gestern"],
)
st.warning("Die aufgelisteten Keywörter für erhöhte Reichweite in den Überschriften verwenden")
# Render the view chosen in the sidebar.
if selected_option == "Tagesaktuelle Anfragen":
    for count, index in enumerate(st.session_state["today"][selected_country], start=0):
        try:
            display_articles_for_today(count, index)
        except Exception as e:
            # Best effort: show the error for this trend and carry on.
            st.code(e)
elif selected_option == "Realzeit Anfragen":
    raw_choices_list = list(st.session_state["real_trending_searches"][selected_country].keys())
    # Category keys may end in a count such as " (12)"; strip it for display
    # but remember the raw key for the lookup.
    cleaned_to_raw_mapping = {re.sub(r"\s\(\d+\)$", "", choice): choice for choice in raw_choices_list}
    choices_list = [category for category in fixed_order if category in cleaned_to_raw_mapping]
    if choices_list:
        auswahl = st.selectbox("Select Ressort", choices_list, index=0)
        display_articles_for_category(cleaned_to_raw_mapping[auswahl])
    else:
        # With no options the selectbox yields None, which would raise a
        # KeyError in the mapping lookup above.
        st.info("No trend categories available for the selected country.")
elif selected_option == "Trends von Gestern":
    display_trends_from_yesterday()

# Clear the reset request once the page has been rendered.
if st.session_state.reset:
    st.session_state["reset"] = False