import argparse
import datetime
import functools
import json
import math
import os
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import bittensor as bt
import numpy as np
import pandas as pd
import wandb
from bittensor.extrinsics.serving import get_metadata
from dotenv import load_dotenv
from wandb.apis.public.history import HistoryScan, SampledHistoryScan

from competitions import COMP_NAME_TO_ID

NETUID = 37
DELAY_SECS = 3
RETRIES = 3

load_dotenv()

WANDB_TOKEN = os.environ.get("WANDB_API_KEY", None)
SUBTENSOR_ENDPOINT = os.environ.get("SUBTENSOR_ENDPOINT", None)
VALIDATOR_WANDB_PROJECT = "rusticluftig/finetuning"
BENCHMARK_WANDB_PROJECT = "rusticluftig/test-benchmarks"


@dataclass(frozen=True)
class ModelData:
    """Immutable record describing one model committed on the subnet."""

    uid: int
    hotkey: str
    competition_id: int
    namespace: str
    name: str
    commit: str

    # Hash of (hash(model) + hotkey)
    secure_hash: str
    block: int
    incentive: float
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ):
        """Returns an instance of this class from a compressed string representation.

        The string is split on ":" and read positionally as
        namespace:name:commit:secure_hash:competition_id.

        Raises:
            IndexError: If the string has fewer than five ":"-separated fields.
            ValueError: If the competition id field is not an integer.
        """
        tokens = cs.split(":")
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2],
            secure_hash=tokens[3],
            competition_id=int(tokens[4]),
            block=block,
            incentive=incentive,
            emission=emission,
        )


def run_with_retries(func, *args, **kwargs):
    """Runs a provided function with retries in the event of a failure.

    The function is attempted up to RETRIES times, sleeping DELAY_SECS between
    attempts. The exception from the final attempt is re-raised.
    """
    for attempt in range(RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"Failed to run function: {traceback.format_exc()}")
            if attempt == RETRIES - 1:
                raise
            time.sleep(DELAY_SECS)
    # Only reachable if RETRIES is configured to be <= 0.
    raise RuntimeError("Should never happen")


def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Returns a subtensor and metagraph for the finetuning subnet.

    Uses SUBTENSOR_ENDPOINT as the chain endpoint when it is set, otherwise
    connects to "finney". The whole connection attempt is retried via
    run_with_retries.
    """

    def _internal() -> Tuple[bt.subtensor, bt.metagraph]:
        if SUBTENSOR_ENDPOINT:
            parser = argparse.ArgumentParser()
            bt.subtensor.add_args(parser)
            subtensor = bt.subtensor(
                config=bt.config(
                    parser=parser,
                    args=["--subtensor.chain_endpoint", SUBTENSOR_ENDPOINT],
                )
            )
        else:
            subtensor = bt.subtensor("finney")
        metagraph = subtensor.metagraph(NETUID, lite=False)
        return subtensor, metagraph

    return run_with_retries(_internal)


def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Returns the model metadata committed on chain for every UID in the metagraph.

    UIDs whose metadata cannot be fetched, is empty, or cannot be parsed into a
    ModelData are skipped.
    """
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]

        metadata = None
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            # Catch Exception (not a bare except) so Ctrl-C still propagates.
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")

        if not metadata:
            continue

        commitment = metadata["info"]["fields"][0]
        # The first value of the commitment is a hex string; drop its two-char prefix.
        hex_data = commitment[list(commitment.keys())[0]][2:]
        chain_str = bytes.fromhex(hex_data).decode()
        block = metadata["block"]

        incentive = np.nan_to_num(metagraph.incentive[uid]).item()
        emission = (
            np.nan_to_num(metagraph.emission[uid]).item() * 20
        )  # convert to daily TAO

        try:
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, block, incentive, emission
            )
        except Exception:
            # Deliberate best-effort: commitments that do not parse are ignored.
            continue

        result.append(model_data)
    return result


def get_wandb_runs(
    project: str, filters: Optional[Dict[str, Any]], order: str = "-created_at"
) -> List:
    """Get the latest runs from Wandb, retrying infinitely until we get them.

    Args:
        project (str): The Wandb project to get runs from.
        filters (Optional[Dict[str, Any]]): Filters to apply to the runs. May be None.
        order (str): Order to sort the runs by. Defaults to "-created_at" (newest first)

    Returns:
        List: List of runs matching the provided filters
    """
    while True:
        api = wandb.Api(api_key=WANDB_TOKEN, timeout=100)
        runs = list(
            api.runs(
                project,
                filters=filters,
                order=order,
            )
        )
        if runs:
            return runs
        # The W&B API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)


