|
from fastapi import FastAPI, HTTPException, Request |
|
from fastapi.staticfiles import StaticFiles |
|
from pytubefix import YouTube |
|
from pytubefix.exceptions import PytubeFixError |
|
from concurrent.futures import ThreadPoolExecutor |
|
import asyncio |
|
import aiohttp |
|
import aiofiles |
|
import tempfile |
|
import uuid |
|
import base64 |
|
import io |
|
import os |
|
import random |
|
import traceback |
|
import string |
|
import json |
|
|
|
# Base URL of the upstream service that the media endpoints in this module
# forward their requests to.
MODAL_BASE_URL = "https://sxqib--api-fastapi-app.modal.run"

# The single FastAPI application; every route in this module registers on it.
app = FastAPI()
|
|
|
def generate_hash(length: int = 12) -> str:
    """Return a random identifier made of lowercase ASCII letters and digits.

    The value is drawn from the ``random`` module, so it is NOT suitable for
    anything security-sensitive (use ``secrets`` for tokens).

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string of ``length`` characters from ``[a-z0-9]``.
    """
    characters = string.ascii_lowercase + string.digits
    # random.choices draws all k characters in one call instead of looping
    # over random.choice in Python.
    return ''.join(random.choices(characters, k=max(length, 0)))
|
|
|
@app.get("/")
async def read_root():
    """Serve the landing route with a fixed identification message."""
    greeting = {"message": "Saqib's API"}
    return greeting
|
|
|
|
|
# Local directories that hold generated media. Both are exposed over HTTP
# through the static mounts below.
AUDIO_DIR = "audio_files"
OUTPUT_DIR = "output"

