"""API-key platform built on FastAPI and SQLAlchemy.

Users register, receive API keys, and use those keys (sent in the
``X-API-Key`` header) to call a proxy endpoint that forwards prompts to an
upstream API with a server-side secret. Admin users can list users and keys,
add credits and deactivate keys.
"""

import datetime
import hashlib
import os
import secrets
from urllib.parse import quote_plus

import requests
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship

# Load environment variables from .env file
load_dotenv()

# Environment variables
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_DB = os.getenv("MYSQL_DB")
MAIN_API_KEY = os.getenv("MAIN_API_KEY")
MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions")
MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator")
# Seconds to wait for the upstream API before giving up; without a timeout
# `requests` waits indefinitely and a stalled upstream would pin a worker.
MAIN_API_TIMEOUT = float(os.getenv("MAIN_API_TIMEOUT", "120"))

# Credentials are URL-escaped so characters such as "@", "/" or ":" in the
# user name or password do not corrupt the connection URL.
_DB_USER = quote_plus(MYSQL_USER or "")
_DB_PASSWORD = quote_plus(MYSQL_PASSWORD or "")
DATABASE_URL = f"mysql+pymysql://{_DB_USER}:{_DB_PASSWORD}@{MYSQL_HOST}/{MYSQL_DB}"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


# ---------------------------
# Database Models
# ---------------------------
class User(Base):
    """Account row; owns zero or more API keys."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    # Holds the output of hash_password(), never the raw password.
    hashed_password = Column(String(128), nullable=False)
    is_admin = Column(Boolean, default=False)
    credits = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationship to API keys
    api_keys = relationship("APIKey", back_populates="owner", cascade="all, delete-orphan")


class APIKey(Base):
    """API key belonging to a user, with optional expiry and an active flag."""

    __tablename__ = "api_keys"

    id = Column(Integer, primary_key=True, index=True)
    key = Column(String(64), unique=True, index=True, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    # Stored as a naive datetime; this module treats it as UTC.
    expiry_date = Column(DateTime, nullable=True)
    active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    owner = relationship("User", back_populates="api_keys")


Base.metadata.create_all(bind=engine)

# ---------------------------
# FastAPI App & Config
# ---------------------------
app = FastAPI(title="Real API Key Platform")
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


def get_db():
    """Yield a database session and close it once the request is finished."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def generate_api_key() -> str:
    """Return a new random API key: 32 hexadecimal characters."""
    return secrets.token_hex(16)


# ---------------------------
# Password & Time Helpers
# ---------------------------
PBKDF2_ITERATIONS = 600_000


def hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash of *password*.

    The result has the form ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``
    (118 characters with the current settings, which fits the 128-character
    ``hashed_password`` column).
    """
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
    return f"pbkdf2_sha256${PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Return True if *password* matches a value produced by hash_password().

    Malformed or unrecognised stored values yield False rather than raising.
    No endpoint in this module checks passwords yet; this is the counterpart
    needed by any future login flow.
    """
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        if scheme != "pbkdf2_sha256":
            return False
        expected = bytes.fromhex(digest_hex)
        candidate = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
        )
    except ValueError:
        return False
    # Constant-time comparison to avoid leaking how many bytes matched.
    return secrets.compare_digest(candidate, expected)


def _utcnow_naive() -> datetime.datetime:
    """Return the current UTC time as a naive datetime (tzinfo stripped)."""
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


def _to_naive_utc(value: datetime.datetime | None) -> datetime.datetime | None:
    """Convert a timezone-aware datetime to naive UTC; pass naive/None through.

    ``APIKey.expiry_date`` is a naive column that is compared against UTC, so
    an aware value must be converted before its offset is dropped.
    """
    if value is None or value.tzinfo is None:
        return value
    return value.astimezone(datetime.timezone.utc).replace(tzinfo=None)


# ---------------------------
# Pydantic Models
# ---------------------------
class UserCreate(BaseModel):
    username: str
    password: str


class UserOut(BaseModel):
    id: int
    username: str
    is_admin: bool
    credits: int
    created_at: datetime.datetime

    class Config:
        orm_mode = True


class UserRegisteredOut(UserOut):
    """Registration response: the user fields plus the newly issued API key."""

    api_key: str


class APIKeyOut(BaseModel):
    id: int
    key: str
    expiry_date: datetime.datetime | None = None
    active: bool
    created_at: datetime.datetime

    class Config:
        orm_mode = True


class GenerateKeyPayload(BaseModel):
    expiry_date: datetime.datetime | None = None


class RequestPayload(BaseModel):
    prompt: str


class CreditPayload(BaseModel):
    username: str
    credits: int


class DeactivateKeyPayload(BaseModel):
    key: str


# ---------------------------
# Authentication Dependencies
# ---------------------------
def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User:
    """Resolve the ``X-API-Key`` header to the user who owns that key.

    Raises:
        HTTPException: 401 if the key is unknown, inactive, or past its
            expiry date.
    """
    api_key_obj = (
        db.query(APIKey)
        .filter(APIKey.key == x_api_key, APIKey.active.is_(True))
        .first()
    )
    if not api_key_obj:
        raise HTTPException(status_code=401, detail="Invalid or inactive API Key")
    if api_key_obj.expiry_date and _utcnow_naive() > api_key_obj.expiry_date:
        raise HTTPException(status_code=401, detail="API Key expired")
    return api_key_obj.owner