def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Returns the most recent scores for the provided UIDs.

    Args:
        uids (List[int]): List of UIDs to get scores for.
        wandb_runs (List): List of validator runs from Wandb. Requires the runs are provided in descending order.

    Returns:
        A dict of UID to its "avg_loss", "win_rate", "win_total", "weight",
        "competition_id" and "fresh" values, taken from the first run (in the
        provided order) that contains the UID. UIDs found in no run are absent.
    """
    result = {}
    previous_timestamp = None
    seen_competitions = set()

    # Iterate through the runs until we've processed all the uids.
    for run in wandb_runs:
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = data["timestamp"]

        # Make sure runs are indeed in descending time order.
        assert (
            previous_timestamp is None or timestamp < previous_timestamp
        ), f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
        previous_timestamp = timestamp

        comp_id = data.get("competition_id", None)
        # Only the most recent run per competition is fresh.
        is_fresh = comp_id not in seen_competitions

        for uid in uids:
            if uid in result:
                continue
            uid_data = all_uid_data.get(str(uid))
            if uid_data is None:
                continue
            result[uid] = {
                "avg_loss": uid_data.get("average_loss", None),
                "win_rate": uid_data.get("win_rate", None),
                "win_total": uid_data.get("win_total", None),
                "weight": uid_data.get("weight", None),
                "competition_id": uid_data.get("competition_id", None),
                "fresh": is_fresh,
            }

        # Mark the competition as seen only once the whole run is processed, so
        # every UID scored by this run shares the same freshness.
        seen_competitions.add(comp_id)
        if len(result) == len(uids):
            break
    return result


def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight}).

    Only UIDs with positive validator trust and stake above 10,000 are included.
    Weights are rounded to 4 decimals; zero weights and self-weights are omitted.
    """
    ret = {}
    all_uids = metagraph.uids.tolist()
    for uid in all_uids:
        vtrust = metagraph.validator_trust[uid].item()
        stake = metagraph.stake[uid].item()
        if not (vtrust > 0 and stake > 10_000):
            continue

        weights = {}
        for ouid in all_uids:
            if ouid == uid:
                continue
            weight = round(metagraph.weights[uid][ouid].item(), 4)
            if weight > 0:
                weights[ouid] = weight
        ret[uid] = (vtrust, stake, weights)
    return ret


def get_losses_over_time(wandb_runs: List, competition_id: int) -> pd.DataFrame:
    """Returns a dataframe of the best average model loss over time.

    Each run contributes at most one row: the lowest qualifying loss for
    `competition_id` among its sampled recent steps, timestamped with the
    latest step timestamp seen in that run.
    """
    timestamps = []
    losses = []

    for run in wandb_runs:
        # For each run, check the 10 most recent steps.
        best_loss = math.inf
        should_add_datapoint = False
        min_step = max(0, run.lastHistoryStep - 10)
        history_scan = SampledHistoryScan(
            run.client,
            run,
            ["original_format_json"],
            min_step,
            run.lastHistoryStep,
            page_size=10,
        )
        max_timestamp = None
        for step in history_scan:
            data = json.loads(step["original_format_json"])
            all_uid_data = data["uid_data"]
            timestamp = datetime.datetime.fromtimestamp(data["timestamp"])
            if max_timestamp is None or timestamp > max_timestamp:
                max_timestamp = timestamp

            for uid_data in all_uid_data.values():
                loss = uid_data.get("average_loss", math.inf)
                c_id = uid_data.get("competition_id", None)
                if c_id is None or c_id != competition_id:
                    continue

                # Filter out issue caused by wandb unavailability.
                # A null loss in the JSON is skipped rather than compared.
                if loss is not None and loss < 0.99 and loss < best_loss:
                    best_loss = loss
                    should_add_datapoint = True

        # Now that we've processed the run's most recent steps, check if we should add a datapoint.
        if should_add_datapoint:
            timestamps.append(max_timestamp)
            losses.append(best_loss)

    return pd.DataFrame({"timestamp": timestamps, "losses": losses})


