"""FastAPI service that downloads a video's audio track and transcribes it.

Endpoints:
    POST /video       download + convert a video to audio
    POST /transcribe  download, transcribe with faster-whisper, return segments
    GET  /status      report the progress of the most recent job
"""

import contextlib
import os
import uuid
from typing import Dict

import yt_dlp
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from faster_whisper import WhisperModel

from video import download_convert_video_to_audio

# Directory that receives the downloaded/converted audio files.
MEDIA_DIR = "/home/user"

app = FastAPI()

# NOTE(review): a wildcard origin combined with allow_credentials=True lets
# any site make credentialed requests -- confirm this is intended before
# exposing the service beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Progress of the current transcription job, updated as the job advances.
# Keys name a stage ("model", "video", "transcription", "language"); values
# are human-readable descriptions of that stage's state.
# NOTE(review): this dict is process-global, so concurrent requests overwrite
# each other's entries.
status: Dict[str, str] = {}


def segment_to_dict(segment):
    """Convert a transcription segment into a plain, JSON-friendly dict.

    Args:
        segment: An object exposing ``_asdict()`` whose result has a
            ``"words"`` entry that is either ``None`` or a list of objects
            that also expose ``_asdict()``.

    Returns:
        The segment as a dict, with each word converted to a dict as well.
    """
    as_dict = segment._asdict()
    if as_dict["words"] is not None:
        as_dict["words"] = [word._asdict() for word in as_dict["words"]]
    return as_dict


def _download_audio(video_url: str, base_path: str) -> None:
    """Download ``video_url`` to ``base_path``, recording progress in ``status``.

    Blocking; call it from a worker thread, not from the event loop.
    """
    status["video"] = "Downloading..."
    download_convert_video_to_audio(yt_dlp, video_url, base_path)
    status["video"] = "Downloaded"


def _transcribe_blocking(
    video_url: str,
    beam_size: int,
    model_size: str,
    word_timestamps: bool,
) -> list:
    """Load the model, fetch the audio and transcribe it (blocking).

    The downloaded audio file is removed whether or not the job succeeds.

    Returns:
        A list of segment dicts as produced by ``segment_to_dict``.
    """
    status["model"] = "Loading..."
    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    status["model"] = "Loaded"

    base_path = f"{MEDIA_DIR}/{uuid.uuid4().hex}"
    # The download helper is expected to leave its output at "<base>.mp3".
    audio_path = f"{base_path}.mp3"
    try:
        _download_audio(video_url, base_path)

        status["transcription"] = "Transcribing..."
        segments, info = model.transcribe(
            audio_path,
            beam_size=beam_size,
            word_timestamps=word_timestamps,
        )
        # Materialise the segments while the audio file still exists.
        result = [segment_to_dict(segment) for segment in segments]
    finally:
        # Clean up even on failure; the file may not exist if the download
        # itself failed, and that must not mask the original exception.
        with contextlib.suppress(FileNotFoundError):
            os.remove(audio_path)

    status["transcription"] = "Completed"
    status["language"] = "Detected language '%s' with probability %f" % (
        info.language,
        info.language_probability,
    )
    return result


@app.post("/video")
async def download_video(video_url: str):
    """Download ``video_url`` and convert it to audio under ``MEDIA_DIR``.

    The blocking download runs in a worker thread so the event loop (and
    therefore ``/status``) stays responsive while it is in progress.
    """
    await run_in_threadpool(
        _download_audio, video_url, f"{MEDIA_DIR}/{uuid.uuid4().hex}"
    )


@app.post("/transcribe")
async def transcribe_video(
    video_url: str,
    beam_size: int = 5,
    model_size: str = "tiny",
    word_timestamps: bool = True,
):
    """Download ``video_url``, transcribe its audio and return the segments.

    Args:
        video_url: URL of the video to transcribe.
        beam_size: Beam size forwarded to ``WhisperModel.transcribe``.
        model_size: Model identifier forwarded to ``WhisperModel``.
        word_timestamps: Whether to request per-word timestamps.

    Returns:
        A list of segment dicts (see ``segment_to_dict``).

    Model loading, download and transcription are all blocking, so they run
    in a worker thread; this keeps ``/status`` answerable during the job.
    """
    return await run_in_threadpool(
        _transcribe_blocking,
        video_url,
        beam_size,
        model_size,
        word_timestamps,
    )


@app.get("/status")
async def get_status():
    """Return the shared ``status`` dict describing the current job."""
    return status