Spaces:
Running
Running
from pytrends.request import TrendReq | |
import streamlit as st | |
import pandas as pd | |
import xml.etree.ElementTree as ET | |
import requests | |
from datetime import datetime | |
import pytz | |
import hmac | |
import os | |
# RSS feed of Google's daily trending searches, requested with geo=AT.
feed_url1 = "https://trends.google.de/trends/trendingsearches/daily/rss?geo=AT"
def parse_url(url, timeout=10):
    """Download *url* and parse the response body as XML.

    Args:
        url: Address of the XML document to fetch (e.g. an RSS feed).
        timeout: Seconds to wait for the server before giving up; passed
            straight to ``requests.get``.

    Returns:
        The root ``xml.etree.ElementTree.Element`` of the parsed document.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status.
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection.
    response = requests.get(url, timeout=timeout)
    # Report HTTP failures as such instead of as a confusing ParseError
    # raised while trying to parse an error page.
    response.raise_for_status()
    return ET.fromstring(response.content)
def convert_into_pd(req_json):
    """Build a DataFrame holding only the title and entity names of each trend.

    Args:
        req_json: Iterable of dict-like trend records.

    Returns:
        A ``pandas.DataFrame`` with one row per record, restricted to the
        ``entityNames`` and ``title`` keys that the record actually contains.
    """
    keep = ("entityNames", "title")
    rows = []
    for record in req_json:
        # Walk the record's own keys so column order follows the input.
        rows.append({name: record[name] for name in record if name in keep})
    return pd.DataFrame(rows)
def find_details(req_json, gewünschter_titel):
    """Collect the article details of every trend whose title matches.

    Args:
        req_json: Iterable of trend dicts, each with a ``title`` and, for
            matching trends, an ``articles`` list.
        gewünschter_titel: Title to look for (exact comparison).

    Returns:
        A list with one dict per article of each matching trend, holding the
        article's ``url``, ``snippet``, ``articleTitle`` and ``time``. Empty
        when no trend matches.

    Raises:
        KeyError: If a matching trend or one of its articles lacks a
            required key.
    """
    fields = ('url', 'snippet', 'articleTitle', 'time')
    return [
        {field: article[field] for field in fields}
        for trend in req_json
        if trend['title'] == gewünschter_titel
        for article in trend['articles']
    ]
def find_details2(req_json):
    """Reduce a list of articles to the fields needed for display.

    Args:
        req_json: Iterable of article dicts with ``url``, ``snippet`` and
            ``title`` keys.

    Returns:
        A list with one dict per article containing ``url``, ``snippet`` and
        ``articleTitle`` (taken from the article's ``title``). No ``time``
        field is emitted.

    Raises:
        KeyError: If an article lacks one of the required keys.
    """
    # Output key -> key to read from the incoming article.
    key_map = {'url': 'url', 'snippet': 'snippet', 'articleTitle': 'title'}
    return [
        {target: article[source] for target, source in key_map.items()}
        for article in req_json
    ]
def display_articles_for_category(category):
    """Render one expander per realtime trend of *category*.

    Reads the module-level ``real_trending_searches`` (DataFrame per
    category) and ``base_data`` (raw trend records per category). Each
    expander is labelled with the row's index plus one and its title, and
    lists the trend's articles as numbered markdown links.

    Args:
        category: Key into ``real_trending_searches`` and ``base_data``.
    """
    trends = real_trending_searches[category]
    for row_label, trend in trends.iterrows():
        position = row_label + 1
        title = trend['title']
        with st.expander(f"{position}• {title}"):
            matching_articles = find_details(base_data[category], title)
            for number, article in enumerate(matching_articles, start=1):
                st.markdown(
                    f"{number}• {article['articleTitle']} [Go To →]({article['url']})"
                )
def display_articles_for_today(count, index):
    """Render an expander for one of today's searches with its articles.

    Args:
        count: Zero-based position of the search; shown as ``count + 1``.
        index: Search record providing ``title['query']``,
            ``formattedTraffic`` and an ``articles`` list.
    """
    query = index['title']['query']
    traffic = index['formattedTraffic']
    with st.expander(f"{count + 1}• {query} | Generated Traffic: {traffic}"):
        for number, article in enumerate(find_details2(index['articles']), start=1):
            st.markdown(
                f"{number}• {article['articleTitle']} [Go To →]({article['url']})"
            )
# Ressort label -> category code passed as ``cat`` when requesting realtime
# trending searches. Insertion order is preserved by the dicts built from
# this mapping, so it is also the order offered in the Ressort select box.
categories = {
    "Gesundheit": "m",
    "Alle": "all",
    "Business": "b",
    "Headlines": "h",
    "Sport": "s",
    "Entertainment": "e",
    "Technik": "t",
}
def check_password():
    """Gate the app behind a shared password and report whether it was passed.

    Shows a password input until the entered value matches the ``PASSWORD``
    environment variable. The outcome of the last attempt is kept in
    ``st.session_state["password_correct"]``.

    Returns:
        ``True`` if a correct password has already been entered in this
        session, ``False`` otherwise (the input widget is rendered then).
    """

    def password_entered():
        """Check the submitted password and record the result."""
        expected = os.environ.get("PASSWORD")
        entered = st.session_state["password"]
        # Fail closed when no password is configured: compare_digest() would
        # raise TypeError on None. Compare UTF-8 bytes because
        # compare_digest() rejects str arguments with non-ASCII characters.
        if expected is not None and hmac.compare_digest(
            entered.encode("utf-8"), expected.encode("utf-8")
        ):
            st.session_state["password_correct"] = True
            del st.session_state["password"]  # Don't store the password.
        else:
            st.session_state["password_correct"] = False

    # A previous successful attempt is enough; skip the prompt.
    if st.session_state.get("password_correct", False):
        return True

    # Show input for password.
    st.text_input(
        "Password", type="password", on_change=password_entered, key="password"
    )
    # The key only exists once an attempt was made, and reaching this point
    # means that attempt failed.
    if "password_correct" in st.session_state:
        st.error("😕 Password incorrect")
    return False
# Do not run the rest of the script unless the password check passed.
authenticated = check_password()
if not authenticated:
    st.stop()

pytrend = TrendReq(hl='de-AT', tz=360, timeout=(10, 50))

# Raw realtime trending searches per Ressort label.
base_data = {
    name: pytrend.realtime_trending_searches(pn='AT', cat=code, count=75)
    for name, code in categories.items()
}
# The same data reduced to a DataFrame per Ressort label.
real_trending_searches = {
    name: convert_into_pd(raw) for name, raw in base_data.items()
}
def _child_text(parent, path, namespaces=None):
    """Return the text of the first child of *parent* matching *path*, or None if absent."""
    child = parent.find(path, namespaces)
    return child.text if child is not None else None


if 'selected_option' not in st.session_state:
    st.session_state['selected_option'] = "default_value"  # You can set a default value as needed

selected_option = st.sidebar.radio(
    "Choose an option",
    ["Realzeit Anfragen", "Tagesaktuelle Anfragen", "Trending Searches Yesterday"],
)

if selected_option == "Tagesaktuelle Anfragen":
    # One expander per search returned for today.
    todays_searches = pytrend.today_searches(pn="AT")
    for count, index in enumerate(todays_searches, start=0):
        display_articles_for_today(count, index)
elif selected_option == "Realzeit Anfragen":
    # Let the user pick a Ressort, then list its realtime trends.
    choices_list = list(real_trending_searches)
    auswahl = st.selectbox("Select Ressort", choices_list)
    display_articles_for_category(auswahl)
elif selected_option == "Trending Searches Yesterday":
    vienna_today = datetime.now(pytz.timezone('Europe/Vienna')).date()
    feed = parse_url(feed_url1)
    ns = {'ht': 'https://trends.google.de/trends/trendingsearches/daily'}  # Define namespace

    entries = []
    for item in feed.findall('.//item'):
        pub_date_text = item.find('pubDate').text
        # NOTE(review): this is the calendar date in the feed's own UTC
        # offset, while vienna_today is the Vienna date. Confirm the two are
        # meant to be compared directly.
        pub_date = datetime.strptime(pub_date_text, '%a, %d %b %Y %H:%M:%S %z').date()
        # Filter: skip items whose pubDate is today.
        if pub_date == vienna_today:
            continue
        entries.append({
            'title': item.find('title').text,
            'pubDate': pub_date_text,
            'approx_traffic': _child_text(item, 'ht:approx_traffic', ns),
            # Every news field is optional here, like approx_traffic: a
            # missing child yields None instead of raising AttributeError.
            'news_items': [
                {
                    'title': _child_text(news_item, 'ht:news_item_title', ns),
                    'snippet': _child_text(news_item, 'ht:news_item_snippet', ns),
                    'url': _child_text(news_item, 'ht:news_item_url', ns),
                    'source': _child_text(news_item, 'ht:news_item_source', ns),
                }
                for news_item in item.findall('ht:news_item', ns)
            ],
        })

    for count, entry in enumerate(entries, start=1):
        with st.expander(f"{count}• {entry['title']} | Generated Traffic: {entry['approx_traffic']}"):
            st.write(f"Veröffentlichungsdatum : {entry['pubDate']}")
            for count2, link in enumerate(entry['news_items'], start=1):
                st.markdown(f"{count2}• {link['title']} [Go To →]({link['url']})")