Spaces:
Runtime error
Runtime error
import os | |
import secrets | |
import datetime | |
import requests | |
from fastapi import FastAPI, Depends, HTTPException, Header, Request | |
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, func | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import sessionmaker, Session, relationship | |
from pydantic import BaseModel | |
from dotenv import load_dotenv | |
from fastapi.templating import Jinja2Templates | |
from fastapi.staticfiles import StaticFiles | |
# Load environment variables from .env file | |
load_dotenv() | |
# Environment variables | |
MYSQL_USER = os.getenv("MYSQL_USER") | |
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD") | |
MYSQL_HOST = os.getenv("MYSQL_HOST") | |
MYSQL_DB = os.getenv("MYSQL_DB") | |
MAIN_API_KEY = os.getenv("MAIN_API_KEY") | |
MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions") | |
MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator") | |
DATABASE_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}/{MYSQL_DB}" | |
engine = create_engine(DATABASE_URL) | |
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
Base = declarative_base() | |
# --------------------------- | |
# Database Models | |
# --------------------------- | |
class User(Base): | |
__tablename__ = "users" | |
id = Column(Integer, primary_key=True, index=True) | |
username = Column(String(50), unique=True, index=True, nullable=False) | |
hashed_password = Column(String(128), nullable=False) | |
is_admin = Column(Boolean, default=False) | |
credits = Column(Integer, default=0) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
# Relationship to API keys | |
api_keys = relationship("APIKey", back_populates="owner", cascade="all, delete-orphan") | |
class APIKey(Base): | |
__tablename__ = "api_keys" | |
id = Column(Integer, primary_key=True, index=True) | |
key = Column(String(64), unique=True, index=True, nullable=False) | |
user_id = Column(Integer, ForeignKey("users.id"), nullable=False) | |
expiry_date = Column(DateTime, nullable=True) | |
active = Column(Boolean, default=True) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
owner = relationship("User", back_populates="api_keys") | |
Base.metadata.create_all(bind=engine) | |
# --------------------------- | |
# FastAPI App & Config | |
# --------------------------- | |
app = FastAPI(title="Real API Key Platform") | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
templates = Jinja2Templates(directory="templates") | |
def get_db(): | |
db = SessionLocal() | |
try: | |
yield db | |
finally: | |
db.close() | |
def generate_api_key() -> str: | |
return secrets.token_hex(16) | |
# --------------------------- | |
# Pydantic Models | |
# --------------------------- | |
class UserCreate(BaseModel): | |
username: str | |
password: str | |
class UserOut(BaseModel): | |
id: int | |
username: str | |
is_admin: bool | |
credits: int | |
created_at: datetime.datetime | |
class Config: | |
orm_mode = True | |
class APIKeyOut(BaseModel): | |
id: int | |
key: str | |
expiry_date: datetime.datetime | None = None | |
active: bool | |
created_at: datetime.datetime | |
class Config: | |
orm_mode = True | |
class GenerateKeyPayload(BaseModel): | |
expiry_date: datetime.datetime | None = None | |
class RequestPayload(BaseModel): | |
prompt: str | |
class CreditPayload(BaseModel): | |
username: str | |
credits: int | |
class DeactivateKeyPayload(BaseModel): | |
key: str | |
# --------------------------- | |
# Authentication Dependency | |
# --------------------------- | |
def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User: | |
api_key_obj = db.query(APIKey).filter(APIKey.key == x_api_key, APIKey.active == True).first() | |
if not api_key_obj: | |
raise HTTPException(status_code=401, detail="Invalid or inactive API Key") | |
if api_key_obj.expiry_date and datetime.datetime.utcnow() > api_key_obj.expiry_date: | |
raise HTTPException(status_code=401, detail="API Key expired") | |
return api_key_obj.owner | |
# --------------------------- | |
# Endpoints | |
# --------------------------- | |
# Registration: Create a new user and generate a primary API key (no expiry) | |
def register(user: UserCreate, db: Session = Depends(get_db)): | |
if db.query(User).filter(User.username == user.username).first(): | |
raise HTTPException(status_code=400, detail="Username already exists") | |
new_user = User(username=user.username, hashed_password=user.password, is_admin=False) | |
db.add(new_user) | |
db.commit() | |
db.refresh(new_user) | |
primary_key = APIKey(key=generate_api_key(), user_id=new_user.id, expiry_date=None, active=True) | |
db.add(primary_key) | |
db.commit() | |
db.refresh(primary_key) | |
return new_user | |
# User panel: Get current user details | |
def read_user_me(current_user: User = Depends(get_current_user)): | |
return current_user | |
# List all API keys belonging to the current user | |
def get_user_api_keys(current_user: User = Depends(get_current_user)): | |
return current_user.api_keys | |
# Allow user to generate a new API key (with optional expiry date) | |
def generate_key(payload: GenerateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
new_key = APIKey(key=generate_api_key(), user_id=current_user.id, expiry_date=payload.expiry_date, active=True) | |
db.add(new_key) | |
db.commit() | |
db.refresh(new_key) | |
return new_key | |
# Test endpoint for users | |
def test_api(current_user: User = Depends(get_current_user)): | |
return {"message": "API is working", "username": current_user.username, "credits": current_user.credits} | |
# Proxy endpoint: Forwards request to main API using the secured main API key | |
def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)): | |
headers = { | |
"Authorization": f"Bearer {MAIN_API_KEY}", | |
"Content-Type": "application/json" | |
} | |
data = { | |
"model": MODEL_NAME, | |
"prompt": payload.prompt | |
} | |
response = requests.post(MAIN_API_URL, json=data, headers=headers) | |
if response.status_code != 200: | |
raise HTTPException(status_code=response.status_code, detail="Error from main API") | |
return response.json() | |
# --------------------------- | |
# Admin Endpoints | |
# --------------------------- | |
# List all users (admin-only) | |
def list_users(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
if not current_user.is_admin: | |
raise HTTPException(status_code=403, detail="Not authorized") | |
users = db.query(User).all() | |
return users | |
# Add credits to a user's account (admin-only) | |
def add_credit(payload: CreditPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
if not current_user.is_admin: | |
raise HTTPException(status_code=403, detail="Not authorized") | |
user = db.query(User).filter(User.username == payload.username).first() | |
if not user: | |
raise HTTPException(status_code=404, detail="User not found") | |
user.credits += payload.credits | |
db.commit() | |
db.refresh(user) | |
return {"message": f"Added {payload.credits} credits to {user.username}. Total credits: {user.credits}"} | |
# List all API keys in the system (admin-only) | |
def list_all_api_keys(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
if not current_user.is_admin: | |
raise HTTPException(status_code=403, detail="Not authorized") | |
keys = db.query(APIKey).all() | |
return keys | |
# Deactivate an API key (admin-only) | |
def deactivate_key(payload: DeactivateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
if not current_user.is_admin: | |
raise HTTPException(status_code=403, detail="Not authorized") | |
key_obj = db.query(APIKey).filter(APIKey.key == payload.key).first() | |
if not key_obj: | |
raise HTTPException(status_code=404, detail="API Key not found") | |
key_obj.active = False | |
db.commit() | |
return {"message": f"API Key {payload.key} deactivated."} | |
# --------------------------- | |
# UI Endpoints (Panels) | |
# --------------------------- | |
def admin_ui(request: Request): | |
return templates.TemplateResponse("admin.html", {"request": request}) | |
def user_ui(request: Request): | |
return templates.TemplateResponse("user.html", {"request": request}) | |