# File: api/modules/app.py (author: saq1b)
# Last commit: "Update modules/app.py" (e3fe383, verified), 16.3 kB
from fastapi import FastAPI, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from concurrent.futures import ThreadPoolExecutor
from youtube_transcript_api import YouTubeTranscriptApi
import asyncio
import aiohttp
import aiofiles
import tempfile
import uuid
import base64
import io
import os
import random
import traceback
import string
import json
# FastAPI application instance; every route in this module is registered on it.
app = FastAPI()
# Base URL of the remote Modal deployment that the media endpoints forward requests to.
MODAL_BASE_URL = "https://sxqib--api-fastapi-app.modal.run"
def generate_hash(length: int = 12) -> str:
    """Return a random string of lowercase ASCII letters and digits.

    Args:
        length: Number of characters to generate. ``0`` yields ``""``.

    Returns:
        A string of ``length`` characters drawn from ``[a-z0-9]``.

    Note:
        Uses the ``random`` module, so the value is not suitable as a
        security token.
    """
    alphabet = string.ascii_lowercase + string.digits
    # random.choices draws all characters in a single call.
    return "".join(random.choices(alphabet, k=length))
@app.get("/")
async def read_root():
    """Serve the API root with a fixed greeting payload."""
    greeting = {"message": "Saqib's API"}
    return greeting
# Directory where MP3 files are stored; created at import time if it doesn't exist.
AUDIO_DIR = "audio_files"
os.makedirs(AUDIO_DIR, exist_ok=True)
# Directory where generated output files are stored; created at import time if it doesn't exist.
OUTPUT_DIR = "output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Serve AUDIO_DIR as static files under the "/audio" URL prefix
# (the prefix is "/audio", not the on-disk directory name "audio_files").
app.mount("/audio", StaticFiles(directory=AUDIO_DIR), name="audio")
# Serve OUTPUT_DIR as static files under the "/output" URL prefix.
app.mount("/output", StaticFiles(directory=OUTPUT_DIR), name="output")
# Shared two-worker thread pool used by run_ffmpeg_async to run callables off the event loop.
thread_pool = ThreadPoolExecutor(max_workers=2)
async def run_ffmpeg_async(ffmpeg_command):
    """Run ``ffmpeg_command`` on the shared thread pool and wait for it to finish.

    ``ffmpeg_command`` is handed to the executor as a zero-argument callable;
    whatever it returns is discarded.
    """
    event_loop = asyncio.get_running_loop()
    pending = event_loop.run_in_executor(thread_pool, ffmpeg_command)
    await pending
