Spaces:
Sleeping
Sleeping
File size: 8,425 Bytes
dabcedb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import requests
import json
import base64
import re
import logging
class AuthManager:
"""
AuthManager类用于管理身份验证过程,包括获取API密钥、用户信息和处理刷新令牌等操作。
Attributes:
session (requests.Session): 一个session对象,用于保持与服务器之间的连接。
api_key (str): API密钥,用于身份验证。
user_info (dict): 用户信息字典。
refresh_token (str): 用于刷新的令牌。
user_id (str): 用户唯一标识符。
next_action (str): 下一步操作的标识符。
logger (logging.Logger): 日志记录器。
Methods:
log_values(): 记录关键信息到日志。
get_next_action(): 获取下一步的操作。
fetch_apikey(): 获取并存储API密钥。
login(email, password): 使用email和密码进行登录。
refresh_user_token(): 刷新用户的身份验证令牌。
base64_encode(data): 以JSON格式编码然后转换为Base64。
get_cookie_value(): 获取cookie值用于会话验证。
"""
email = ""
password = ""
def __init__(self, email, password):
self.email = email
self.password = password
self.session = requests.Session()
self.api_key = ""
self.user_info = {}
self.refresh_token = ""
self.user_id = ""
self.next_action = ""
self.logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
self.login()
self.fetch_apikey()
self.get_next_action()
self.log_values()
def log_values(self):
"""
记录刷新令牌、用户ID和下一步操作到日志中。
"""
self.logger.info(f"\033[92mRefresh Token: {self.refresh_token}\033[0m")
self.logger.info(f"\033[94mUser ID: {self.user_id}\033[0m")
self.logger.info(f"\033[96mNext Action: {self.next_action}\033[0m")
def get_next_action(self):
"""
尝试从服务器获取下一步操作标识符,并更新实例变量。
Returns:
str: 下一步的操作标识符,如果未找到则返回空字符串。
"""
try:
url = "https://chat.notdiamond.ai/"
headers = {
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'cookie': self.get_cookie_value()
}
response = self.session.get(url, headers=headers)
response.raise_for_status()
script_tags = re.findall(r'<script[^>]*>(.*?)<\/script>', response.text, re.DOTALL)
matches = []
for script_content in script_tags:
if "static/chunks/app/(chat)/page-" in script_content:
matches.extend(re.findall(r'static/chunks/[a-zA-Z0-9]+-[a-zA-Z0-9]+\.js', script_content))
if not matches:
self.logger.warning("未找到匹配的脚本标签")
else:
full_script_urls = [f"https://chat.notdiamond.ai/_next/{match}" for match in matches]
for full_url in full_script_urls:
try:
script_response = self.session.get(full_url, headers=headers)
script_response.raise_for_status()
match = re.search(r'v=\(0,s.\$\)\("([^"]+)"\)', script_response.text)
if match:
desired_value = match.group(1)
self.next_action = desired_value
return self.next_action
except requests.RequestException as e:
self.logger.error(f"请求脚本URL时发生错误 {full_url}: {e}")
return ""
return ""
except requests.RequestException as e:
return ""
def fetch_apikey(self):
"""
获取API密钥并将其存储在本地文件中,以备后用。
Returns:
str: 获取的API密钥。
"""
if self.api_key:
return self.api_key
try:
url = "https://chat.notdiamond.ai/login"
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/128.0.0.0 Safari/537.36'
}
response = self.session.get(url, headers=headers)
response.raise_for_status()
match = re.search(r'<script src="(/_next/static/chunks/app/layout-[^"]+\.js)"', response.text)
if not match:
self.logger.warning("未找到匹配的脚本标签")
return ""
js_url = f"https://chat.notdiamond.ai{match.group(1)}"
response = self.session.get(js_url, headers=headers)
response.raise_for_status()
match = re.search(r'\("https://spuckhogycrxcbomznwo\.supabase\.co","([^"]+)"\)', response.text)
if match:
self.api_key = match.group(1)
return self.api_key
else:
self.logger.error("未能匹配API key")
return ""
except requests.RequestException as e:
self.logger.error(f"请求JS文件时发生错误: {e}")
return ""
def login(self):
"""
使用类成员中的电子邮件和密码进行用户登录,并获取用户信息。
"""
url = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=password"
headers = {
'apikey': self.fetch_apikey(),
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'Content-Type': 'application/json'
}
data = {
"email": self.email,
"password": self.password,
"gotrue_meta_security": {}
}
try:
response = self.session.post(url, headers=headers, json=data)
response.raise_for_status()
self.user_info = response.json()
self.refresh_token = self.user_info.get('refresh_token', '')
self.user_id = self.user_info.get('user', {}).get('id', '')
except requests.RequestException as e:
self.logger.error(f"\033[91m登录请求错误: {e}\033[0m")
return ""
def refresh_user_token(self):
"""
使用刷新令牌来请求一个新的访问令牌并更新实例变量。
"""
url = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=refresh_token"
headers = {
'apikey': self.fetch_apikey(),
'content-type': 'application/json;charset=UTF-8',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
}
data = {
"refresh_token": self.refresh_token
}
try:
response = self.session.post(url, headers=headers, json=data)
response.raise_for_status()
self.user_info = response.json()
self.refresh_token = self.user_info.get('refresh_token', '')
self.user_id = self.user_info.get('user', {}).get('id', '')
except requests.RequestException as e:
self.logger.error(f"刷新令牌请求错误: {e}")
return ""
def base64_encode(self, data):
"""
将输入数据转换为JSON字符串并编码为Base64格式。
Parameters:
data (any): 需要编码的数据。
Returns:
str: 编码后的Base64字符串。
"""
json_str = json.dumps(data, ensure_ascii=False)
return base64.b64encode(json_str.encode('utf-8')).decode('utf-8')
def get_cookie_value(self):
"""
生成并返回用于会话验证的cookie值。
Returns:
str: 用于验证的cookie字符串。
"""
encoded_user_info = self.base64_encode(self.user_info)
return f"sb-spuckhogycrxcbomznwo-auth-token=base64-{encoded_user_info}"
|