"""Valory transactions dashboard.

Collects the service safes registered on Optimism, Base and Ethereum, pulls
their daily transactions from the chains' Etherscan-style explorer APIs and
renders a stacked bar chart of daily transaction counts in a Gradio app.
"""
import json
import time
from datetime import datetime, timedelta

import gradio as gr
import pandas as pd
import plotly.express as px
import requests
from web3 import Web3

# NOTE(review): the RPC URLs below (and the explorer API keys inside
# fetch_transactions) embed credentials in source. Consider loading them from
# the environment or a secrets store and rotating the committed values.

# RPC URLs
OPTIMISM_RPC_URL = 'https://opt-mainnet.g.alchemy.com/v2/U5gnXPYxeyH43MJ9tP8ONBQHEDRav7H0'
BASE_RPC_URL = 'https://base-mainnet.g.alchemy.com/v2/U5gnXPYxeyH43MJ9tP8ONBQHEDRav7H0'
ETH_RPC_URL = 'https://eth-mainnet.g.alchemy.com/v2/U5gnXPYxeyH43MJ9tP8ONBQHEDRav7H0'

# Explorer API endpoints, keyed by the chain names used throughout this file.
EXPLORER_API_URLS = {
    'optimism': "https://api-optimistic.etherscan.io/api",
    'base': "https://api.basescan.org/api",
    'ethereum': "https://api.etherscan.io/api",
}

# Services are only considered when this agent ID is among their agent IDs.
# NOTE(review): the meaning of agent ID 25 is not visible here - confirm.
TARGET_AGENT_ID = 25

ZERO_ADDRESS = '0x0000000000000000000000000000000000000000'

# Column layout of the transactions DataFrame. Declared explicitly so that an
# empty result still carries the columns downstream code indexes into.
TRANSACTION_COLUMNS = [
    'chain',
    'safe_address',
    'date',
    'transaction_hash',
    'timestamp',
    'from',
    'to',
    'value_eth',
]

# Initialize Web3 instances
print("Initializing Web3 instances...")
web3_optimism = Web3(Web3.HTTPProvider(OPTIMISM_RPC_URL))
web3_base = Web3(Web3.HTTPProvider(BASE_RPC_URL))
web3_eth = Web3(Web3.HTTPProvider(ETH_RPC_URL))

# Contract addresses for service registries
contract_address_optimism = '0x3d77596beb0f130a4415df3D2D8232B3d3D31e44'
contract_address_base = '0x3C1fF68f5aa342D296d4DEe4Bb1cACCA912D95fE'
contract_address_eth = '0x48b6af7B12C71f09e2fC8aF4855De4Ff54e775cA'

# Load the ABI from a local JSON file
with open('service_registry_abi.json', 'r') as abi_file:
    contract_abi = json.load(abi_file)

# Create the contract instances
service_registry_optimism = web3_optimism.eth.contract(address=contract_address_optimism, abi=contract_abi)
service_registry_base = web3_base.eth.contract(address=contract_address_base, abi=contract_abi)
service_registry_eth = web3_eth.eth.contract(address=contract_address_eth, abi=contract_abi)
print("Service registry contracts loaded.")

# Check if connection is successful
if not web3_optimism.is_connected():
    raise Exception("Failed to connect to the Optimism network.")
if not web3_base.is_connected():
    raise Exception("Failed to connect to the Base network.")
if not web3_eth.is_connected():
    raise Exception("Failed to connect to the ETH network.")
print("Successfully connected to Ethereum, Optimism, and Base networks.")


def fetch_service_safes(web3, registry_contract):
    """Collect the safes of all registry services that include the target agent.

    Iterates service IDs 1..totalSupply(), reads each service from the
    registry contract and keeps element 1 of the returned record (used here
    as the service safe) whenever TARGET_AGENT_ID is among the agent IDs.

    Args:
        web3: Web3 instance for the chain. Not used by the body; kept so the
            signature stays compatible with existing callers.
        registry_contract: Service registry contract exposing ``totalSupply``,
            ``getService`` and ``getAgentInstances``.

    Returns:
        A set of service safe addresses.
    """
    print("\nFetching service safes...")
    total_services = registry_contract.functions.totalSupply().call()
    print(f"Total services: {total_services}")
    service_safes = set()
    for service_id in range(1, total_services + 1):
        print(f"Processing service ID: {service_id}")
        service = registry_contract.functions.getService(service_id).call()
        agent_ids = service[-1]  # Assuming the last element is the list of agent IDs
        print(f"Agent IDs: {agent_ids}")
        if TARGET_AGENT_ID in agent_ids:
            # Only fetched for the log line below.
            agent_address = registry_contract.functions.getAgentInstances(service_id).call()
            service_safe = service[1]
            print(f"Found agent_address: {agent_address}")
            print(f"Found service safe: {service_safe}")
            service_safes.add(service_safe)
    print(f"Total service safes found: {len(service_safes)}")
    return service_safes


