import os import time import logging import requests from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask, request, jsonify # 配置日志记录 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info" # 修改为获取用户信息的接口 app = Flask(__name__) def get_credit_summary(api_key): """ 使用 API 密钥获取额度信息。 """ headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } try: response = requests.get(API_ENDPOINT, headers=headers) response.raise_for_status() data = response.json().get("data", {}) total_balance = data.get("totalBalance", 0) # 获取总余额 return {"total_balance": total_balance} except requests.exceptions.RequestException as e: logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}") return None except (KeyError, TypeError) as e: logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}") return None def load_keys(): """ 从环境变量中加载 keys,并记录到日志中,同时获取并记录每个 key 的额度信息。 """ keys_str = os.environ.get("KEYS") if keys_str: keys = [key.strip() for key in keys_str.split(',')] logging.info(f"加载的 keys:{keys}") for key in keys: credit_summary = get_credit_summary(key) if credit_summary: logging.info(f"API Key:{key},额度信息:{credit_summary}") else: logging.warning("环境变量 KEYS 未设置。") # 创建一个后台调度器 scheduler = BackgroundScheduler() # 添加定时任务,每小时执行一次 load_keys 函数 scheduler.add_job(load_keys, 'interval', hours=1) @app.route('/') def index(): """ 处理根路由的访问请求。 """ return "

Welcome to SiliconFlow

" @app.route('/check_tokens', methods=['POST']) def check_tokens(): """ 处理前端发送的 Token 检测请求。 """ tokens = request.json.get('tokens', []) results = [] for token in tokens: is_valid = False balance = 0 error_message = "" # 先尝试进行一个简单的请求来验证 Token 的有效性 try: headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } test_response = requests.post("https://api.siliconflow.cn/v1/chat/completions", headers=headers, json={ "model": "Qwen/Qwen2.5-72B-Instruct", "messages": [{"role": "user", "content": "hi"}], "max_tokens": 100, "stream": False }, timeout=5) if test_response.status_code == 200: is_valid = True credit_summary = get_credit_summary(token) if credit_summary: balance = credit_summary.get("total_balance", 0) else: error_message = test_response.json().get("message", "未知错误") except requests.exceptions.RequestException as e: error_message = str(e) results.append({ "token": token, "isValid": is_valid, "balance": balance, "message": error_message }) return jsonify(results) if __name__ == '__main__': # 打印所有环境变量,方便调试 logging.info(f"环境变量:{os.environ}") # 启动调度器 scheduler.start() # 手动触发一次 load_keys 任务 load_keys() logging.info("首次加载 keys 已手动触发执行") # 启动 Flask 应用,监听所有 IP 的 7860 端口(Hugging Face Space 默认端口) app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))