def is_floatable(x) -> bool:
    """Returns True if x is an int, or a float that is neither NaN nor infinite."""
    return (
        isinstance(x, float) and not math.isnan(x) and not math.isinf(x)
    ) or isinstance(x, int)


def format_score(uid: int, scores, key) -> Optional[float]:
    """Returns scores[uid][key] rounded to 4 decimals, or None if missing or not a finite number."""
    if uid in scores and key in scores[uid]:
        point = scores[uid][key]
        if is_floatable(point):
            return round(point, 4)
    return None


def leaderboard_data(
    leaderboard: List[ModelData],
    scores: Dict[int, Dict[str, Optional[float]]],
    competition_id: int,
    show_stale: bool,
) -> List[List[Any]]:
    """Returns the leaderboard data, based on models data and UID scores.

    One row is produced per model in `competition_id`. Models whose score is
    missing or not fresh are included only when `show_stale` is True. Each row
    is [markdown link, win_rate, avg_loss, weight, uid, block].
    """
    return [
        [
            f"[{c.namespace}/{c.name} ({c.commit[0:8]})](https://huggingface.co./{c.namespace}/{c.name}/commit/{c.commit})",
            format_score(c.uid, scores, "win_rate"),
            format_score(c.uid, scores, "avg_loss"),
            format_score(c.uid, scores, "weight"),
            c.uid,
            c.block,
        ]
        for c in leaderboard
        if c.competition_id == competition_id
        and ((c.uid in scores and scores[c.uid]["fresh"]) or show_stale)
    ]


def get_benchmarks() -> (
    Tuple[Optional[pd.DataFrame], Optional[Dict[str, Dict[str, float]]]]
):
    """Returns the latest benchmarks and the time they were run.

    Returns:
        A (dataframe, targets) tuple. The dataframe has one row per usable
        benchmark run; targets maps a metric name to reference model scores.
        Returns (None, None) when no benchmark project is configured.
    """
    if not BENCHMARK_WANDB_PROJECT:
        print("No benchmark project set.")
        return None, None

    runs = get_wandb_runs(
        project=BENCHMARK_WANDB_PROJECT, filters=None, order="+created_at"
    )
    required_keys = ("mmlu.acc,none", "mmlu_pro", "_timestamp")
    timestamps, uids, models, comp_ids, mmlu, mmlu_pro = [], [], [], [], [], []

    for run in runs:
        uid = run.config.get("uid", None)
        model = run.config.get("model", None)
        # Any run without a competition_id was for competition 2.
        comp_name = run.config.get("competition_id", "B7_MULTI_CHOICE")
        comp_id = COMP_NAME_TO_ID.get(comp_name, 2)

        # Compare against None explicitly: UID 0 is falsy but still a real UID.
        if uid is None or not model:
            continue

        samples = list(
            HistoryScan(
                run.client,
                run,
                0,
                1,
            )
        )
        if not samples:
            continue
        sample = samples[0]

        # Make sure we have all the required keys.
        if not all(key in sample for key in required_keys):
            continue

        comp_ids.append(comp_id)
        timestamps.append(datetime.datetime.fromtimestamp(sample["_timestamp"]))
        mmlu.append(sample["mmlu.acc,none"])
        mmlu_pro.append(sample["mmlu_pro"])
        uids.append(uid)
        models.append(model)

    return (
        pd.DataFrame(
            {
                "timestamp": timestamps,
                "uid": uids,
                "model": models,
                "competition_id": comp_ids,
                "mmlu": mmlu,
                "mmlu_pro": mmlu_pro,
            }
        ),
        {
            "mmlu": {
                "Llama-3.1-8B-Instruct": 0.681,
                "Mistral-7B-Instruct-v0.3": 0.597,
                "gemma-2-9b-it": 0.719,
            },
            "mmlu_pro": {
                "Llama-3.1-8B-Instruct": 30.68,
                "Mistral-7B-Instruct-v0.3": 23.06,
                "gemma-2-9b-it": 31.95,
            },
        },
    )