def get_current_admin(current_user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user, requiring the admin flag.

    Raises:
        HTTPException: 403 if the user is not an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Not authorized")
    return current_user


# ---------------------------
# Endpoints
# ---------------------------
# Registration: Create a new user and generate a primary API key (no expiry)
@app.post("/register", response_model=UserRegisteredOut)
def register(user: UserCreate, db: Session = Depends(get_db)):
    """Create a user with a hashed password and a primary API key.

    The user and the key are written in one transaction, and the key is
    included in the response because every other endpoint requires one.

    Raises:
        HTTPException: 400 if the username is already taken.
    """
    if db.query(User).filter(User.username == user.username).first():
        raise HTTPException(status_code=400, detail="Username already exists")

    key_value = generate_api_key()
    new_user = User(
        username=user.username,
        hashed_password=hash_password(user.password),
        is_admin=False,
    )
    # Attaching the key through the relationship lets a single commit insert
    # both rows, so a user can never be left without a primary key.
    new_user.api_keys.append(APIKey(key=key_value, expiry_date=None, active=True))
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError:
        # A concurrent request took the username between the check and the insert.
        db.rollback()
        raise HTTPException(status_code=400, detail="Username already exists") from None
    db.refresh(new_user)

    return {
        "id": new_user.id,
        "username": new_user.username,
        "is_admin": new_user.is_admin,
        "credits": new_user.credits,
        "created_at": new_user.created_at,
        "api_key": key_value,
    }


# User panel: Get current user details
@app.get("/user/me", response_model=UserOut)
def read_user_me(current_user: User = Depends(get_current_user)):
    """Return the authenticated user's details."""
    return current_user


# List all API keys belonging to the current user
@app.get("/user/api_keys", response_model=list[APIKeyOut])
def get_user_api_keys(current_user: User = Depends(get_current_user)):
    """Return every API key owned by the authenticated user."""
    return current_user.api_keys


# Allow user to generate a new API key (with optional expiry date)
@app.post("/user/generate_key", response_model=APIKeyOut)
def generate_key(
    payload: GenerateKeyPayload,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Create and return an additional API key for the authenticated user.

    A timezone-aware ``expiry_date`` is converted to naive UTC before storage.
    """
    new_key = APIKey(
        key=generate_api_key(),
        user_id=current_user.id,
        expiry_date=_to_naive_utc(payload.expiry_date),
        active=True,
    )
    db.add(new_key)
    db.commit()
    db.refresh(new_key)
    return new_key


# Test endpoint for users
@app.get("/user/test_api")
def test_api(current_user: User = Depends(get_current_user)):
    """Confirm the caller's key works and echo their username and credits."""
    return {
        "message": "API is working",
        "username": current_user.username,
        "credits": current_user.credits,
    }


# Proxy endpoint: Forwards request to main API using the secured main API key
@app.post("/generate")
def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)):
    """Forward the prompt to the main API and return its JSON response.

    Raises:
        HTTPException: 502 if the main API cannot be reached in time or
            returns a body that is not JSON; otherwise the main API's own
            status code when it is not 200.
    """
    headers = {
        "Authorization": f"Bearer {MAIN_API_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "model": MODEL_NAME,
        "prompt": payload.prompt,
    }
    try:
        response = requests.post(
            MAIN_API_URL, json=data, headers=headers, timeout=MAIN_API_TIMEOUT
        )
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail="Main API unreachable") from exc

    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail="Error from main API")

    try:
        return response.json()
    except ValueError as exc:
        raise HTTPException(status_code=502, detail="Invalid response from main API") from exc


# ---------------------------
# Admin Endpoints
# ---------------------------
# List all users (admin-only)
@app.get("/admin/users", response_model=list[UserOut])
def list_users(current_user: User = Depends(get_current_admin), db: Session = Depends(get_db)):
    """Return every user in the system."""
    return db.query(User).all()


# Add credits to a user's account (admin-only)
@app.post("/admin/add_credit")
def add_credit(
    payload: CreditPayload,
    current_user: User = Depends(get_current_admin),
    db: Session = Depends(get_db),
):
    """Add ``payload.credits`` to the named user's balance.

    Raises:
        HTTPException: 404 if no user has the given username.
    """
    user = db.query(User).filter(User.username == payload.username).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Increment in SQL rather than in Python so concurrent requests cannot
    # overwrite each other's update; coalesce treats a NULL balance as 0.
    user.credits = func.coalesce(User.credits, 0) + payload.credits
    db.commit()
    db.refresh(user)
    return {"message": f"Added {payload.credits} credits to {user.username}. Total credits: {user.credits}"}


# List all API keys in the system (admin-only)
@app.get("/admin/api_keys", response_model=list[APIKeyOut])
def list_all_api_keys(current_user: User = Depends(get_current_admin), db: Session = Depends(get_db)):
    """Return every API key in the system."""
    return db.query(APIKey).all()


# Deactivate an API key (admin-only)
@app.post("/admin/deactivate_key")
def deactivate_key(
    payload: DeactivateKeyPayload,
    current_user: User = Depends(get_current_admin),
    db: Session = Depends(get_db),
):
    """Mark the given API key inactive so it no longer authenticates.

    Raises:
        HTTPException: 404 if the key does not exist.
    """
    key_obj = db.query(APIKey).filter(APIKey.key == payload.key).first()
    if not key_obj:
        raise HTTPException(status_code=404, detail="API Key not found")

    key_obj.active = False
    db.commit()
    return {"message": f"API Key {payload.key} deactivated."}


# ---------------------------
# UI Endpoints (Panels)
# ---------------------------
@app.get("/admin/ui")
def admin_ui(request: Request):
    """Render the admin panel template."""
    return templates.TemplateResponse("admin.html", {"request": request})


@app.get("/user/ui")
def user_ui(request: Request):
    """Render the user panel template."""
    return templates.TemplateResponse("user.html", {"request": request})