"""FastAPI service that forwards media jobs to a Modal backend and a few
third-party HTTP APIs, storing downloaded results under locally mounted
static directories."""

import asyncio
import base64
import io
import json
import os
import random
import string
import tempfile
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor

import aiofiles
import aiohttp
from fastapi import FastAPI, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from youtube_transcript_api import YouTubeTranscriptApi

app = FastAPI()

MODAL_BASE_URL = "https://sxqib--api-fastapi-app.modal.run"

# Public origin used to build the download URLs returned to clients.
PUBLIC_BASE_URL = "https://sxqib-api.hf.space"


def generate_hash(length=12):
    """Return a random string of lowercase letters and digits.

    Args:
        length: Number of characters to generate.

    Returns:
        A string of ``length`` characters. It is produced with ``random``,
        so it must not be used where unpredictability matters.
    """
    # Characters that can appear in the hash
    characters = string.ascii_lowercase + string.digits
    return ''.join(random.choice(characters) for _ in range(length))


@app.get("/")
async def read_root():
    """Return a static greeting payload."""
    return {"message": "Saqib's API"}


# Create a directory to store MP3 files if it doesn't exist
AUDIO_DIR = "audio_files"
os.makedirs(AUDIO_DIR, exist_ok=True)

# Create a directory for storing output files
OUTPUT_DIR = "output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Mount the audio directory (files in AUDIO_DIR are served under /audio)
app.mount("/audio", StaticFiles(directory=AUDIO_DIR), name="audio")

# Mount the output directory (files in OUTPUT_DIR are served under /output)
app.mount("/output", StaticFiles(directory=OUTPUT_DIR), name="output")

thread_pool = ThreadPoolExecutor(max_workers=2)


async def run_ffmpeg_async(ffmpeg_command):
    """Run ``ffmpeg_command`` on the module thread pool and wait for it.

    Args:
        ffmpeg_command: A zero-argument callable; it is invoked as-is by the
            executor.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(thread_pool, ffmpeg_command)


async def download_file(url: str, suffix: str):
    """Download ``url`` into a named temporary file and return its path.

    Args:
        url: Address to fetch with an HTTP GET.
        suffix: Suffix given to the temporary file name.

    Returns:
        Path of the temporary file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        HTTPException: 400 when the response status is not 200.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                raise HTTPException(status_code=400, detail=f"Failed to download file from {url}")
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                temp_file.write(await response.read())
                return temp_file.name


def _unexpected_error(exc: Exception) -> HTTPException:
    """Log ``exc`` with its traceback and build the generic 500 response.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` sees the active exception.
    """
    print(f"An error occurred: {str(exc)}")
    print(traceback.format_exc())
    return HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}")


async def _fetch_from_modal_and_save(method: str, endpoint: str, output_path: str, **request_kwargs) -> None:
    """Call ``MODAL_BASE_URL/<endpoint>`` and write the response body to disk.

    Args:
        method: HTTP method, e.g. ``"POST"`` or ``"GET"``.
        endpoint: Path segment appended to ``MODAL_BASE_URL``.
        output_path: Local file the response body is written to.
        **request_kwargs: Forwarded to ``ClientSession.request`` (for
            example ``json=...`` or ``params=...``).

    Raises:
        HTTPException: 500 when the backend status is not 200.
    """
    async with aiohttp.ClientSession() as session:
        async with session.request(method, f"{MODAL_BASE_URL}/{endpoint}", **request_kwargs) as response:
            if response.status != 200:
                raise HTTPException(status_code=500, detail="Failed to process request")
            payload = await response.read()

    async with aiofiles.open(output_path, "wb") as f:
        await f.write(payload)


async def _run_modal_job(request: Request, endpoint: str, directory: str, extension: str, mount: str) -> str:
    """Forward the request's JSON body to a Modal endpoint and store the result.

    Args:
        request: Incoming request whose JSON body is forwarded unchanged.
        endpoint: Modal endpoint name (also the path segment).
        directory: Local directory the result file is written to.
        extension: File extension including the dot, e.g. ``".mp4"``.
        mount: URL path under which ``directory`` is served.

    Returns:
        Public URL of the stored file.

    Raises:
        HTTPException: Propagated unchanged when raised deliberately; any
            other failure is logged and reported as a 500.
    """
    try:
        # Generate a unique filename for the output
        output_filename = f"{uuid.uuid4()}{extension}"
        output_path = os.path.join(directory, output_filename)

        data = await request.json()
        await _fetch_from_modal_and_save("POST", endpoint, output_path, json=data)

        # Return the URL path to the output file
        return f"{PUBLIC_BASE_URL}/{mount}/{output_filename}"
    except HTTPException:
        # Keep the status code and detail chosen at the raise site.
        raise
    except Exception as e:
        raise _unexpected_error(e) from e


@app.post("/add_audio_to_image")
async def add_audio_to_image(request: Request):
    """Proxy to the Modal ``add_audio_to_image`` job; return the MP4 URL."""
    return await _run_modal_job(request, "add_audio_to_image", OUTPUT_DIR, ".mp4", "output")


