|
import os |
|
import time |
|
import logging |
|
import requests |
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, |
|
format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
API_ENDPOINT = "https://api.siliconflow.cn/v1/usage/credit_summary" |
|
|
|
def get_credit_summary(api_key): |
|
""" |
|
使用 API 密钥获取额度信息。 |
|
|
|
Args: |
|
api_key: 用于访问 API 的密钥。 |
|
|
|
Returns: |
|
包含额度信息的字典,如果请求失败则返回 None。 |
|
""" |
|
headers = { |
|
"Authorization": f"Bearer {api_key}", |
|
"Content-Type": "application/json" |
|
} |
|
try: |
|
response = requests.get(API_ENDPOINT, headers=headers) |
|
response.raise_for_status() |
|
return response.json() |
|
except requests.exceptions.RequestException as e: |
|
logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}") |
|
return None |
|
|
|
def load_keys(): |
|
""" |
|
从环境变量中加载 keys,并记录到日志中,同时获取并记录每个 key 的额度信息。 |
|
""" |
|
keys_str = os.environ.get("KEYS") |
|
if keys_str: |
|
keys = [key.strip() for key in keys_str.split(',')] |
|
logging.info(f"加载的 keys:{keys}") |
|
|
|
for key in keys: |
|
credit_summary = get_credit_summary(key) |
|
if credit_summary: |
|
logging.info(f"API Key:{key},额度信息:{credit_summary}") |
|
else: |
|
logging.warning("环境变量 KEYS 未设置。") |
|
|
|
|
|
scheduler = BackgroundScheduler() |
|
|
|
|
|
scheduler.add_job(load_keys, 'interval', hours=1) |
|
|
|
|
|
scheduler.start() |
|
|
|
|
|
if __name__ == '__main__': |
|
try: |
|
while True: |
|
time.sleep(120) |
|
except (KeyboardInterrupt, SystemExit): |
|
scheduler.shutdown() |
|
|