async def download_file(url: str, suffix: str) -> str:
    """Download ``url`` into a new temporary file and return the file's path.

    Args:
        url: Address to fetch with an HTTP GET.
        suffix: Filename suffix (e.g. an extension) for the temporary file.

    Returns:
        Path of the temporary file holding the response body. The file is
        created with ``delete=False``, so the caller must remove it.

    Raises:
        HTTPException: 400 if the server answers with a non-200 status.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                raise HTTPException(status_code=400, detail=f"Failed to download file from {url}")
            # Read the whole body before creating the temp file, so a failed
            # read cannot leave an orphaned empty file on disk.
            payload = await response.read()

    # The connection is already released; only the local write remains.
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
        temp_file.write(payload)
    return temp_file.name
@app.post("/add_audio_to_image")
async def add_audio_to_image(request: Request):
    """Forward the JSON body to Modal's ``/add_audio_to_image`` and store the result.

    The bytes returned by Modal are written to a uniquely named ``.mp4`` file
    in ``OUTPUT_DIR``.

    Returns:
        URL string of the stored file under the ``/output`` static mount.

    Raises:
        HTTPException: 500 if Modal answers with a non-200 status or any
            other error occurs while handling the request.
    """
    try:
        # Generate a unique filename
        output_filename = f"{uuid.uuid4()}.mp4"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        # Call the modal API with the request data and download the output file
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/add_audio_to_image", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        # Return the URL path to the output file
        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged; otherwise the generic
        # handler below would re-wrap them and mangle their detail.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.post("/add_audio_to_video")
async def add_audio_to_video(request: Request):
    """Forward the JSON body to Modal's ``/add_audio_to_video`` and store the result.

    The bytes returned by Modal are written to a uniquely named ``.mp4`` file
    in ``OUTPUT_DIR``.

    Returns:
        URL string of the stored file under the ``/output`` static mount.

    Raises:
        HTTPException: 500 if Modal answers with a non-200 status or any
            other error occurs while handling the request.
    """
    try:
        # Generate a unique filename
        output_filename = f"{uuid.uuid4()}.mp4"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        # Call the modal API with the request data and download the output file
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/add_audio_to_video", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        # Return the URL path to the output file
        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged; otherwise the generic
        # handler below would re-wrap them and mangle their detail.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.post("/concatenate_videos")
async def concatenate_videos(request: Request):
    """Forward the JSON body to Modal's ``/concatenate_videos`` and store the result.

    The bytes returned by Modal are written to a uniquely named ``.mp4`` file
    in ``OUTPUT_DIR``.

    Returns:
        URL string of the stored file under the ``/output`` static mount.

    Raises:
        HTTPException: 500 if Modal answers with a non-200 status or any
            other error occurs while handling the request.
    """
    try:
        # Generate a unique filename for the output
        output_filename = f"{uuid.uuid4()}.mp4"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        # Call the modal API with the request data and download the output file
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/concatenate_videos", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        # Return the URL path to the output file
        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged; otherwise the generic
        # handler below would re-wrap them and mangle their detail.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.post("/concatenate_audio")
async def concatenate_audio(request: Request):
    """Forward the JSON body to Modal's ``/concatenate_audio`` and store the result.

    The bytes returned by Modal are written to a uniquely named ``.mp3`` file
    in ``AUDIO_DIR``.

    Returns:
        URL string of the stored file under the ``/audio`` static mount.

    Raises:
        HTTPException: 500 if Modal answers with a non-200 status or any
            other error occurs while handling the request.
    """
    try:
        # Generate a unique filename for the output
        output_filename = f"{uuid.uuid4()}.mp3"
        output_path = os.path.join(AUDIO_DIR, output_filename)

        # Call the modal API with the request data and download the output file
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/concatenate_audio", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        # AUDIO_DIR is mounted at "/audio" (not "/audio_files"), so the public
        # URL must use that prefix or the returned link would not resolve.
        return f"https://sxqib-api.hf.space/audio/{output_filename}"
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged; otherwise the generic
        # handler below would re-wrap them and mangle their detail.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.post("/make_video")
async def make_video(request: Request):
    """Forward the JSON body to Modal's ``/make_video`` and store the result.

    The bytes returned by Modal are written to a uniquely named ``.mp4`` file
    in ``OUTPUT_DIR``.

    Returns:
        URL string of the stored file under the ``/output`` static mount.

    Raises:
        HTTPException: 500 if Modal answers with a non-200 status or any
            other error occurs while handling the request.
    """
    try:
        # Generate a unique filename for the output
        output_filename = f"{uuid.uuid4()}.mp4"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        # Call the modal API with the request data and download the output file
        data = await request.json()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{MODAL_BASE_URL}/make_video", json=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                output_data = await response.read()

        async with aiofiles.open(output_path, "wb") as f:
            await f.write(output_data)

        # Return the URL path to the output file
        return f"https://sxqib-api.hf.space/output/{output_filename}"
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged; otherwise the generic
        # handler below would re-wrap them and mangle their detail.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.get("/get_youtube_audio")
async def get_youtube_audio(request: Request):
    """Forward the query string to Modal's ``/get_youtube_audio`` and store the result.

    The bytes returned by Modal are written to a uniquely named ``.mp3`` file
    in ``AUDIO_DIR``.

    Returns:
        URL string of the stored file under the ``/audio`` static mount.

    Raises:
        HTTPException: 500 if Modal answers with a non-200 status or any
            other error occurs while handling the request.
    """
    try:
        # Generate a unique filename
        unique_filename = f"{uuid.uuid4().hex}.mp3"
        out_file = os.path.join(AUDIO_DIR, unique_filename)

        # Call the modal API with the request parameters and download the output file
        data = request.query_params
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{MODAL_BASE_URL}/get_youtube_audio", params=data) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="Failed to process request")
                audio_data = await response.read()

        async with aiofiles.open(out_file, "wb") as f:
            await f.write(audio_data)

        # Return the URL path to the output file
        return f"https://sxqib-api.hf.space/audio/{unique_filename}"
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged; otherwise the generic
        # handler below would re-wrap them and mangle their detail.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.get("/get_youtube_transcript")
async def get_youtube_transcript(request: Request):
    """Return the transcript of a YouTube video as one space-joined string.

    Query parameters:
        video_id: Identifier passed to ``YouTubeTranscriptApi.get_transcript``.

    Returns:
        ``{"transcript": <text>}`` where ``<text>`` joins the ``"text"`` field
        of every transcript entry with single spaces.

    Raises:
        HTTPException: 400 if ``video_id`` is missing or empty; 500 if
            fetching or formatting the transcript fails.
    """
    try:
        # Get the video ID from the query parameters
        video_id = request.query_params.get("video_id")
        if not video_id:
            raise HTTPException(status_code=400, detail="video_id parameter is required")

        # get_transcript is a synchronous call; run it in the default executor
        # so the event loop stays free while it works.
        loop = asyncio.get_running_loop()
        transcript = await loop.run_in_executor(
            None, YouTubeTranscriptApi.get_transcript, video_id
        )

        # Format the transcript into a single string
        transcript_text = " ".join(entry["text"] for entry in transcript)
        return {"transcript": transcript_text}
    except HTTPException:
        # Without this clause the 400 above would be caught by the generic
        # handler below and reported to the client as a 500.
        raise
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.post("/whisper")
async def whisper(request: Request):
    """Download an audio file and send it to DeepInfra's whisper-large endpoint.

    Request body (JSON object):
        audio_url: Address of the audio file to fetch.

    Returns:
        The parsed JSON response of the inference endpoint on success.
        If the download or the inference request answers with a non-200
        status, a plain error string is returned instead (kept as-is for
        backward compatibility with existing clients).

    Raises:
        HTTPException: 400 if the body is not a JSON object or lacks
            ``audio_url``.
    """
    data = await request.json()  # Extracting JSON data from request
    # A non-object body (list, string, number) cannot carry "audio_url";
    # reject it explicitly instead of failing on the lookup below.
    if not isinstance(data, dict) or "audio_url" not in data:
        raise HTTPException(status_code=400, detail="audio_url not found in request")
    url = data["audio_url"]

    # Browser-like headers sent with the inference request.
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'DNT': '1',
        'Origin': 'https://deepinfra.com',
        'Pragma': 'no-cache',
        'Referer': 'https://deepinfra.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
        'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }

    async with aiohttp.ClientSession() as session:
        # Async HTTP request to get the audio file
        async with session.get(url) as resp:
            if resp.status != 200:
                return f"Failed to download audio: {resp.status}"
            audio_data = await resp.read()

        # Encode the audio data to base64 and let json.dumps build the
        # payload rather than concatenating the JSON text by hand.
        audio_base64 = base64.b64encode(audio_data).decode("utf-8")
        json_data = json.dumps({"audio": audio_base64})

        # Post request to the API
        async with session.post('https://api.deepinfra.com/v1/inference/openai/whisper-large', headers=headers, data=json_data) as post_resp:
            if post_resp.status != 200:
                return f"API request failed: {post_resp.status}"
            return await post_resp.json()
@app.post("/img2location")
async def img2location(request: Request):
    """Fetch an image and submit it to the image-location service.

    Request body (JSON object):
        image_url: Address of the image to fetch.

    Returns:
        The service's parsed JSON response. When its ``"message"`` object
        carries both ``latitude`` and ``longitude``, they are converted to
        strings. If fetching the image or the upload answers with a non-200
        status, a plain error string is returned instead (kept as-is for
        backward compatibility with existing clients).

    Raises:
        HTTPException: 400 if the body is not a JSON object or lacks a
            non-empty ``image_url``.
    """
    request_json = await request.json()
    # A non-object body has no .get(); treat it like a missing image_url.
    image_url = request_json.get("image_url") if isinstance(request_json, dict) else None
    if not image_url:
        raise HTTPException(status_code=400, detail="image_url not found in request")

    # Browser-like headers sent with the upload request.
    headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'no-cache',
        'dnt': '1',
        'origin': 'https://geospy.ai',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'referer': 'https://geospy.ai/',
        'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
    }

    async with aiohttp.ClientSession() as session:
        # Fetch the image from the URL
        async with session.get(image_url) as img_response:
            if img_response.status != 200:
                return f"Failed to fetch image: HTTP {img_response.status}"
            image_data = await img_response.read()

        # Using BytesIO to handle the byte content
        data = aiohttp.FormData()
        data.add_field('image', io.BytesIO(image_data), filename="image.png", content_type='image/png')

        # Sending the POST request
        async with session.post("https://locate-image-7cs5mab6na-uc.a.run.app/", headers=headers, data=data) as response:
            if response.status != 200:
                return f"Failed to upload image: HTTP {response.status}"
            json_response = await response.json()

    # Stringify the coordinates only when the response actually carries them.
    # Guarding the lookups avoids a KeyError/TypeError on payloads without a
    # "message" object, and comparing against None (not truthiness) keeps a
    # coordinate of exactly 0 from being skipped.
    message = json_response.get("message") if isinstance(json_response, dict) else None
    if isinstance(message, dict):
        latitude = message.get("latitude")
        longitude = message.get("longitude")
        if latitude is not None and longitude is not None:
            message["latitude"] = str(latitude)
            message["longitude"] = str(longitude)
    return json_response
@app.post("/pixart-sigma")
async def pixart_sigma(request: Request):
    """Generate an image through the PixArt-Sigma Space queue API.

    The request is submitted to the Space's ``/queue/join`` endpoint under a
    fresh session hash, then ``/queue/data`` is streamed for that session
    until a ``process_completed`` event carrying an image URL arrives.

    Request body (JSON object):
        prompt: Required text prompt.
        negative_prompt, style, use_negative_prompt, num_imgs, seed, width,
        height, schedule, dpms_guidance_scale, sas_guidance_scale,
        dpms_inference_steps, sas_inference_steps, randomize_seed:
            Optional; defaults are the values given in the code below.

    Returns:
        ``{"image_url": <url>}`` for the first generated image.

    Raises:
        HTTPException: 400 if the body is not a JSON object or has no
            ``prompt``; 500 if the stream ends without yielding an image.
    """
    request_json = await request.json()
    if not isinstance(request_json, dict):
        raise HTTPException(status_code=400, detail="prompt not found in request")
    prompt = request_json.get("prompt")
    if prompt is None:
        raise HTTPException(status_code=400, detail="prompt not found in request")

    negative_prompt = request_json.get("negative_prompt", "")
    style = request_json.get("style", "(No style)")
    use_negative_prompt = request_json.get("use_negative_prompt", True)
    num_imgs = request_json.get("num_imgs", 1)
    seed = request_json.get("seed", 0)
    width = request_json.get("width", 1024)
    height = request_json.get("height", 1024)
    schedule = request_json.get("schedule", "DPM-Solver")
    dpms_guidance_scale = request_json.get("dpms_guidance_scale", 4.5)
    sas_guidance_scale = request_json.get("sas_guidance_scale", 3)
    dpms_inference_steps = request_json.get("dpms_inference_steps", 14)
    sas_inference_steps = request_json.get("sas_inference_steps", 25)
    randomize_seed = request_json.get("randomize_seed", True)

    # Named session_hash (not `hash`) so the builtin is not shadowed.
    session_hash = generate_hash()

    headers = {
        'accept': '*/*'
    }
    join_params = {
        '__theme': 'light',
    }
    json_data = {
        'data': [
            prompt,
            negative_prompt,
            style,
            use_negative_prompt,
            num_imgs,
            seed,
            width,
            height,
            schedule,
            dpms_guidance_scale,
            sas_guidance_scale,
            dpms_inference_steps,
            sas_inference_steps,
            randomize_seed,
        ],
        'event_data': None,
        'fn_index': 3,
        'trigger_id': 7,
        'session_hash': session_hash,
    }
    data_params = {
        'session_hash': session_hash,
    }
    data_prefix = 'data: '

    # NOTE(review): ssl=False turns off TLS certificate verification for both
    # requests. It is kept to preserve existing behavior; confirm it is still
    # needed.
    async with aiohttp.ClientSession() as session:
        async with session.post('https://pixart-alpha-pixart-sigma.hf.space/queue/join', params=join_params, headers=headers, json=json_data, ssl=False) as response:
            print(response.status)

        async with session.get('https://pixart-alpha-pixart-sigma.hf.space/queue/data', params=data_params, headers=headers, ssl=False) as response:
            async for raw_line in response.content:
                if not raw_line:
                    continue
                try:
                    text = raw_line.decode('utf-8').strip()
                    # Strip only a leading marker; str.replace would also
                    # remove the same text from inside the payload.
                    if text.startswith(data_prefix):
                        text = text[len(data_prefix):]
                    event = json.loads(text)
                    if event["msg"] == "process_completed":
                        image_url = event["output"]["data"][0][0]["image"]["url"]
                        return {"image_url": image_url}
                except (ValueError, KeyError, IndexError, TypeError):
                    # Best-effort parsing: blank/non-JSON lines and events
                    # without the expected fields are skipped. ValueError
                    # covers both decode and JSON parse failures.
                    continue

    # The stream ended without a completed event carrying an image URL.
    raise HTTPException(status_code=500, detail="Image generation did not return a result")
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)