@app.post("/add_audio_to_video")
async def add_audio_to_video(request: Request):
    """Proxy to the Modal ``add_audio_to_video`` job; return the MP4 URL."""
    return await _run_modal_job(request, "add_audio_to_video", OUTPUT_DIR, ".mp4", "output")


@app.post("/concatenate_videos")
async def concatenate_videos(request: Request):
    """Proxy to the Modal ``concatenate_videos`` job; return the MP4 URL."""
    return await _run_modal_job(request, "concatenate_videos", OUTPUT_DIR, ".mp4", "output")


@app.post("/concatenate_audio")
async def concatenate_audio(request: Request):
    """Proxy to the Modal ``concatenate_audio`` job; return the MP3 URL.

    The file is stored in ``AUDIO_DIR``, which is mounted at ``/audio``, so
    the returned URL uses that mount path (not the directory name).
    """
    return await _run_modal_job(request, "concatenate_audio", AUDIO_DIR, ".mp3", "audio")


@app.post("/make_video")
async def make_video(request: Request):
    """Proxy to the Modal ``make_video`` job; return the MP4 URL."""
    return await _run_modal_job(request, "make_video", OUTPUT_DIR, ".mp4", "output")


@app.get("/get_youtube_audio")
async def get_youtube_audio(request: Request):
    """Forward the query string to Modal ``get_youtube_audio``; return the MP3 URL.

    Raises:
        HTTPException: 500 when the backend fails or on any unexpected error.
    """
    try:
        # Generate a unique filename
        unique_filename = f"{uuid.uuid4().hex}.mp3"
        out_file = os.path.join(AUDIO_DIR, unique_filename)

        # Query parameters are forwarded unchanged.
        await _fetch_from_modal_and_save(
            "GET", "get_youtube_audio", out_file, params=request.query_params
        )

        # Return the URL path to the output file
        return f"{PUBLIC_BASE_URL}/audio/{unique_filename}"
    except HTTPException:
        raise
    except Exception as e:
        raise _unexpected_error(e) from e


@app.get("/get_youtube_transcript")
async def get_youtube_transcript(request: Request):
    """Return the transcript of a YouTube video as one space-joined string.

    Query parameters:
        video_id: Required; passed to ``YouTubeTranscriptApi.get_transcript``.

    Returns:
        ``{"transcript": <text>}``.

    Raises:
        HTTPException: 400 when ``video_id`` is missing; 500 on any other
            failure.
    """
    try:
        # Get the video ID from the query parameters
        video_id = request.query_params.get("video_id")
        if not video_id:
            raise HTTPException(status_code=400, detail="video_id parameter is required")

        # The transcript call is synchronous; run it off the event loop so
        # other requests are not stalled while it waits on the network.
        loop = asyncio.get_running_loop()
        transcript = await loop.run_in_executor(
            None, YouTubeTranscriptApi.get_transcript, video_id
        )

        # Format the transcript into a single string
        transcript_text = " ".join(entry["text"] for entry in transcript)
        return {"transcript": transcript_text}
    except HTTPException:
        # Without this the 400 above would be re-wrapped as a 500.
        raise
    except Exception as e:
        raise _unexpected_error(e) from e


@app.post("/whisper")
async def whisper(request: Request):
    """Download the audio at ``audio_url`` and send it to the DeepInfra whisper endpoint.

    Request JSON:
        audio_url: Required; URL of the audio file to fetch.

    Returns:
        The JSON body of the inference response, or a plain error string
        when the download or the inference request does not return 200.

    Raises:
        HTTPException: 400 when ``audio_url`` is absent.
    """
    data = await request.json()  # Extracting JSON data from request
    if "audio_url" not in data:
        raise HTTPException(status_code=400, detail="audio_url not found in request")
    url = data["audio_url"]

    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'DNT': '1',
        'Origin': 'https://deepinfra.com',
        'Pragma': 'no-cache',
        'Referer': 'https://deepinfra.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
        'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }

    async with aiohttp.ClientSession() as session:
        # Async HTTP request to get the audio file
        async with session.get(url) as resp:
            if resp.status != 200:
                return f"Failed to download audio: {resp.status}"
            audio_data = await resp.read()

        # Encode the audio data to base64 and wrap it in the JSON payload
        audio_base64 = base64.b64encode(audio_data).decode("utf-8")
        json_data = json.dumps({"audio": audio_base64})

        # Post request to the API
        async with session.post(
            'https://api.deepinfra.com/v1/inference/openai/whisper-large',
            headers=headers,
            data=json_data,
        ) as post_resp:
            if post_resp.status != 200:
                return f"API request failed: {post_resp.status}"
            return await post_resp.json()


