webcrawler / app.py
Add1E's picture
Update app.py
b86b57c verified
raw
history blame
6.55 kB
from pytrends.request import TrendReq
import streamlit as st
import pandas as pd
import xml.etree.ElementTree as ET
import requests
from datetime import datetime
import pytz
import hmac
import os
# RSS feed of Google's daily trending searches, requested with geo=AT.
# Fetched via parse_url() for the "Trending Searches Yesterday" view.
feed_url1 = 'https://trends.google.de/trends/trendingsearches/daily/rss?geo=AT'
def parse_url(url, timeout=30):
    """Download an XML document and return its parsed root element.

    Args:
        url: Address of the XML/RSS resource to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would freeze the
            page on a stalled connection.

    Returns:
        The root ``xml.etree.ElementTree.Element`` of the response body.

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            HTTP error status.
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    response = requests.get(url, timeout=timeout)
    # Surface HTTP errors directly instead of trying to parse an error page.
    response.raise_for_status()
    return ET.fromstring(response.content)
def convert_into_pd(req_json):
    """Build a DataFrame that keeps only the ``entityNames`` and ``title`` fields.

    Args:
        req_json: Iterable of dict records describing trending searches.

    Returns:
        A ``pandas.DataFrame`` with one row per record; every key other than
        ``entityNames`` and ``title`` is dropped.
    """
    keep = ("entityNames", "title")
    rows = []
    for record in req_json:
        # Iterate the record itself so the original key order is preserved.
        rows.append(
            {field: value for field, value in record.items() if field in keep}
        )
    return pd.DataFrame(rows)
def find_details(req_json, gewünschter_titel):
    """Collect the article details of every trend whose title matches.

    Args:
        req_json: Iterable of trend records, each with a ``title`` and a list
            of ``articles``.
        gewünschter_titel: Title of the trend whose articles are wanted.

    Returns:
        A list of dicts with the keys ``url``, ``snippet``, ``articleTitle``
        and ``time``, in the order the articles appear. Empty when no trend
        has the requested title.
    """
    return [
        {
            'url': article['url'],
            'snippet': article['snippet'],
            'articleTitle': article['articleTitle'],
            'time': article['time'],
        }
        for trend in req_json
        if trend['title'] == gewünschter_titel
        for article in trend['articles']
    ]
def find_details2(req_json):
    """Reduce article records to the fields needed for display.

    Args:
        req_json: Iterable of article dicts carrying ``url``, ``snippet`` and
            ``title``.

    Returns:
        A list of dicts with the keys ``url``, ``snippet`` and
        ``articleTitle`` (taken from the record's ``title``), in input order.
    """
    return [
        {
            'url': record['url'],
            'snippet': record['snippet'],
            'articleTitle': record['title'],
        }
        for record in req_json
    ]
def display_articles_for_category(category):
    """Render one expander per trending search of *category* with its articles.

    Reads the module-level ``real_trending_searches`` (DataFrame per category)
    for the trend titles and ``base_data`` (raw records per category) for the
    articles belonging to each title.

    Args:
        category: Key identifying the category in both module-level mappings.
    """
    trends_frame = real_trending_searches[category]
    for position, trend in trends_frame.iterrows():
        headline = trend['title']
        # The row label is zero-based; the visible numbering starts at 1.
        with st.expander(f"{position + 1}{headline}"):
            related = find_details(base_data[category], headline)
            for number, article in enumerate(related, start=1):
                st.markdown(
                    f"{number}{article['articleTitle']} [Go To →]({article['url']})"
                )
def display_articles_for_today(count, index):
    """Render a single daily trend as an expander listing its articles.

    Args:
        count: Zero-based position of the trend; shown to the user as
            ``count + 1``.
        index: Trend record providing ``title['query']``,
            ``formattedTraffic`` and ``articles``.
    """
    query = index['title']['query']
    traffic = index['formattedTraffic']
    label = f"{count+1}{query} | Generated Traffic: {traffic}"
    with st.expander(label):
        for number, article in enumerate(find_details2(index['articles']), start=1):
            st.markdown(
                f"{number}{article['articleTitle']} [Go To →]({article['url']})"
            )
# Display label -> category code passed as ``cat`` to the realtime trending
# request. Insertion order is the order in which the labels are offered.
categories = dict(
    Gesundheit="m",
    Alle="all",
    Business="b",
    Headlines="h",
    Sport="s",
    Entertainment="e",
    Technik="t",
)
def check_password():
    """Returns `True` if the user had the correct password.

    The expected password is read from the ``PASSWORD`` environment variable.
    While the session has not been validated, a password input is rendered
    (plus an error message after a failed attempt) and ``False`` is returned.
    """

    def password_entered():
        """Checks whether a password entered by the user is correct."""
        expected = os.environ.get("PASSWORD")
        entered = st.session_state["password"]
        # Fail closed when PASSWORD is not configured: compare_digest raises
        # TypeError on None. Compare UTF-8 bytes because compare_digest only
        # accepts str arguments that are pure ASCII.
        if expected is not None and hmac.compare_digest(
            entered.encode("utf-8"), expected.encode("utf-8")
        ):
            st.session_state["password_correct"] = True
            del st.session_state["password"]  # Don't store the password.
        else:
            st.session_state["password_correct"] = False

    # Return True if the password is validated.
    if st.session_state.get("password_correct", False):
        return True

    # Show input for password.
    st.text_input(
        "Password", type="password", on_change=password_entered, key="password"
    )
    # The flag only exists after at least one attempt; reaching this point
    # with the flag present means that attempt failed.
    if "password_correct" in st.session_state:
        st.error("😕 Password incorrect")
    return False
# Authentication gate: nothing below runs in this script pass unless the
# password check has succeeded.
if not check_password():
    st.stop()  # Do not continue if check_password is not True.

# NOTE(review): tz=360 matches the example offset in the pytrends README —
# confirm it is the intended value for an Austrian audience.
pytrend = TrendReq(hl='de-AT', tz=360, timeout=(10, 50))

# Per category label: the raw realtime trend records (base_data) and their
# DataFrame view (real_trending_searches).
# NOTE(review): this issues one request per category on every script run,
# whichever view is selected — consider caching.
base_data = {}
real_trending_searches = {}
for category_name, category_code in categories.items():
    base = pytrend.realtime_trending_searches(
        pn='AT',
        cat=category_code,
        count=75,
    )
    base_data[category_name] = base
    real_trending_searches[category_name] = convert_into_pd(base)
# Seed the session key on the first run of a session.
if 'selected_option' not in st.session_state:
    st.session_state['selected_option'] = "default_value"  # You can set a default value as needed

# The sidebar radio decides which of the three views is rendered.
selected_option = st.sidebar.radio(
    "Choose an option",
    ["Realzeit Anfragen", "Tagesaktuelle Anfragen", "Trending Searches Yesterday"],
)

if selected_option == "Tagesaktuelle Anfragen":
    # Today's searches: one expander per trend.
    today = pytrend.today_searches(pn="AT")
    for count, index in enumerate(today, start=0):
        display_articles_for_today(count, index)
elif selected_option == "Realzeit Anfragen":
    # Realtime searches: the user picks one of the preloaded categories.
    choices_list = list(real_trending_searches.keys())
    auswahl = st.selectbox("Select Ressort", choices_list)
    display_articles_for_category(auswahl)
elif selected_option == "Trending Searches Yesterday":
    # Daily RSS feed, with every item published today filtered out.
    timezone = 'Europe/Vienna'
    today = datetime.now(pytz.timezone(timezone)).date()
    feed = parse_url(feed_url1)
    entries = []
    ns = {'ht': 'https://trends.google.de/trends/trendingsearches/daily'}  # Define namespace
    for item in feed.findall('.//item'):
        # findtext() yields None for a missing child instead of raising
        # AttributeError. A missing pubDate still raises in strptime, since
        # the item cannot be dated for the filter below.
        pub_date_text = item.findtext('pubDate')
        pubDate = datetime.strptime(pub_date_text, '%a, %d %b %Y %H:%M:%S %z').date()
        # Filter: skip the item when its pubDate is today.
        if pubDate == today:
            continue
        entry = {
            'title': item.findtext('title'),
            'pubDate': pub_date_text,
            'approx_traffic': item.findtext('ht:approx_traffic', namespaces=ns),
            'news_items': [
                {
                    'title': news_item.findtext('ht:news_item_title', namespaces=ns),
                    'snippet': news_item.findtext('ht:news_item_snippet', namespaces=ns),
                    'url': news_item.findtext('ht:news_item_url', namespaces=ns),
                    'source': news_item.findtext('ht:news_item_source', namespaces=ns),
                }
                for news_item in item.findall('ht:news_item', ns)
            ],
        }
        entries.append(entry)
    for count, entry in enumerate(entries, start=1):
        with st.expander(f"{count}{entry['title']} | Generated Traffic: {entry['approx_traffic']}"):
            st.write(f"Veröffentlichungsdatum : {entry['pubDate']}")
            for count2, link in enumerate(entry['news_items'], start=1):
                st.markdown(f"{count2}{link['title']} [Go To →]({link['url']})")