SNAPZION-TESTING / main.py
Niansuh's picture
Update main.py
58893fb verified
import os
import secrets
import datetime
import requests
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from pydantic import BaseModel
from dotenv import load_dotenv
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
# Load environment variables from .env file
load_dotenv()
# Environment variables
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_DB = os.getenv("MYSQL_DB")
MAIN_API_KEY = os.getenv("MAIN_API_KEY")
MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions")
MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator")
DATABASE_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}/{MYSQL_DB}"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# ---------------------------
# Database Models
# ---------------------------
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
hashed_password = Column(String(128), nullable=False)
is_admin = Column(Boolean, default=False)
credits = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationship to API keys
api_keys = relationship("APIKey", back_populates="owner", cascade="all, delete-orphan")
class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String(64), unique=True, index=True, nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
expiry_date = Column(DateTime, nullable=True)
active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
owner = relationship("User", back_populates="api_keys")
Base.metadata.create_all(bind=engine)
# ---------------------------
# FastAPI App & Config
# ---------------------------
app = FastAPI(title="Real API Key Platform")
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def generate_api_key() -> str:
return secrets.token_hex(16)
# ---------------------------
# Pydantic Models
# ---------------------------
class UserCreate(BaseModel):
username: str
password: str
class UserOut(BaseModel):
id: int
username: str
is_admin: bool
credits: int
created_at: datetime.datetime
class Config:
orm_mode = True
class APIKeyOut(BaseModel):
id: int
key: str
expiry_date: datetime.datetime | None = None
active: bool
created_at: datetime.datetime
class Config:
orm_mode = True
class GenerateKeyPayload(BaseModel):
expiry_date: datetime.datetime | None = None
class RequestPayload(BaseModel):
prompt: str
class CreditPayload(BaseModel):
username: str
credits: int
class DeactivateKeyPayload(BaseModel):
key: str
# ---------------------------
# Authentication Dependency
# ---------------------------
def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User:
api_key_obj = db.query(APIKey).filter(APIKey.key == x_api_key, APIKey.active == True).first()
if not api_key_obj:
raise HTTPException(status_code=401, detail="Invalid or inactive API Key")
if api_key_obj.expiry_date and datetime.datetime.utcnow() > api_key_obj.expiry_date:
raise HTTPException(status_code=401, detail="API Key expired")
return api_key_obj.owner
# ---------------------------
# Endpoints
# ---------------------------
# Registration: Create a new user and generate a primary API key (no expiry)
@app.post("/register", response_model=UserOut)
def register(user: UserCreate, db: Session = Depends(get_db)):
if db.query(User).filter(User.username == user.username).first():
raise HTTPException(status_code=400, detail="Username already exists")
new_user = User(username=user.username, hashed_password=user.password, is_admin=False)
db.add(new_user)
db.commit()
db.refresh(new_user)
primary_key = APIKey(key=generate_api_key(), user_id=new_user.id, expiry_date=None, active=True)
db.add(primary_key)
db.commit()
db.refresh(primary_key)
return new_user
# User panel: Get current user details
@app.get("/user/me", response_model=UserOut)
def read_user_me(current_user: User = Depends(get_current_user)):
return current_user
# List all API keys belonging to the current user
@app.get("/user/api_keys", response_model=list[APIKeyOut])
def get_user_api_keys(current_user: User = Depends(get_current_user)):
return current_user.api_keys
# Allow user to generate a new API key (with optional expiry date)
@app.post("/user/generate_key", response_model=APIKeyOut)
def generate_key(payload: GenerateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
new_key = APIKey(key=generate_api_key(), user_id=current_user.id, expiry_date=payload.expiry_date, active=True)
db.add(new_key)
db.commit()
db.refresh(new_key)
return new_key
# Test endpoint for users
@app.get("/user/test_api")
def test_api(current_user: User = Depends(get_current_user)):
return {"message": "API is working", "username": current_user.username, "credits": current_user.credits}
# Proxy endpoint: Forwards request to main API using the secured main API key
@app.post("/generate")
def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)):
headers = {
"Authorization": f"Bearer {MAIN_API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": MODEL_NAME,
"prompt": payload.prompt
}
response = requests.post(MAIN_API_URL, json=data, headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Error from main API")
return response.json()
# ---------------------------
# Admin Endpoints
# ---------------------------
# List all users (admin-only)
@app.get("/admin/users", response_model=list[UserOut])
def list_users(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
users = db.query(User).all()
return users
# Add credits to a user's account (admin-only)
@app.post("/admin/add_credit")
def add_credit(payload: CreditPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
user = db.query(User).filter(User.username == payload.username).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.credits += payload.credits
db.commit()
db.refresh(user)
return {"message": f"Added {payload.credits} credits to {user.username}. Total credits: {user.credits}"}
# List all API keys in the system (admin-only)
@app.get("/admin/api_keys", response_model=list[APIKeyOut])
def list_all_api_keys(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
keys = db.query(APIKey).all()
return keys
# Deactivate an API key (admin-only)
@app.post("/admin/deactivate_key")
def deactivate_key(payload: DeactivateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
key_obj = db.query(APIKey).filter(APIKey.key == payload.key).first()
if not key_obj:
raise HTTPException(status_code=404, detail="API Key not found")
key_obj.active = False
db.commit()
return {"message": f"API Key {payload.key} deactivated."}
# ---------------------------
# UI Endpoints (Panels)
# ---------------------------
@app.get("/admin/ui")
def admin_ui(request: Request):
return templates.TemplateResponse("admin.html", {"request": request})
@app.get("/user/ui")
def user_ui(request: Request):
return templates.TemplateResponse("user.html", {"request": request})