# Fetch service safes for each network
service_safes_optimism = fetch_service_safes(web3_optimism, service_registry_optimism)
service_safes_base = fetch_service_safes(web3_base, service_registry_base)
service_safes_eth = fetch_service_safes(web3_eth, service_registry_eth)
# The zero address is dropped from the Ethereum set only, as before.
service_safes_eth = {safe for safe in service_safes_eth if safe.lower() != ZERO_ADDRESS}


def _lookup_block_before(base_url, timestamp, api_key, label, chain_id, date_str):
    """Ask the explorer for the block closest before ``timestamp``.

    ``label`` ("start" or "end"), ``chain_id`` and ``date_str`` are only used
    in log messages.

    Returns:
        The block number exactly as the API returned it, or None when the
        request failed or the result is not a plain block number.
    """
    # Passing the query through ``params`` lets requests build and encode the
    # query string; the former hand-built URL had lost its "&timestamp="
    # separator, so the timestamp never reached the API.
    response = requests.get(
        base_url,
        params={
            'module': 'block',
            'action': 'getblocknobytime',
            'timestamp': timestamp,
            'closest': 'before',
            'apikey': api_key,
        },
    )
    if response.status_code != 200:
        print(f"Error fetching {label} block for {date_str} on chain {chain_id}")
        return None

    block = response.json().get('result')
    if block is None:
        print(f"No {label} block found for chain {chain_id} on {date_str}")
        return None
    # The value is later sent as startblock/endblock, so anything that is not
    # a plain number (e.g. an error message placed in ``result``) is rejected.
    if not str(block).isdigit():
        print(f"Unexpected {label} block result for chain {chain_id} on {date_str}: {block}")
        return None

    print(f"{label.capitalize()} block for chain {chain_id} on {date_str}: {block}")
    return block


def get_block_range_for_date(chain_id, date_str, api_key, base_url):
    """Get the block range for a specific date.

    Args:
        chain_id: Chain identifier, used in log messages only.
        date_str: Date in ``YYYY-MM-DD`` format.
        api_key: Explorer API key.
        base_url: Explorer API endpoint.

    Returns:
        ``(start_block, end_block)`` as returned by the explorer, or
        ``(None, None)`` if either lookup fails.
    """
    # NOTE(review): these are naive datetimes, so the day boundaries follow
    # the machine's local timezone - confirm whether UTC days are intended.
    target_date = datetime.strptime(date_str, "%Y-%m-%d")
    start_timestamp = int(datetime.combine(target_date, datetime.min.time()).timestamp())
    end_timestamp = int(datetime.combine(target_date, datetime.max.time()).timestamp())
    # For the current day the end of day is still in the future; never ask
    # for a block at a time that has not happened yet.
    end_timestamp = min(end_timestamp, int(time.time()))

    # Get start block
    start_block = _lookup_block_before(base_url, start_timestamp, api_key, 'start', chain_id, date_str)
    if start_block is None:
        return None, None

    # Get end block (pause between the two calls, as before)
    time.sleep(1)
    end_block = _lookup_block_before(base_url, end_timestamp, api_key, 'end', chain_id, date_str)
    if end_block is None:
        return None, None

    return start_block, end_block


def get_transactions(api_keys, wallet_address, chain_name, start_block, end_block):
    """Retrieve transactions for the given wallet address, chain, and block range
    using the Etherscan or similar API.

    Args:
        api_keys: Mapping of chain name to explorer API key.
        wallet_address: Address whose transactions are listed.
        chain_name: One of the keys of EXPLORER_API_URLS.
        start_block: First block of the range.
        end_block: Last block of the range.

    Returns:
        The transactions whose ``isError`` field is ``'0'``; an empty list for
        an unknown chain or when the API reports a status other than ``'1'``.
    """
    base_url = EXPLORER_API_URLS.get(chain_name)
    if not base_url:
        print(f"Invalid chain name: {chain_name}")
        return []

    params = {
        'module': 'account',
        'action': 'txlist',
        'address': wallet_address,
        'startblock': start_block,
        'endblock': end_block,
        'sort': 'asc',
        'apikey': api_keys.get(chain_name),
    }
    response = requests.get(base_url, params=params)
    data = response.json()
    time.sleep(1)  # pause after every explorer call

    if data['status'] != '1':
        print(f"Error: {data['message']}")
        return []

    return [tx for tx in data['result'] if tx['isError'] == '0']


def date_range(start_date, end_date):
    """Generates a range of dates from start_date to end_date inclusive.

    Both arguments and every yielded value are ``YYYY-MM-DD`` strings.
    """
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_dt = datetime.strptime(end_date, "%Y-%m-%d")
    delta = timedelta(days=1)
    current_dt = start_dt
    while current_dt <= end_dt:
        yield current_dt.strftime("%Y-%m-%d")
        current_dt += delta


