File size: 4,768 Bytes
10e329b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import hvplot.streamz
import pandas as pd
import numpy as np
from streamz import Stream
from streamz.dataframe import DataFrame
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message
import datetime
import queue
import threading
import time
import os
import json
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download
import uuid
from pathlib import Path
import panel as pn
pn.extension(design="material")
# Queue for cross-thread communication.
# NOTE(review): post_queue is never used anywhere in the visible code —
# confirm whether it can be removed.
post_queue = queue.Queue()
# Posts seen during the current one-second window; incremented by the
# firehose thread, read and reset by the emitter thread.
post_count = 0
# Source stream feeding the streamz DataFrame below.
stream = Stream()
# NOTE(review): this sleep runs at import time, before any producer thread
# has been started, so it cannot "collect initial data" — confirm intent.
time.sleep(1)
# Example row defining the schema (column names/dtypes) of the streaming frame.
example = pd.DataFrame(
    {"timestamp": [pd.Timestamp.now()], "post_count": [post_count]}, index=[0]
)
# Streaming DataFrame: every stream.emit(pandas_df) appends rows to it.
df = DataFrame(stream, example=example)
# Backlog horizon: one month of one-sample-per-second points.
MONTH_IN_SECONDS = 31 * 24 * 60 * 60  # 31 days * 24 hours * 60 minutes * 60 seconds
# Hub repository configuration, overridable via environment variables.
REPO_ID = os.getenv("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.getenv("HF_REPO_TYPE", "dataset")
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Required for HuggingFace API access
DATA_FOLDER = Path("bluesky_data")
# Random UUID in the filename keeps concurrent runs from clobbering each other.
DATA_FILE = f"bluesky_counts_{uuid.uuid4()}.json"
def load_hub_data():
    """Load historical post-count data from the Hub.

    Downloads every ``data/bluesky_counts_*.json`` file in the repo and
    merges their JSON-Lines records. The filenames embed random UUIDs, so
    lexicographic filename order says nothing about recency — the previous
    approach of taking ``sorted(files)[-1]`` picked an arbitrary file.
    Instead, all records are merged and sorted by their timestamp.

    Returns:
        list[dict]: records (each with "timestamp" and "post_count" keys),
        oldest first, truncated to roughly the last month assuming one
        record per second. Empty list when no data exists or on any error.
    """
    try:
        api = HfApi(token=HF_TOKEN)
        files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE)
        data_files = [f for f in files if f.startswith("data/bluesky_counts_")]
        if not data_files:
            return []
        data = []
        for remote_file in data_files:
            local_path = hf_hub_download(
                repo_id=REPO_ID,
                filename=remote_file,
                repo_type=REPO_TYPE,
                token=HF_TOKEN,
            )
            # Each file is JSON Lines: one record per line.
            with open(local_path, "r") as f:
                data.extend(json.loads(line.strip()) for line in f)
        # Sort chronologically so truncation below keeps the newest points.
        # Timestamps are parsed rather than compared as strings in case the
        # stored format is not lexicographically sortable.
        data.sort(key=lambda record: pd.Timestamp(record["timestamp"]))
        # Keep only ~1 month of data (records are emitted once per second).
        return data[-MONTH_IN_SECONDS:]
    except Exception as e:
        print(f"Error loading data from Hub: {e}")
        return []
# Initialize local storage and schedule periodic uploads to the Hub.
DATA_FOLDER.mkdir(exist_ok=True)
# CommitScheduler watches DATA_FOLDER in a background thread and commits its
# contents into the repo's "data/" folder. Per the huggingface_hub docs the
# `every` parameter is a number of MINUTES, not seconds — the original
# `every=600` therefore committed every 10 hours despite the "every 10
# minutes" comment. Fixed to 10.
scheduler = CommitScheduler(
    repo_id=REPO_ID,
    repo_type=REPO_TYPE,
    folder_path=DATA_FOLDER,
    path_in_repo="data",
    every=10,  # Upload every 10 minutes
    token=HF_TOKEN,  # Token for authenticated pushes to the Hub
)
def on_message_handler(message):
    """Count newly created Bluesky posts from one firehose message.

    Parses the raw subscribe-repos message and bumps the shared
    ``post_count`` once per created record under the post collection.
    """
    global post_count
    commit = parse_subscribe_repos_message(message)
    # Messages without repo operations (e.g. info frames) carry no posts.
    if not hasattr(commit, "ops"):
        return
    # Likes, reposts, follows etc. live under other collection paths; only
    # "create" operations on app.bsky.feed.post are new posts.
    post_count += sum(
        1
        for op in commit.ops
        if op.action == "create" and "app.bsky.feed.post" in op.path
    )
def emit_counts():
    """Emit one (timestamp, post_count) row into the stream every second.

    On startup, replays up to 100 historical points from the Hub so the
    plot is not empty, then loops forever emitting the number of posts
    counted during the previous second and resetting the counter.

    Fixes over the original:
    - the local pandas frame no longer shadows the module-level streamz
      ``df``, which made the code confusing to read;
    - the counter is snapshotted and reset in a single tuple-assignment,
      narrowing the window in which increments from the firehose thread
      could be lost between the read and the reset.
    """
    global post_count
    if saved_data := load_hub_data():
        print(f"Loaded {len(saved_data)} historical data points from Hub")
        # Replay the tail of the history to seed the live plot.
        for point in saved_data[-100:]:
            batch = pd.DataFrame(
                {
                    "timestamp": [pd.Timestamp(point["timestamp"])],
                    "post_count": [point["post_count"]],
                }
            )
            stream.emit(batch)
    # Let the firehose thread accumulate one second of live data first.
    time.sleep(1)
    while True:
        now = pd.Timestamp.now()
        # Snapshot and reset in one statement; not fully atomic, but a much
        # smaller race window than emit-then-reset in separate statements.
        count, post_count = post_count, 0
        stream.emit(pd.DataFrame({"timestamp": [now], "post_count": [count]}))
        # One sample per second.
        time.sleep(1)
# Live line plot fed by the streamz DataFrame `df`; hvplot re-renders as new
# rows are emitted by emit_counts().
# NOTE(review): backlog=MONTH_IN_SECONDS keeps ~2.7 million points in the
# plot buffer — likely far too heavy for a browser session; confirm this is
# intended rather than a smaller window.
plot = df.hvplot.line(
    "timestamp",
    "post_count",
    title="Bluesky Posts per Second",
    width=800,
    height=400,
    backlog=MONTH_IN_SECONDS,  # Keep last month of points
)
def run_firehose():
    """Blocking loop: subscribe to the Bluesky firehose and route every
    message through on_message_handler (run in a daemon thread)."""
    FirehoseSubscribeReposClient().start(on_message_handler)
# Background workers: the firehose listener feeds post_count and the emitter
# publishes one row per second into the stream. Both are daemon threads so
# they terminate together with the main process.
firehose_thread = threading.Thread(target=run_firehose, daemon=True)
firehose_thread.start()

emit_thread = threading.Thread(target=emit_counts, daemon=True)
emit_thread.start()
# Serve the dashboard when executed as a script. Panel was already imported
# and initialized at the top of the file with pn.extension(design="material");
# the original re-import and bare pn.extension() call here were redundant and
# would have discarded the "material" design, so both are removed.
if __name__ == "__main__":
    dashboard = pn.Column(pn.pane.HoloViews(plot))
    # Bind to all interfaces on port 7860 so the app is reachable from
    # outside a Docker container; accept any websocket origin.
    pn.serve(
        dashboard,
        address="0.0.0.0",
        port=7860,
        allow_websocket_origin=["*"],
        show=False,
    )
|