"""FastAPI service exposing image-upload endpoints backed by S3.

Requests to the upload endpoints are authenticated with the
``x-request-user`` / ``x-api-key`` headers, which are checked against the
``X_REQUEST_USER`` / ``X_API_KEY`` environment variables.
"""
import base64
import hmac
import io
import logging
import os
from http import HTTPStatus
from typing import Optional

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, File, Form, HTTPException, Header, UploadFile
from fastapi.openapi.docs import (
    get_redoc_html,
    get_swagger_ui_html,
    get_swagger_ui_oauth2_redirect_html,
)
from fastapi.responses import JSONResponse
from typing_extensions import Annotated

from apis.common import Common
from apis.s3 import S3
from models.image_object import ImageObject
from models.response import Response

# Load variables from a local .env file before reading the environment.
load_dotenv()

# Anything other than ENV=PROD is treated as a development environment.
IS_DEV = os.environ.get('ENV', 'DEV') != 'PROD'
AWS_S3_BUCKET_NAME = os.getenv('AWS_S3_BUCKET_NAME', '')
# Expected credentials; both are None when the variables are not set.
X_REQUEST_USER = os.environ.get('X_REQUEST_USER')
X_API_KEY = os.environ.get('X_API_KEY')

logging.basicConfig(
    level=logging.DEBUG if IS_DEV else logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

s3client = S3(logging)
common = Common()

# The built-in docs routes are disabled and re-registered below so the
# Swagger UI / ReDoc assets are loaded from unpkg.com.
app = FastAPI(docs_url=None, redoc_url=None)


@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
    """Serve the Swagger UI page using assets hosted on unpkg.com."""
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,
        title=app.title + " - Swagger UI",
        oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
        swagger_js_url="https://unpkg.com/swagger-ui-dist@5/swagger-ui-bundle.js",
        swagger_css_url="https://unpkg.com/swagger-ui-dist@5/swagger-ui.css",
    )


@app.get(app.swagger_ui_oauth2_redirect_url, include_in_schema=False)
async def swagger_ui_redirect():
    """Serve the OAuth2 redirect helper page used by Swagger UI."""
    return get_swagger_ui_oauth2_redirect_html()


@app.get("/redoc", include_in_schema=False)
async def redoc_html():
    """Serve the ReDoc page using the bundle hosted on unpkg.com."""
    return get_redoc_html(
        openapi_url=app.openapi_url,
        title=app.title + " - ReDoc",
        redoc_js_url="https://unpkg.com/redoc@next/bundles/redoc.standalone.js",
    )


@app.get("/")
def root():
    """Return a static ``ok`` response."""
    return Response(status=200, data='ok')


@app.get("/health")
def healthcheck():
    """Return a static ``ok`` response for health probes."""
    return Response(status=200, data='ok')


@app.post("/image")
async def upload(
    o: ImageObject,
    x_request_user: str = Header(...),
    x_api_key: str = Header(...)
):
    """Upload a base64-encoded image to S3.

    The object is stored under ``{key}/{job_no}/{name}`` taken from the
    request body. The response ``data`` field echoes the request ``ref``.

    Responds with FORBIDDEN when the credential headers are invalid and
    BAD_REQUEST when the body carries no content; decode or upload
    failures are reported in the response ``error`` field.
    """
    res = Response()

    logging.info("--------------------------------")
    logging.info("Received request to upload image")

    # Reject bad credentials explicitly, the same way /image-multiparts
    # does, instead of falling through to a response with no error set.
    if not is_valid(x_request_user, x_api_key):
        res.status = HTTPStatus.FORBIDDEN
        res.error = "Invalid credentials"
        res.data = o.ref
        return res.json()

    key = f'{o.key}/{o.job_no}/{o.name}'
    logging.info('Key for S3 upload: %s', key)

    # Nothing to upload: report it rather than returning silently.
    if o.content is None:
        res.status = HTTPStatus.BAD_REQUEST
        res.error = "File not found"
        res.data = o.ref
        return res.json()

    try:
        # Decode base64 content
        decoded_content = base64.b64decode(o.content)
        logging.info('Decoded content length: %d bytes', len(decoded_content))

        # Wrap bytes in BytesIO to create a file-like object
        file_obj = io.BytesIO(decoded_content)

        # Upload file object to S3; the result replaces the local response
        # object (presumably a Response -- it is used as one below).
        res = s3client.upload_file(AWS_S3_BUCKET_NAME, file_obj, key)
        logging.info("File uploaded successfully")
    except Exception as e:
        # Boundary handler: `res` is still the initial Response here because
        # the assignment above did not complete.
        res.error = str(e)
        logging.warning("Error during upload: %s", res.error)

    # Set after the upload because `res` may have been replaced above.
    res.data = o.ref
    return res.json()


@app.post("/image-multiparts")
async def upload2(
    x_request_user: str = Header(...),
    x_api_key: str = Header(...),
    job_no: Annotated[str, Form()] = '',
    s3key: Annotated[str, Form()] = '',
    ref: Annotated[str, Form()] = '',
    fileb: Optional[UploadFile] = File(None),
):
    """Accept an image sent as multipart form data.

    The target key is built as ``{s3key}/{job_no}/{filename}``. The
    response ``data`` field echoes the ``ref`` form field.

    Responds with FORBIDDEN when the credential headers are invalid and
    BAD_REQUEST when no file part is supplied.
    """
    res = Response()
    res.data = ref

    logging.info("--------------------------------")
    logging.info("Received request to upload image")

    # Validate headers
    if not is_valid(x_request_user, x_api_key):
        res.status = HTTPStatus.FORBIDDEN
        res.error = "Invalid credentials"
        return res.json()

    if fileb is None:
        res.status = HTTPStatus.BAD_REQUEST
        res.error = "File not found"
        return res.json()

    key = f'{s3key}/{job_no}/{fileb.filename}'
    logging.info('Key for S3 upload: %s', key)

    try:
        # Read the file content directly from UploadFile
        file_content = await fileb.read()
        logging.info('File content length: %d bytes', len(file_content))

        # NOTE(review): this endpoint currently only reads the file; the
        # S3 upload step is not performed here. Confirm whether it should
        # call s3client.upload_file like the /image endpoint does.
    except Exception as e:
        res.error = str(e)
        logging.warning("Error during upload: %s", res.error)

    return res.json()


def is_valid(u: Optional[str], p: Optional[str]) -> bool:
    """Return True when *u* and *p* match the configured credentials.

    Fails closed: returns False when either supplied value is None or when
    either expected credential is not configured in the environment.
    """
    if u is None or p is None or X_REQUEST_USER is None or X_API_KEY is None:
        return False

    # Compare as UTF-8 bytes: compare_digest rejects non-ASCII str input,
    # and it avoids leaking how much of a secret matched through timing.
    user_ok = hmac.compare_digest(
        str(u).encode('utf-8'), X_REQUEST_USER.encode('utf-8')
    )
    key_ok = hmac.compare_digest(
        str(p).encode('utf-8'), X_API_KEY.encode('utf-8')
    )
    # Both comparisons are evaluated before combining so the work done
    # does not depend on which credential is wrong.
    return user_ok and key_ok


if __name__ == '__main__':
    # NOTE(review): reload=True is enabled unconditionally; confirm this
    # entry point is only used for local development.
    uvicorn.run('app:app', host='0.0.0.0', port=7860, reload=True)