@app.post("/img2location")
async def img2location(request: Request):
    """Fetch the image at ``image_url`` and submit it to the locate-image service.

    Request JSON:
        image_url: Required; URL of the image to fetch.

    Returns:
        The service's JSON response, with ``message.latitude`` and
        ``message.longitude`` converted to strings when both are present
        and truthy; or a plain error string when the image fetch or the
        upload does not return 200.

    Raises:
        HTTPException: 400 when ``image_url`` is missing or empty.
    """
    request_json = await request.json()
    image_url = request_json.get("image_url", None)
    if not image_url:
        raise HTTPException(status_code=400, detail="image_url not found in request")

    headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'no-cache',
        'dnt': '1',
        'origin': 'https://geospy.ai',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'referer': 'https://geospy.ai/',
        'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
    }

    async with aiohttp.ClientSession() as session:
        # Fetch the image from the URL
        async with session.get(image_url) as img_response:
            if img_response.status != 200:
                return f"Failed to fetch image: HTTP {img_response.status}"
            image_data = await img_response.read()

        # Using BytesIO to handle the byte content
        data = aiohttp.FormData()
        data.add_field('image', io.BytesIO(image_data), filename="image.png", content_type='image/png')

        # Sending the POST request
        async with session.post(
            "https://locate-image-7cs5mab6na-uc.a.run.app/", headers=headers, data=data
        ) as response:
            if response.status != 200:
                return f"Failed to upload image: HTTP {response.status}"
            json_response = await response.json()

    # Stringify the coordinates only when the response has the expected
    # shape; otherwise hand the upstream payload back untouched.
    message = json_response.get("message") if isinstance(json_response, dict) else None
    if isinstance(message, dict) and message.get("latitude") and message.get("longitude"):
        message["latitude"] = str(message["latitude"])
        message["longitude"] = str(message["longitude"])
    return json_response


@app.post("/pixart-sigma")
async def pixart_sigma(request: Request):
    """Queue an image generation on the PixArt-Sigma space and return its URL.

    Request JSON (all optional; defaults shown in the code below):
        prompt, negative_prompt, style, use_negative_prompt, num_imgs, seed,
        width, height, schedule, dpms_guidance_scale, sas_guidance_scale,
        dpms_inference_steps, sas_inference_steps, randomize_seed.

    Returns:
        ``{"image_url": <url>}`` taken from the first ``process_completed``
        event on the queue stream, or ``None`` if the stream ends without
        one.
    """
    request_json = await request.json()
    prompt = request_json.get("prompt", None)
    negative_prompt = request_json.get("negative_prompt", "")
    style = request_json.get("style", "(No style)")
    use_negative_prompt = request_json.get("use_negative_prompt", True)
    num_imgs = request_json.get("num_imgs", 1)
    seed = request_json.get("seed", 0)
    width = request_json.get("width", 1024)
    height = request_json.get("height", 1024)
    schedule = request_json.get("schedule", "DPM-Solver")
    dpms_guidance_scale = request_json.get("dpms_guidance_scale", 4.5)
    sas_guidance_scale = request_json.get("sas_guidance_scale", 3)
    dpms_inference_steps = request_json.get("dpms_inference_steps", 14)
    sas_inference_steps = request_json.get("sas_inference_steps", 25)
    randomize_seed = request_json.get("randomize_seed", True)

    # The same hash joins the queue and then reads its event stream.
    session_hash = generate_hash()

    headers = {
        'accept': '*/*'
    }

    join_params = {
        '__theme': 'light',
    }

    json_data = {
        'data': [
            prompt,
            negative_prompt,
            style,
            use_negative_prompt,
            num_imgs,
            seed,
            width,
            height,
            schedule,
            dpms_guidance_scale,
            sas_guidance_scale,
            dpms_inference_steps,
            sas_inference_steps,
            randomize_seed,
        ],
        'event_data': None,
        'fn_index': 3,
        'trigger_id': 7,
        'session_hash': session_hash,
    }

    async with aiohttp.ClientSession() as session:
        # NOTE(review): ssl=False disables certificate verification for
        # these calls - confirm this is intentional.
        async with session.post(
            'https://pixart-alpha-pixart-sigma.hf.space/queue/join',
            params=join_params,
            headers=headers,
            json=json_data,
            ssl=False,
        ) as response:
            print(response.status)

        data_params = {
            'session_hash': session_hash,
        }

        async with session.get(
            'https://pixart-alpha-pixart-sigma.hf.space/queue/data',
            params=data_params,
            headers=headers,
            ssl=False,
        ) as response:
            async for line in response.content:
                if not line:
                    continue
                try:
                    text = line.decode('utf-8').replace('data: ', '')
                    line_json = json.loads(text)
                    if line_json["msg"] == "process_completed":
                        image_url = line_json["output"]["data"][0][0]["image"]["url"]
                        return {"image_url": image_url}
                except (ValueError, KeyError, IndexError, TypeError):
                    # Skip lines that are not JSON events of the expected
                    # shape (blank keep-alives, other message types, ...).
                    # Cancellation and other BaseExceptions still propagate.
                    continue

    # Stream ended without a completed event.
    return None


# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)