import gradio as gr import requests import asyncio from typing import Any, Iterable from gradio.themes.base import Base from gradio.themes.utils import colors, fonts, sizes from gradio.themes.utils.colors import Color from communex.client import CommuneClient import aiohttp FONT = """""" IMAGE = """
𐌔𐌙𐌍𐌕𐋅𐌉𐌀
""" HEADER = """This leaderboard showcases the top-performing miners in the Synthia Commune Subnet. The models are ranked based on their daily rewards.
The Synthia subnet leverages Commune's incentives to create a permissionless mining market around distilling knowledge out of SOTA closed-source model APIs into a public dataset to accelerate the OpenSource AI space. Targeted models and strategy will adapt based on the current SOTA.
""" EVALUATION_HEADER = """Name represents the model name. Rewards / Day indicates the expected daily rewards for each model in $COMAI. UID is the unique identifier of the miner. $USD Value is the estimated dollar value of the daily rewards.
""" netuid = 3 node_url = "wss://commune-api-node-2.communeai.net" async def get_com_price(session: aiohttp.ClientSession) -> Any: retries = 5 for i in range(retries): try: async with session.get("https://api.mexc.com/api/v3/avgPrice?symbol=COMAIUSDT") as response: response.raise_for_status() price = float((await response.json())["price"]) print(f"Fetched COM price: {price}") return price except Exception as e: print(f"Error fetching COM price: {e}") await asyncio.sleep(retries) raise RuntimeError("Failed to fetch COM price") async def make_query(client: CommuneClient) -> tuple[dict[int, int], dict[int, str]]: request_dict = { "SubspaceModule": [ ("Name", [netuid]), ("Emission", []), ("Incentive", []), ("Dividends", []), ], } emission_dict = {} name_dict = {} result = client.query_batch_map(request_dict) emission = result["Emission"] netuid_emission = emission[netuid] incentive = result["Incentive"] netuid_incentive = incentive[netuid] dividends = result["Dividends"] netuid_dividends = dividends[netuid] names = result["Name"] highest_uid = max(names.keys()) for uid in range(highest_uid + 1): emission = netuid_emission[uid] if emission != 0: incentive = netuid_incentive[uid] dividends = netuid_dividends[uid] if incentive > 0: emission_dict[uid] = netuid_emission[uid] name_dict[uid] = names[uid] return emission_dict, name_dict async def get_leaderboard_data(): async with aiohttp.ClientSession() as session: try: com_price = await get_com_price(session) blocks_in_day = 10_800 client = CommuneClient(node_url) emission_dict, name_dict = await make_query(client) print("got the emission") scores = {} for uid, emi in emission_dict.items(): scores[uid] = (emi / 10**11) * blocks_in_day sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True) leaderboard_data = [] for rank, (uid, score) in enumerate(sorted_scores, start=1): name = name_dict[uid] units = score usd_value = score * com_price leaderboard_data.append((rank, uid, name, units, f"${usd_value:.2f}")) return leaderboard_data except Exception as e: print(f"Error fetching leaderboard data: {e}") return [] async def update_leaderboard_table(): leaderboard_data = await get_leaderboard_data() leaderboard_data = [list(row) for row in leaderboard_data] for row in leaderboard_data: row[0] = f"{row[0]} 🏆" total_usd_value = sum(float(row[4][1:]) for row in leaderboard_data) rewards_per_week = total_usd_value * 7 rewards_per_month = total_usd_value * 30 return leaderboard_data, f'''