def make_validator_dataframe(
    validator_df: Dict[int, Tuple[float, int, Dict[int, float]]],
    model_data: List[ModelData],
) -> pd.DataFrame:
    """Builds a dataframe of validators and the weight each sets on incentivized models.

    Args:
        validator_df: Mapping of validator UID to (vtrust, stake, {uid: weight}),
            as produced by get_validator_weights.
        model_data: Models to create weight columns for; only models with a
            non-zero incentive get a column.

    Returns:
        One row per validator, ordered by stake descending, with UID, stake,
        V-Trust and one weight column per incentivized model.
    """
    incentivized = [c for c in model_data if c.incentive]
    # sorted() is stable, so validators with equal stake keep their dict order.
    by_stake = sorted(
        validator_df.keys(), key=lambda uid: validator_df[uid][1], reverse=True
    )

    values = [
        [uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)]
        + [validator_df[uid][-1].get(c.uid) for c in incentivized]
        for uid in by_stake
    ]

    dtypes = {"UID": int, "Stake (τ)": float, "V-Trust": float}
    dtypes.update(
        {f"{c.namespace}/{c.name} ({c.commit[0:8]})": float for c in incentivized}
    )
    return pd.DataFrame(values, columns=dtypes.keys()).astype(dtypes)


def make_metagraph_dataframe(metagraph: bt.metagraph, weights=False) -> pd.DataFrame:
    """Returns a dataframe with one row per metagraph entry.

    Args:
        metagraph: The metagraph to read per-UID attributes from.
        weights: When True and metagraph.W is not None, adds a "weights" column.
    """
    cols = [
        "stake",
        "emission",
        "trust",
        "validator_trust",
        "dividends",
        "incentive",
        "R",
        "consensus",
        "validator_permit",
    ]

    frame = pd.DataFrame({k: getattr(metagraph, k) for k in cols})
    frame["block"] = metagraph.block.item()
    frame["netuid"] = NETUID
    frame["uid"] = range(len(frame))
    frame["hotkey"] = [axon.hotkey for axon in metagraph.axons]
    frame["coldkey"] = [axon.coldkey for axon in metagraph.axons]
    if weights and metagraph.W is not None:
        # convert NxN tensor to a list of lists so it fits into the dataframe
        frame["weights"] = [w.tolist() for w in metagraph.W]

    return frame


def load_state_vars() -> Dict[str, Any]:
    """Loads all state needed by the leaderboard, retrying until every piece is available.

    On any failure the error is printed and the whole load is retried after 30
    seconds.

    Returns:
        A dict with the keys "metagraph", "model_data", "vali_runs", "scores",
        "validator_df", "benchmarks_df" and "benchmarks_targets".

    Raises:
        KeyboardInterrupt: Re-raised if the load is interrupted.
    """
    while True:
        try:
            subtensor, metagraph = get_subtensor_and_metagraph()
            print(f"Loaded subtensor and metagraph: {metagraph}")

            model_data: List[ModelData] = get_subnet_data(subtensor, metagraph)
            model_data.sort(key=lambda x: x.incentive, reverse=True)
            print(f"Loaded {len(model_data)} models")

            vali_runs = get_wandb_runs(
                project=VALIDATOR_WANDB_PROJECT,
                filters={
                    "$and": [{"config.type": "validator"}],
                    "$or": [{"config.uid": 28}, {"config.uid": 16}],
                },
            )
            print(f"Loaded {len(vali_runs)} validator runs")

            scores = get_scores([x.uid for x in model_data], vali_runs)
            print(f"Loaded {len(scores)} scores")

            validator_df = get_validator_weights(metagraph)
            print("Loaded validator weights")

            # Compute loss over time for all competitions.
            # losses_2 = get_losses_over_time(vali_runs, 2)
            # print("Loaded losses over time for comp 2")

            benchmarks_df, benchmarks_targets = get_benchmarks()
            print("Loaded benchmarks")
            break

        except KeyboardInterrupt:
            print("Exiting...")
            # Re-raise: falling through to the return below would fail on
            # variables that were never assigned.
            raise

        except Exception:
            print(f"Failed to get data: {traceback.format_exc()}")
            time.sleep(30)

    return {
        "metagraph": metagraph,
        "model_data": model_data,
        "vali_runs": vali_runs,
        "scores": scores,
        "validator_df": validator_df,
        "benchmarks_df": benchmarks_df,
        "benchmarks_targets": benchmarks_targets,
    }