import json
import logging
import os
import textwrap

import apache_beam as beam
import gradio as gr
import huggingface_hub
import pandas as pd
import plotly.graph_objects as go
import requests
import spaces
import torch
import us
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from transformers import AutoModelForCausalLM, AutoTokenizer

MODEL_NAME = "google/gemma-2-2b-it"

PROMPT_TEMPLATE = """Write a succinct summary of the following weather alerts. Do not comment on missing information - just summarize the information provided/available.

```json
{}
```

Summary (In the state...):
"""

# Initialize an empty list to store weather alerts
alerts = []


# Define a transform for fetching weather alerts
class FetchWeatherAlerts(beam.DoFn):
    def process(self, state):
        logging.info(f"Fetching weather alerts for {state} from weather.gov")
        url = f"https://api.weather.gov/alerts/active?area={state}"
        response = requests.get(
            url,
            headers={
                "User-Agent": "(Neal DeBuhr, https://huggingface.co./spaces/ndebuhr/streaming-llm-weather-alerts)",
                "Accept": "application/geo+json",
            },
        )
        if response.status_code == 200:
            logging.info(f"Fetched weather alerts for {state} from weather.gov")
            features = response.json()["features"]
            # Appending to the module-level list works here because the
            # default DirectRunner executes this DoFn in-process
            alerts.append(
                {
                    "features": [
                        {
                            "event": feature["properties"]["event"],
                            "headline": feature["properties"]["headline"],
                            "instruction": feature["properties"]["instruction"],
                        }
                        for feature in features
                        if feature["properties"]["messageType"] == "Alert"
                    ],
                    "state": state,
                }
            )


pipeline_options = PipelineOptions()
# Save the main session state so that pickled functions and classes
# defined in __main__ can be unpickled
pipeline_options.view_as(SetupOptions).save_main_session = True

# Create and run the Apache Beam pipeline to fetch weather alerts
with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Create States" >> beam.Create([state.abbr for state in us.states.STATES])
        | "Fetch Weather Alerts" >> beam.ParDo(FetchWeatherAlerts())
    )


# Define a function to generate alert summaries using transformers and ZeroGPU
@spaces.GPU()
def generate_summaries(alerts):
    huggingface_hub.login(token=os.environ["HUGGINGFACE_TOKEN"])
    device = torch.device("cuda")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(device)
    for alert in alerts:
        prompt = PROMPT_TEMPLATE.format(json.dumps(alert, indent=2))
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs, max_new_tokens=256, pad_token_id=tokenizer.eos_token_id
            )
        # Strip the echoed prompt so only the generated summary remains
        alert["summary"] = (
            tokenizer.decode(outputs[0], skip_special_tokens=True)
            .replace(prompt, "")
            .strip()
        )
    return alerts


alerts = generate_summaries(alerts)

df = pd.DataFrame.from_dict(
    [{"state": alert["state"], "summary": alert["summary"]} for alert in alerts]
)


def get_map():
    def wrap_text(text, width=50):
        # Plotly hover labels use <br> (not \n) for line breaks
        return "<br>".join(textwrap.wrap(text, width=width))

    df["wrapped_summary"] = df["summary"].apply(wrap_text)

    fig = go.Figure(
        go.Choropleth(
            locations=df["state"],
            z=[1 for _ in df["summary"]],
            locationmode="USA-states",
            colorscale=[
                [0, "lightgrey"],
                [1, "lightgrey"],
            ],  # Single color for all states
            showscale=False,
            text=df["wrapped_summary"],
            hoverinfo="text",
            hovertemplate="%{text}",
        )
    )
    fig.update_layout(title_text="Streaming LLM Weather Alerts", geo_scope="usa")
    return fig


# Create Gradio interface
iface = gr.Interface(fn=get_map, inputs=None, outputs=gr.Plot())

# Launch the Gradio interface
iface.launch()