# Hugging Face Space status (page header captured with the source): Running on CPU Upgrade
import argparse | |
import datetime | |
import functools | |
import json | |
import math | |
import os | |
import time | |
import traceback | |
from dataclasses import dataclass | |
from typing import Any, Dict, List, Optional, Tuple | |
import bittensor as bt | |
import numpy as np | |
import pandas as pd | |
import wandb | |
from bittensor.extrinsics.serving import get_metadata | |
from dotenv import load_dotenv | |
from wandb.apis.public.history import HistoryScan | |
# Subnet UID used for metagraph and hyperparameter queries in this module.
NETUID = 37
# Seconds slept between attempts in run_with_retries.
DELAY_SECS = 3
# Maximum number of attempts made by run_with_retries.
RETRIES = 3
# Called before the os.environ lookups below (python-dotenv: loads a .env file if present).
load_dotenv()
# API key handed to wandb.Api; None when WANDB_API_KEY is not set.
WANDB_TOKEN = os.environ.get("WANDB_API_KEY", None)
# Optional chain endpoint override; when unset or empty, the "finney" network is used.
SUBTENSOR_ENDPOINT = os.environ.get("SUBTENSOR_ENDPOINT", None)
# Wandb project the validator runs are read from.
VALIDATOR_WANDB_PROJECT = "rusticluftig/finetuning"
# Wandb project for benchmark runs; while empty, get_benchmarks returns (None, None).
BENCHMARK_WANDB_PROJECT = ""
# Benchmarks are only fetched when this variable is set to a non-empty value.
BENCHMARK_FLAG = os.environ.get("BENCHMARK_FLAG", None)
@dataclass
class ModelData:
    """Model commitment of a single UID together with its metagraph stats.

    The class is a dataclass so that it can be built with keyword arguments
    (as done throughout this module) and compared/printed field by field.
    """

    uid: int
    hotkey: str
    competition_id: int
    # namespace/name/commit identify the committed model revision.
    namespace: str
    name: str
    commit: str
    # Hash of (hash(model) + hotkey)
    secure_hash: str
    # Block value supplied by the caller alongside the commitment.
    block: int
    incentive: float
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ) -> "ModelData":
        """Returns an instance of this class from a compressed string representation.

        Args:
            uid: UID the commitment belongs to.
            hotkey: Hotkey of that UID.
            cs: Colon-separated string laid out as
                ``namespace:name:commit:secure_hash:competition_id``.
            block: Block value to store on the instance.
            incentive: Incentive value to store on the instance.
            emission: Emission value to store on the instance.

        Raises:
            IndexError: If ``cs`` has fewer than five colon-separated tokens.
            ValueError: If the competition id token is not an integer.
        """
        tokens = cs.split(":")
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2],
            secure_hash=tokens[3],
            competition_id=int(tokens[4]),
            block=block,
            incentive=incentive,
            emission=emission,
        )