# Make sure both directories exist before they are mounted.
os.makedirs(AUDIO_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Files written to AUDIO_DIR are served under /audio, files in OUTPUT_DIR
# under /output.
app.mount("/audio", StaticFiles(directory=AUDIO_DIR), name="audio")
app.mount("/output", StaticFiles(directory=OUTPUT_DIR), name="output")

# Small shared pool used by run_ffmpeg_async to run blocking callables.
thread_pool = ThreadPoolExecutor(max_workers=2)
|
|
|
async def run_ffmpeg_async(ffmpeg_command):
    """Run ``ffmpeg_command`` on the shared thread pool and wait for it to finish.

    ``ffmpeg_command`` is handed to the executor as-is, so it has to be a
    callable that takes no arguments. Its return value is discarded.
    """
    pending = asyncio.get_running_loop().run_in_executor(thread_pool, ffmpeg_command)
    await pending
|
|
|
async def download_file(url: str, suffix: str) -> str:
    """Download ``url`` into a new temporary file and return the file's path.

    The response body is streamed to disk in chunks rather than being read
    into memory in one piece. The temporary file is created with
    ``delete=False``, so the caller owns it and must remove it when done.

    Args:
        url: Address of the file to fetch.
        suffix: File-name suffix (for example ``".mp3"``) for the temp file.

    Returns:
        Path of the temporary file holding the downloaded bytes.

    Raises:
        HTTPException: 400 when the server answers with a non-200 status.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                raise HTTPException(status_code=400, detail=f"Failed to download file from {url}")

            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
            try:
                with temp_file:
                    async for chunk in response.content.iter_chunked(64 * 1024):
                        temp_file.write(chunk)
            except BaseException:
                # The file was created with delete=False; remove the partial
                # download so a failed or cancelled transfer leaves nothing
                # behind, then let the original error propagate.
                try:
                    os.unlink(temp_file.name)
                except OSError:
                    pass
                raise
            return temp_file.name
|
|
|
@app.post("/add_audio_to_image")
async def add_audio_to_image(request: Request):
    """Forward the JSON body to the upstream service and store the returned MP4.

    The request body is posted unchanged to
    ``{MODAL_BASE_URL}/add_audio_to_image``. The bytes of a successful
    response are written to ``OUTPUT_DIR`` under a fresh UUID file name.

    Returns:
        A URL string of the form
        ``https://sxqib-api.hf.space/output/<uuid>.mp4``.

    Raises:
        HTTPException: 500 with detail "Failed to process request" when the
            upstream service answers with a non-200 status; 500 with an
            "An unexpected error occurred" detail for any other failure.
    """
    output_filename = f"{uuid.uuid4()}.mp4"
    output_path = os.path.join(OUTPUT_DIR, output_filename)

    try:
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/add_audio_to_image", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Errors raised on purpose above must reach the client unchanged
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") from e
|
|
|
@app.post("/add_audio_to_video")
async def add_audio_to_video(request: Request):
    """Forward the JSON body to the upstream service and store the returned MP4.

    The request body is posted unchanged to
    ``{MODAL_BASE_URL}/add_audio_to_video``. The bytes of a successful
    response are written to ``OUTPUT_DIR`` under a fresh UUID file name.

    Returns:
        A URL string of the form
        ``https://sxqib-api.hf.space/output/<uuid>.mp4``.

    Raises:
        HTTPException: 500 with detail "Failed to process request" when the
            upstream service answers with a non-200 status; 500 with an
            "An unexpected error occurred" detail for any other failure.
    """
    output_filename = f"{uuid.uuid4()}.mp4"
    output_path = os.path.join(OUTPUT_DIR, output_filename)

    try:
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/add_audio_to_video", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Errors raised on purpose above must reach the client unchanged
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") from e
|
|
|
@app.post("/concatenate_videos")
async def concatenate_videos(request: Request):
    """Forward the JSON body to the upstream service and store the returned MP4.

    The request body is posted unchanged to
    ``{MODAL_BASE_URL}/concatenate_videos``. The bytes of a successful
    response are written to ``OUTPUT_DIR`` under a fresh UUID file name.

    Returns:
        A URL string of the form
        ``https://sxqib-api.hf.space/output/<uuid>.mp4``.

    Raises:
        HTTPException: 500 with detail "Failed to process request" when the
            upstream service answers with a non-200 status; 500 with an
            "An unexpected error occurred" detail for any other failure.
    """
    output_filename = f"{uuid.uuid4()}.mp4"
    output_path = os.path.join(OUTPUT_DIR, output_filename)

    try:
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/concatenate_videos", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Errors raised on purpose above must reach the client unchanged
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") from e
|
|
|
@app.post("/make_video")
async def make_video(request: Request):
    """Forward the JSON body to the upstream service and store the returned MP4.

    The request body is posted unchanged to ``{MODAL_BASE_URL}/make_video``.
    The bytes of a successful response are written to ``OUTPUT_DIR`` under a
    fresh UUID file name.

    Returns:
        A URL string of the form
        ``https://sxqib-api.hf.space/output/<uuid>.mp4``.

    Raises:
        HTTPException: 500 with detail "Failed to process request" when the
            upstream service answers with a non-200 status; 500 with an
            "An unexpected error occurred" detail for any other failure.
    """
    output_filename = f"{uuid.uuid4()}.mp4"
    output_path = os.path.join(OUTPUT_DIR, output_filename)

    try:
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/make_video", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Errors raised on purpose above must reach the client unchanged
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") from e
|
|
|
@app.get("/get_youtube_audio")
async def get_youtube_audio(request: Request):
    """Forward the query string to the upstream service and store the returned MP3.

    The incoming query parameters are passed unchanged to
    ``{MODAL_BASE_URL}/get_youtube_audio``. The bytes of a successful
    response are written to ``AUDIO_DIR`` under a fresh UUID-hex file name.

    Returns:
        A URL string of the form
        ``https://sxqib-api.hf.space/audio/<uuid-hex>.mp3``.

    Raises:
        HTTPException: 500 with detail "Failed to process request" when the
            upstream service answers with a non-200 status; 500 with an
            "An unexpected error occurred" detail for any other failure.
    """
    unique_filename = f"{uuid.uuid4().hex}.mp3"
    out_file = os.path.join(AUDIO_DIR, unique_filename)

    try:
        data = request.query_params
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{MODAL_BASE_URL}/get_youtube_audio", params=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                audio_data = await response.read()

        async with aiofiles.open(out_file, "wb") as f:
            await f.write(audio_data)

        return f"https://sxqib-api.hf.space/audio/{unique_filename}"
    except HTTPException:
        # Errors raised on purpose above must reach the client unchanged
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") from e
|
|
|
@app.post("/whisper")
async def whisper(request: Request):
    """Transcribe the audio behind ``audio_url`` via DeepInfra's whisper-large endpoint.

    The JSON body must contain ``audio_url``. The audio is downloaded,
    base64-encoded and posted as ``{"audio": "<base64>"}`` to the inference
    API.

    Returns:
        The decoded JSON answer of the inference API on success. When the
        download or the inference call answers with a non-200 status, a plain
        string describing the failure is returned instead (no exception).

    Raises:
        HTTPException: 400 when ``audio_url`` is missing from the body.
    """
    payload = await request.json()
    if "audio_url" not in payload:
        raise HTTPException(status_code=400, detail="audio_url not found in request")
    audio_url = payload["audio_url"]

    # Header set sent along with the inference request.
    request_headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'DNT': '1',
        'Origin': 'https://deepinfra.com',
        'Pragma': 'no-cache',
        'Referer': 'https://deepinfra.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
        'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(audio_url) as audio_resp:
            if audio_resp.status != 200:
                return f"Failed to download audio: {audio_resp.status}"
            raw_audio = await audio_resp.read()

        encoded_audio = base64.b64encode(raw_audio).decode("utf-8")
        # Serialises to exactly '{"audio": "<base64>"}'.
        body = json.dumps({"audio": encoded_audio})

        async with session.post('https://api.deepinfra.com/v1/inference/openai/whisper-large', headers=request_headers, data=body) as inference_resp:
            if inference_resp.status == 200:
                return await inference_resp.json()
            return f"API request failed: {inference_resp.status}"
|
|
|
@app.post("/img2location")
async def img2location(request: Request):
    """Send the image behind ``image_url`` to the location service and return its answer.

    The image is downloaded and uploaded as the multipart field ``image``
    (file name ``image.png``, content type ``image/png``). When the answer
    carries both ``message.latitude`` and ``message.longitude``, the two
    values are converted to strings before the answer is returned.

    Returns:
        The decoded JSON answer of the location service. When the image
        download or the upload answers with a non-200 status, a plain string
        describing the failure is returned instead (no exception).

    Raises:
        HTTPException: 400 when ``image_url`` is missing or empty.
    """
    request_json = await request.json()
    image_url = request_json.get("image_url", None)

    if not image_url:
        raise HTTPException(status_code=400, detail="image_url not found in request")

    headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'no-cache',
        'dnt': '1',
        'origin': 'https://geospy.ai',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'referer': 'https://geospy.ai/',
        'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as img_response:
            if img_response.status != 200:
                return f"Failed to fetch image: HTTP {img_response.status}"
            image_data = await img_response.read()

        data = aiohttp.FormData()
        data.add_field('image', io.BytesIO(image_data), filename="image.png", content_type='image/png')

        async with session.post("https://locate-image-7cs5mab6na-uc.a.run.app/", headers=headers, data=data) as response:
            if response.status != 200:
                return f"Failed to upload image: HTTP {response.status}"
            json_response = await response.json()

    # Only touch the coordinates when the answer has the expected shape; an
    # unexpected payload is handed back untouched instead of raising.
    message = json_response.get("message") if isinstance(json_response, dict) else None
    if isinstance(message, dict):
        latitude = message.get("latitude")
        longitude = message.get("longitude")
        # Compare against None rather than relying on truthiness: 0 is a
        # valid coordinate (equator / prime meridian) and must be kept.
        if latitude is not None and longitude is not None:
            message["latitude"] = str(latitude)
            message["longitude"] = str(longitude)
    return json_response
|
|
|
@app.post("/pixart-sigma")
async def pixart_sigma(request: Request):
    """Generate an image through the PixArt-Sigma space's queue API.

    The JSON body must contain ``prompt``. Every other generation option is
    optional and falls back to the default listed in ``option_defaults``. A
    job is submitted to ``/queue/join`` under a fresh session hash, then the
    event stream at ``/queue/data`` is read until a ``process_completed``
    event with an image URL arrives.

    Returns:
        ``{"image_url": <url>}`` for the first completed event that carries
        an image URL, or ``None`` when the stream ends without one.

    Raises:
        HTTPException: 400 when ``prompt`` is missing from the body.
    """
    request_json = await request.json()
    prompt = request_json.get("prompt", None)
    if prompt is None:
        raise HTTPException(status_code=400, detail="prompt not found in request")

    # Positional inputs of the remote function: the order here is the order
    # in which the values are sent, right after the prompt.
    option_defaults = (
        ("negative_prompt", ""),
        ("style", "(No style)"),
        ("use_negative_prompt", True),
        ("num_imgs", 1),
        ("seed", 0),
        ("width", 1024),
        ("height", 1024),
        ("schedule", "DPM-Solver"),
        ("dpms_guidance_scale", 4.5),
        ("sas_guidance_scale", 3),
        ("dpms_inference_steps", 14),
        ("sas_inference_steps", 25),
        ("randomize_seed", True),
    )
    inputs = [prompt] + [request_json.get(key, default) for key, default in option_defaults]

    # Ties the join request to the event stream read below.
    session_hash = generate_hash()

    headers = {
        'accept': '*/*'
    }

    json_data = {
        'data': inputs,
        'event_data': None,
        'fn_index': 3,
        'trigger_id': 7,
        'session_hash': session_hash,
    }

    async with aiohttp.ClientSession() as session:
        # NOTE(review): ssl=False turns off TLS certificate verification for
        # both requests. It is kept so behaviour does not change, but it
        # should be removed if the remote certificate validates.
        async with session.post('https://pixart-alpha-pixart-sigma.hf.space/queue/join', params={'__theme': 'light'}, headers=headers, json=json_data, ssl=False) as response:
            print(response.status)

        async with session.get('https://pixart-alpha-pixart-sigma.hf.space/queue/data', params={'session_hash': session_hash}, headers=headers, ssl=False) as response:
            async for raw_line in response.content:
                if not raw_line:
                    continue
                try:
                    event = json.loads(raw_line.decode('utf-8').replace('data: ', ''))
                    if event["msg"] == "process_completed":
                        image_url = event["output"]["data"][0][0]["image"]["url"]
                        return {"image_url": image_url}
                except (ValueError, KeyError, IndexError, TypeError):
                    # Best effort: lines that are not JSON (blank keep-alive
                    # lines, undecodable bytes) and events without the
                    # expected fields are skipped, and reading continues.
                    continue

    # The stream ended without a usable completion event.
    return None
|
|
|
|
|
|
|
|