def fetch_transactions():
    """Fetch the daily transactions of every service safe on every chain.

    Returns:
        A DataFrame with the columns listed in TRANSACTION_COLUMNS, one row
        per successful transaction. The columns are present even when no
        transaction was found.
    """
    # User inputs
    api_keys = {
        'optimism': 'XQ72JA5XZ51QC7TG1W295AAIF4KTV92K1K',
        'base': '4BFQMVW1QUKEPVDA4VW711CF4462682CY8',
        'ethereum': '3GRYJGX55W3QWCEKGREF4H53AFHCAIVVR7',
    }

    start_date = '2024-09-19'  # Starting date
    current_date = datetime.now().strftime('%Y-%m-%d')  # Till present date

    chains = {
        10: ('optimism', service_safes_optimism),  # Optimism chain ID and service safes
        8453: ('base', service_safes_base),  # Base chain ID and service safes
        1: ('ethereum', service_safes_eth),  # Ethereum mainnet chain ID and service safes
    }

    all_transactions = []  # List to hold all transactions

    for chain_id, (chain_name, service_safes) in chains.items():
        base_url = EXPLORER_API_URLS[chain_name]
        api_key = api_keys[chain_name]
        # The block range of a past date depends only on the chain, not on
        # the safe, so it is looked up once and reused for every safe.
        block_ranges = {}

        for safe_address in service_safes:
            print(f"\nProcessing {chain_name.capitalize()} for safe address {safe_address}...")
            for single_date in date_range(start_date, current_date):
                block_range = block_ranges.get(single_date)
                if block_range is None:
                    block_range = get_block_range_for_date(chain_id, single_date, api_key, base_url)
                    if block_range[0] is None or block_range[1] is None:
                        # Failed lookups are not cached, so the next safe retries.
                        print(f"Skipping date {single_date} for chain {chain_name} due to missing block data.")
                        continue
                    # Today's end block keeps moving, so only completed days
                    # are cached.
                    if single_date != current_date:
                        block_ranges[single_date] = block_range
                start_block, end_block = block_range

                print(f"Start Block: {start_block}, End Block: {end_block} for date {single_date}")

                transactions = get_transactions(api_keys, safe_address, chain_name, start_block, end_block)

                if transactions:
                    print(f"Found {len(transactions)} transactions on {single_date} for {chain_name.capitalize()} safe address {safe_address}:")
                    for tx in transactions:
                        tx_time = datetime.fromtimestamp(int(tx['timeStamp']))
                        all_transactions.append({
                            'chain': chain_name,
                            'safe_address': safe_address,
                            'date': single_date,
                            'transaction_hash': tx['hash'],
                            'timestamp': tx_time,
                            'from': tx['from'],
                            'to': tx['to'],
                            'value_eth': int(tx['value']) / 1e18,  # Convert value to ETH
                        })
                else:
                    print(f"No transactions found for safe address {safe_address} on {single_date} on {chain_name.capitalize()}.")

    # Convert the collected transactions into a DataFrame
    df_transactions_new = pd.DataFrame(all_transactions, columns=TRANSACTION_COLUMNS)
    return df_transactions_new


def create_transcation_visualizations():
    """Fetch the transactions, save them to CSV and chart daily counts per chain.

    Writes ``daily_transactions_new.csv`` as a side effect.

    Returns:
        A Plotly figure with one stacked bar per day, split by chain. The
        figure has no bars when no transactions were found.
    """
    df_transactions_new = fetch_transactions()
    df_transactions_new.to_csv('daily_transactions_new.csv', index=False)

    chart_title = "Chain Daily Activity: Transactions"
    if df_transactions_new.empty:
        # Nothing to group; return an empty chart instead of failing.
        fig_tx_chain = px.bar(barmode="stack", title=chart_title)
    else:
        df_transactions_new['timestamp'] = pd.to_datetime(df_transactions_new['timestamp'])

        # Group by date and chain, count transactions
        daily_counts = (
            df_transactions_new
            .groupby([df_transactions_new['timestamp'].dt.date, 'chain'])
            .size()
            .unstack(fill_value=0)
        )

        # Set up the plot
        fig_tx_chain = px.bar(
            daily_counts,
            barmode="stack",
            title=chart_title,
        )

    fig_tx_chain.update_layout(
        xaxis_title="Date",
        yaxis_title="Daily Transaction Nr",
        legend_title="Transaction Chain",
        xaxis_tickformat="%Y-%m-%d",
        height=700,
    )

    return fig_tx_chain


# Gradio interface
def dashboard():
    """Build the Gradio app containing the transactions chart."""
    with gr.Blocks() as demo:
        gr.Markdown("# Valory Transactions Dashboard")

        # Fetch and display visualizations
        with gr.Tab("Transactions"):
            fig_tx_chain = create_transcation_visualizations()
            gr.Plot(fig_tx_chain)

        # Add more tabs as needed...

    return demo


# Launch the dashboard
if __name__ == "__main__":
    dashboard().launch()