def run_with_retries(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying when it raises.

    Up to RETRIES attempts are made. After each failed attempt the traceback
    is printed; the final failure is re-raised, earlier ones are followed by
    a DELAY_SECS sleep before the next attempt.

    Returns:
        Whatever ``func`` returns on its first successful call.
    """
    attempts_left = RETRIES
    while attempts_left > 0:
        attempts_left -= 1
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"Failed to run function: {traceback.format_exc()}")
            if attempts_left == 0:
                # Out of attempts: surface the last error to the caller.
                raise
            time.sleep(DELAY_SECS)
    # Only reachable when RETRIES is not positive.
    raise RuntimeError("Should never happen")
def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Connect to the chain and fetch the full (non-lite) metagraph for NETUID.

    The connection goes to SUBTENSOR_ENDPOINT when it is set, otherwise to
    "finney". The whole connect-and-fetch step is run through
    run_with_retries.

    Returns:
        The (subtensor, metagraph) pair.
    """

    def _connect() -> Tuple[bt.subtensor, bt.metagraph]:
        if not SUBTENSOR_ENDPOINT:
            chain = bt.subtensor("finney")
        else:
            # Build a config that carries the custom endpoint as a CLI-style argument.
            arg_parser = argparse.ArgumentParser()
            bt.subtensor.add_args(arg_parser)
            endpoint_config = bt.config(
                parser=arg_parser,
                args=["--subtensor.chain_endpoint", SUBTENSOR_ENDPOINT],
            )
            chain = bt.subtensor(config=endpoint_config)
        return chain, chain.metagraph(NETUID, lite=False)

    return run_with_retries(_connect)
def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Collect the model commitment of every UID in the metagraph.

    For each UID the commitment metadata is fetched (with retries), decoded
    from hex into the compressed string form, and parsed into a ModelData.
    UIDs whose metadata cannot be fetched, is empty, or cannot be decoded or
    parsed are skipped so that one bad entry never aborts the whole scan.

    Args:
        subtensor: Chain connection used to fetch the metadata.
        metagraph: Metagraph providing uids, hotkeys, incentive and emission.

    Returns:
        One ModelData per UID with a parseable commitment, in UID iteration order.
    """
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt / SystemExit
            # must still propagate so the process can be stopped.
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")
            continue
        if not metadata:
            continue

        incentive = np.nan_to_num(metagraph.incentive[uid]).item()
        emission = (
            np.nan_to_num(metagraph.emission[uid]).item() * 20
        )  # convert to daily TAO

        try:
            commitment = metadata["info"]["fields"][0]
            # The first value of the commitment is a hex string; drop its
            # two-character prefix before decoding.
            hex_data = commitment[list(commitment.keys())[0]][2:]
            chain_str = bytes.fromhex(hex_data).decode()
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, metadata["block"], incentive, emission
            )
        except Exception as e:
            # A commitment that does not follow the expected layout is skipped.
            print(f"Skipping UID {uid}: unable to parse commitment: {e!r}")
            continue
        result.append(model_data)
    return result
def get_wandb_runs(project: str, filters: Dict[str, Any]) -> List:
    """Fetch runs from Wandb, polling until at least one run is returned.

    A new API client is created for every attempt. While the result is empty
    the function waits 60 seconds and asks again, without an upper bound on
    the number of attempts. Exceptions raised by the API are not caught here.

    Args:
        project: Wandb project path to query.
        filters: Filters passed through to the runs query.

    Returns:
        List: List of runs matching the provided filters, newest run (by creation time) first.
    """
    while True:
        client = wandb.Api(api_key=WANDB_TOKEN)
        matching_runs = list(
            client.runs(project, filters=filters, order="-created_at")
        )
        if matching_runs:
            return matching_runs
        # WandDB API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)
def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Returns the most recent scores for the provided UIDs.

    Runs are walked newest first; for every UID the first run that contains
    it supplies its scores. Runs without an "original_format_json" summary
    entry are ignored. Iteration stops as soon as every UID has a score.

    Args:
        uids (List[int]): List of UIDs to get scores for.
        wandb_runs (List): List of validator runs from Wandb. Requires the runs are provided in descending order.

    Returns:
        Mapping of UID to a dict with the keys "avg_loss", "win_rate",
        "win_total", "weight", "competition_id" and "fresh". "fresh" is True
        when the score came from the most recent run seen for that run's
        competition. UIDs found in no run are absent from the mapping.

    Raises:
        AssertionError: If the run timestamps are not strictly descending.
    """

    def _maybe_convert_loss(
        loss: Optional[float], comp_id: Optional[int]
    ) -> Optional[float]:
        """Converts loss to score for competitions that require it."""
        if comp_id == 2:
            # Compare against None explicitly: a loss of exactly 0.0 is a
            # valid value and must map to a score of 1.0, not to "missing".
            return None if loss is None else 1 - loss
        return loss

    result = {}
    previous_timestamp = None
    seen_competitions = set()

    # Iterate through the runs until we've processed all the uids.
    for run in wandb_runs:
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = data["timestamp"]

        # Make sure runs are indeed in descending time order. Raised
        # explicitly so the check also runs under ``python -O``.
        if previous_timestamp is not None and timestamp >= previous_timestamp:
            raise AssertionError(
                f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
            )
        previous_timestamp = timestamp

        comp_id = data.get("competition_id", None)
        # Only the most recent run per competition is fresh.
        is_fresh = comp_id not in seen_competitions

        for uid in uids:
            if uid in result:
                continue
            uid_key = str(uid)
            if uid_key not in all_uid_data:
                continue
            uid_data = all_uid_data[uid_key]
            result[uid] = {
                "avg_loss": _maybe_convert_loss(
                    uid_data.get("average_loss", None), comp_id
                ),
                "win_rate": uid_data.get("win_rate", None),
                "win_total": uid_data.get("win_total", None),
                "weight": uid_data.get("weight", None),
                "competition_id": uid_data.get("competition_id", None),
                "fresh": is_fresh,
            }

        seen_competitions.add(comp_id)
        if len(result) == len(uids):
            break
    return result
def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight}).

    A UID is included when its validator trust is above 0 and its stake is
    above 10,000. Its weight dict holds every other UID whose weight, rounded
    to four decimals, is still positive.
    """
    all_uids = metagraph.uids.tolist()
    validators = {}
    for validator_uid in all_uids:
        vtrust = metagraph.validator_trust[validator_uid].item()
        stake = metagraph.stake[validator_uid].item()
        if not (vtrust > 0 and stake > 10_000):
            continue

        assigned = {}
        for other_uid in all_uids:
            if other_uid == validator_uid:
                # A validator's weight on itself is not reported.
                continue
            weight = round(metagraph.weights[validator_uid][other_uid].item(), 4)
            if weight > 0:
                assigned[other_uid] = weight
        validators[validator_uid] = (vtrust, stake, assigned)
    return validators
def get_losses_over_time(wandb_runs: List, competition_id: int) -> pd.DataFrame:
    """Returns a dataframe of the best average model loss over time.

    For every run, at most the last ten history steps are scanned. The lowest
    "average_loss" among entries of the requested competition becomes that
    run's data point, stamped with the latest timestamp seen in the scanned
    steps. Runs without any matching loss contribute no data point.

    Args:
        wandb_runs: Runs whose history should be scanned.
        competition_id: Only UID entries with this competition id are considered.

    Returns:
        DataFrame with the columns "timestamp" and "losses", one row per run
        that produced a data point.
    """
    timestamps = []
    losses = []
    for run in wandb_runs:
        # For each run, check the 10 most recent steps.
        best_loss = math.inf
        latest_timestamp = None
        last_step = run.lastHistoryStep
        history_scan = HistoryScan(
            run.client, run, max(0, last_step - 10), last_step, page_size=10
        )
        for step in history_scan:
            if "original_format_json" not in step:
                continue
            data = json.loads(step["original_format_json"])
            all_uid_data = data["uid_data"]
            timestamp = datetime.datetime.fromtimestamp(data["timestamp"])
            if latest_timestamp is None or timestamp > latest_timestamp:
                latest_timestamp = timestamp

            for uid_data in all_uid_data.values():
                c_id = uid_data.get("competition_id", None)
                if c_id is None or c_id != competition_id:
                    continue
                loss = uid_data.get("average_loss", None)
                if loss is None:
                    # Missing or null loss: nothing to compare (and comparing
                    # None with a float would raise TypeError).
                    continue
                if loss < best_loss:
                    best_loss = loss

        # Now that we've processed the run's most recent steps, check if we should add a datapoint.
        if best_loss < math.inf:
            timestamps.append(latest_timestamp)
            losses.append(best_loss)
    return pd.DataFrame({"timestamp": timestamps, "losses": losses})
def next_epoch(subtensor: bt.subtensor, block: int) -> int:
    """Return ``block`` plus the subnet tempo minus the blocks since the epoch.

    Both the tempo and the blocks-since-epoch value are queried from
    ``subtensor`` for NETUID.
    """
    tempo = subtensor.get_subnet_hyperparameters(NETUID).tempo
    elapsed = subtensor.blocks_since_epoch(NETUID, block)
    return block + tempo - elapsed
def is_floatable(x) -> bool:
    """Return True for any int, or for a float that is neither NaN nor infinite."""
    if isinstance(x, int):
        return True
    if not isinstance(x, float):
        return False
    return math.isfinite(x)
def format_score(uid: int, scores, key) -> Optional[float]:
    """Return ``scores[uid][key]`` rounded to four decimals.

    None is returned when the UID or the key is missing, or when the stored
    value does not pass is_floatable.
    """
    if uid not in scores:
        return None
    uid_scores = scores[uid]
    if key not in uid_scores:
        return None
    point = uid_scores[key]
    return round(point, 4) if is_floatable(point) else None
def leaderboard_data(
    leaderboard: List[ModelData],
    scores: Dict[int, Dict[str, Optional[float]]],
    competition_id: int,
    show_stale: bool,
) -> List[List[Any]]:
    """Returns the leaderboard data, based on models data and UID scores.

    Only models of ``competition_id`` are listed. A model is shown when its
    score is marked fresh, or unconditionally when ``show_stale`` is set.

    Returns:
        One row per model: [markdown link, win rate, average loss, weight,
        uid, block].
    """
    rows = []
    for c in leaderboard:
        if c.competition_id != competition_id:
            continue
        has_fresh_score = c.uid in scores and scores[c.uid]["fresh"]
        if not (has_fresh_score or show_stale):
            continue
        rows.append(
            [
                f"[{c.namespace}/{c.name} ({c.commit[0:8]})](https://huggingface.co./{c.namespace}/{c.name}/commit/{c.commit})",
                format_score(c.uid, scores, "win_rate"),
                format_score(c.uid, scores, "avg_loss"),
                format_score(c.uid, scores, "weight"),
                c.uid,
                c.block,
            ]
        )
    return rows
def get_benchmarks() -> Tuple[pd.DataFrame, datetime.datetime]:
    """Returns the latest benchmarks and the time they were run.

    Runs of BENCHMARK_WANDB_PROJECT are inspected in the order returned by
    get_wandb_runs; the first run whose last logged artifact has a
    "benchmarks" table wins. (None, None) is returned when no project is
    configured or no run provides such a table.
    """
    if not BENCHMARK_WANDB_PROJECT:
        print("No benchmark project set.")
        return None, None

    for run in get_wandb_runs(project=BENCHMARK_WANDB_PROJECT, filters=None):
        artifacts = list(run.logged_artifacts())
        if not artifacts:
            continue
        table = artifacts[-1].get("benchmarks")
        if not table:
            continue
        benchmark_frame = table.get_dataframe()
        started_at = datetime.datetime.strptime(
            run.metadata["startedAt"], "%Y-%m-%dT%H:%M:%S.%f"
        )
        return benchmark_frame, started_at

    print("Failed to get benchmarks from Wandb.")
    return None, None
def make_validator_dataframe(
    validator_df: pd.DataFrame, model_data: ModelData
) -> pd.DataFrame:
    """Build a table of validators and the weights they put on rewarded models.

    Despite the annotations, ``validator_df`` is used as a mapping of
    validator UID to ``(vtrust, stake, {uid: weight})`` and ``model_data`` is
    iterated as a collection of models.

    Rows are ordered by stake, highest first. Besides "UID", "Stake (τ)" and
    "V-Trust" there is one column per model with a truthy incentive, holding
    the validator's weight for that model's UID (missing weights become NaN).
    """
    rewarded_models = [c for c in model_data if c.incentive]
    uids_by_stake = sorted(
        validator_df.keys(), key=lambda v_uid: validator_df[v_uid][1], reverse=True
    )

    rows = []
    for v_uid in uids_by_stake:
        entry = validator_df[v_uid]
        row = [v_uid, int(entry[1]), round(entry[0], 4)]
        row.extend(entry[-1].get(c.uid) for c in rewarded_models)
        rows.append(row)

    column_types = {"UID": int, "Stake (τ)": float, "V-Trust": float}
    for c in rewarded_models:
        column_types[f"{c.namespace}/{c.name} ({c.commit[0:8]})"] = float
    return pd.DataFrame(rows, columns=column_types.keys()).astype(column_types)
def make_metagraph_dataframe(metagraph: bt.metagraph, weights=False) -> pd.DataFrame:
    """Flatten selected metagraph attributes into a dataframe, one row per entry.

    Args:
        metagraph: Source of the per-UID attributes, block and axons.
        weights: When true (and ``metagraph.W`` is not None) a "weights"
            column holding each row of W as a list is added.

    Returns:
        DataFrame with the copied attribute columns plus "block", "netuid",
        "uid", "hotkey", "coldkey" and optionally "weights".
    """
    attribute_names = (
        "stake",
        "emission",
        "trust",
        "validator_trust",
        "dividends",
        "incentive",
        "R",
        "consensus",
        "validator_permit",
    )
    frame = pd.DataFrame(
        {attr: getattr(metagraph, attr) for attr in attribute_names}
    )
    frame["block"] = metagraph.block.item()
    frame["netuid"] = NETUID
    frame["uid"] = range(len(frame))

    hotkeys = []
    coldkeys = []
    for axon in metagraph.axons:
        hotkeys.append(axon.hotkey)
        coldkeys.append(axon.coldkey)
    frame["hotkey"] = hotkeys
    frame["coldkey"] = coldkeys

    if weights and metagraph.W is not None:
        # convert NxN tensor to a list of lists so it fits into the dataframe
        frame["weights"] = [row.tolist() for row in metagraph.W]
    return frame
def load_state_vars() -> Dict[str, Any]:
    """Load everything the leaderboard needs, retrying until it succeeds.

    Each attempt fetches the subtensor and metagraph, the on-chain model
    data (sorted by incentive, highest first), the validator runs and the
    scores derived from them, the validator weights, the loss history for
    competitions 1 and 2, and - when BENCHMARK_FLAG is set - the benchmarks.
    A failed attempt is logged and retried after 30 seconds.

    Returns:
        Dict with the keys "metagraph", "model_data", "vali_runs", "scores",
        "validator_df", "benchmarks", "benchmark_timestamp", "losses_1" and
        "losses_2".

    Raises:
        KeyboardInterrupt: Re-raised after printing "Exiting..." so that an
            interrupt stops the load instead of falling through to a return
            statement that reads variables which were never assigned.
    """
    while True:
        try:
            subtensor, metagraph = get_subtensor_and_metagraph()
            print(f"Loaded subtensor and metagraph: {metagraph}")

            model_data: List[ModelData] = get_subnet_data(subtensor, metagraph)
            model_data.sort(key=lambda x: x.incentive, reverse=True)
            print(f"Loaded {len(model_data)} models")

            vali_runs = get_wandb_runs(
                project=VALIDATOR_WANDB_PROJECT,
                filters={"config.type": "validator", "config.uid": 28},
            )
            print(f"Loaded {len(vali_runs)} validator runs")

            scores = get_scores([x.uid for x in model_data], vali_runs)
            validator_df = get_validator_weights(metagraph)

            # Compute loss over time for all competitions.
            losses_1 = get_losses_over_time(vali_runs, 1)
            losses_2 = get_losses_over_time(vali_runs, 2)

            # Enable benchmark if the flag is set
            if BENCHMARK_FLAG:
                benchmarks, benchmark_timestamp = get_benchmarks()
            else:
                benchmarks, benchmark_timestamp = None, None

            return {
                "metagraph": metagraph,
                "model_data": model_data,
                "vali_runs": vali_runs,
                "scores": scores,
                "validator_df": validator_df,
                "benchmarks": benchmarks,
                "benchmark_timestamp": benchmark_timestamp,
                "losses_1": losses_1,
                "losses_2": losses_2,
            }
        except KeyboardInterrupt:
            print("Exiting...")
            raise
        except Exception:
            print(f"Failed to get data: {traceback.format_exc()}")
            time.sleep(30)
def test_load_state_vars():
    """Return a state dict built from hard-coded sample data.

    The metagraph (lite) and the validator runs are still fetched live; the
    model data and validator weights are fixed fixtures. The returned dict
    carries the same keys as load_state_vars so that both can be consumed
    interchangeably; benchmarks are not loaded here and are reported as None.
    """
    # TODO: Change to finetuning data.
    subtensor = bt.subtensor("finney")
    metagraph = subtensor.metagraph(NETUID, lite=True)
    # competition_id is a required ModelData field; 1 is a placeholder for
    # these fixtures until they are replaced with finetuning data.
    model_data = [
        ModelData(
            uid=253,
            hotkey="5DjoPAgZ54Zf6NsuiVYh8RjonnWWWREE2iXBNzM2VDBMQDPm",
            competition_id=1,
            namespace="jw-hf-test",
            name="jw2",
            commit="aad131f6b02219964e6dcf749c2a23e75a7ceca8",
            secure_hash="L1ImYzWJwV+9KSnZ2TYW0Iy2KMcVjJVTd30YJoRkpbw=",
            block=3131103,
            incentive=1.0,
            emission=209.06051635742188,
        ),
        ModelData(
            uid=1,
            hotkey="5CccVtjk4yamCao6QYgEg7jc8vktdj16RbLKNUftHfEsjuJS",
            competition_id=1,
            namespace="borggAI",
            name="bittensor-subnet9-models",
            commit="d373864bc6c972872edb8db95eed570958054bac",
            secure_hash="+drdTIKYEGYClW2FFVVID6A2Dh//4rLmExRFCJsH6Y4=",
            block=2081837,
            incentive=0.0,
            emission=0.0,
        ),
        ModelData(
            uid=2,
            hotkey="5HYwoXaczs3jAptbb5mk4aUCkgZqeNcNzJKxSec97GwasfLy",
            competition_id=1,
            namespace="jungiebeen",
            name="pretrain1",
            commit="4c0c6bfd0f92e243d6c8a82209142e7204c852c3",
            secure_hash="ld/agc0XIWICom/Cpj0fkQLcMogMNj/F65MJogK5RLY=",
            block=2467482,
            incentive=0.0,
            emission=0.0,
        ),
        ModelData(
            uid=3,
            hotkey="5Dnb6edh9yTeEp5aasRPZVPRAkxvQ6qnERVcXw22awMZ5rxm",
            competition_id=1,
            namespace="jungiebeen",
            name="pretrain2",
            commit="e827b7281c92224adb11124489cc45356553a87a",
            secure_hash="ld/agc0XIWICom/Cpj0fkQLcMogMNj/F65MJogK5RLY=",
            block=2467497,
            incentive=0.0,
            emission=0.0,
        ),
        ModelData(
            uid=4,
            hotkey="5FRfca8NbnH424WaX43PMhKBnbLA1bZpRRoXXiVs6HgsxN4K",
            competition_id=1,
            namespace="ZainAli60",
            name="mine_modeles",
            commit="8a4ed4ad1f1fb58d424fd22e8e9874b87d32917c",
            secure_hash="tVcbZAFoNIOF+Ntxq31OQ2NrLXf5iFCmmPUJlpkMYYo=",
            block=2508509,
            incentive=0.0,
            emission=0.0,
        ),
    ]
    vali_runs = get_wandb_runs(
        project=VALIDATOR_WANDB_PROJECT,
        filters={"config.type": "validator", "config.uid": 238},
    )
    scores = get_scores([x.uid for x in model_data], vali_runs)
    # Validator UID -> (vtrust, stake, {uid: weight}), the shape produced by
    # get_validator_weights.
    validator_df = {
        28: (1.0, 33273.4453125, {253: 1.0}),
        49: (
            0.9127794504165649,
            10401.677734375,
            {
                7: 0.0867,
                217: 0.0001,
                219: 0.0001,
                241: 0.0001,
                248: 0.0001,
                253: 0.9128,
            },
        ),
        78: (1.0, 26730.37109375, {253: 1.0}),
        116: (1.0, 629248.4375, {253: 1.0}),
        150: (1.0, 272634.53125, {253: 1.0}),
        161: (1.0, 280212.53125, {253: 1.0}),
        180: (1.0, 16838.0, {253: 1.0}),
        184: (1.0, 47969.3984375, {253: 1.0}),
        210: (1.0, 262846.28125, {253: 1.0}),
        213: (1.0, 119462.734375, {253: 1.0}),
        215: (1.0, 274747.46875, {253: 1.0}),
        234: (1.0, 38831.6953125, {253: 1.0}),
        236: (1.0, 183966.9375, {253: 1.0}),
        238: (1.0, 1293707.25, {253: 1.0}),
        240: (1.0, 106461.6015625, {253: 1.0}),
        243: (1.0, 320271.5, {253: 1.0}),
        244: (1.0, 116138.9609375, {253: 1.0}),
        247: (0.9527428150177002, 119812.390625, {7: 0.0472, 253: 0.9528}),
        249: (1.0, 478127.3125, {253: 1.0}),
        252: (1.0, 442395.03125, {253: 1.0}),
        254: (1.0, 46845.2109375, {253: 1.0}),
        255: (1.0, 28977.56640625, {253: 1.0}),
    }
    return {
        "metagraph": metagraph,
        "model_data": model_data,
        "vali_runs": vali_runs,
        "scores": scores,
        "validator_df": validator_df,
        "benchmarks": None,
        "benchmark_timestamp": None,
        "losses_1": get_losses_over_time(vali_runs, 1),
        "losses_2": get_losses_over_time(vali_runs, 2),
    }