Spaces:
Runtime error
Runtime error
File size: 2,109 Bytes
f46c80d 4672515 9cdc5fa 4672515 f46c80d da90409 4672515 f46c80d 9cdc5fa f46c80d 9cdc5fa 4672515 a6dcf15 4672515 9cdc5fa f115402 4672515 f115402 4672515 9cdc5fa a6dcf15 4672515 a9f9972 9cdc5fa 4672515 a6dcf15 4672515 48b0c57 9cdc5fa 4672515 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
import dataclasses
import os
import uuid
from typing import Dict

import yt_dlp
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from faster_whisper import WhisperModel

from video import download_convert_video_to_audio
# ASGI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI()

# Fully permissive CORS policy: every origin, HTTP method and request header
# is accepted, and credentialed requests are allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation -- confirm whether
# credentials are really needed, or list the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# In-memory progress report for the download/transcription pipeline.
# The request handlers overwrite its entries as they advance, and the
# GET /status endpoint returns the whole mapping.
# Keys name a pipeline stage ("model", "video", "transcription", "language");
# values are human-readable descriptions of that stage's current state.
# NOTE(review): this is a single module-level dict shared by every request,
# so overlapping requests overwrite each other's entries.
status: Dict[str, str] = {}
def _record_to_dict(record):
    """Convert one dataclass instance or namedtuple into a plain ``dict``."""
    if dataclasses.is_dataclass(record) and not isinstance(record, type):
        # asdict() recurses, so dataclasses nested in lists are converted too.
        return dataclasses.asdict(record)
    return record._asdict()


def segment_to_dict(segment):
    """Convert a transcription segment into a JSON-friendly dictionary.

    Both record flavours are accepted: namedtuples (exposing ``_asdict()``)
    and dataclass instances.  When the record has a ``words`` field that is
    not ``None``, every word record in it is converted to a dictionary as
    well; a missing or ``None`` ``words`` field is left as is.

    Args:
        segment: A namedtuple or dataclass instance describing one segment.

    Returns:
        A new ``dict`` with one entry per field of ``segment``.
    """
    result = _record_to_dict(segment)
    words = result.get("words")
    if words is not None:
        # Entries may already be dicts (dataclass recursion) or still be
        # records (e.g. namedtuple words); normalise them all to dicts.
        result["words"] = [
            word if isinstance(word, dict) else _record_to_dict(word)
            for word in words
        ]
    return result
@app.post("/video")
async def download_video(video_url: str):
    """Download the video at ``video_url`` and convert it to audio.

    The output is written under ``/home/user`` using a random hexadecimal
    base name.  Progress is published under ``status["video"]``:
    "Downloading...", then "Downloaded" on success or "Failed" if the
    download raises (the exception is re-raised unchanged).

    Args:
        video_url: Address of the video, passed through to
            ``download_convert_video_to_audio``.
    """
    # Random base name so separate downloads never collide on disk.
    output_base = f"/home/user/{uuid.uuid4().hex}"

    status["video"] = "Downloading..."
    try:
        # The download is blocking; running it in a worker thread keeps the
        # event loop free so GET /status can still be answered meanwhile.
        await run_in_threadpool(
            download_convert_video_to_audio, yt_dlp, video_url, output_base
        )
    except Exception:
        # Do not leave the status stuck on "Downloading..." after an error.
        status["video"] = "Failed"
        raise
    status["video"] = "Downloaded"
def _transcribe_blocking(video_url, beam_size, model_size, word_timestamps):
    """Run the load / download / transcribe pipeline synchronously."""
    audio_base = f"/home/user/{uuid.uuid4().hex}"
    # presumably download_convert_video_to_audio writes "<base>.mp3" --
    # this is the path the transcriber reads and that gets cleaned up.
    audio_path = f"{audio_base}.mp3"

    stage = "model"  # status key of the step currently in progress
    try:
        status["model"] = "Loading..."
        # NOTE(review): model_size comes straight from the request; confirm
        # that arbitrary values are acceptable for WhisperModel here.
        model = WhisperModel(model_size, device="cpu", compute_type="int8")
        status["model"] = "Loaded"

        stage = "video"
        status["video"] = "Downloading..."
        download_convert_video_to_audio(yt_dlp, video_url, audio_base)
        status["video"] = "Downloaded"

        stage = "transcription"
        status["transcription"] = "Transcribing..."
        segments, info = model.transcribe(
            audio_path, beam_size=beam_size, word_timestamps=word_timestamps
        )
        # Materialise the segments before the audio file is deleted below.
        segments = [segment_to_dict(segment) for segment in segments]
    except Exception:
        # Do not leave the failing stage stuck on its "in progress" text.
        status[stage] = "Failed"
        raise
    finally:
        # Always drop the temporary audio, even when a step above failed.
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            pass

    status["transcription"] = "Completed"
    status["language"] = (
        f"Detected language '{info.language}' "
        f"with probability {info.language_probability:f}"
    )
    return segments


@app.post("/transcribe")
async def transcribe_video(
    video_url: str,
    beam_size: int = 5,
    model_size: str = "tiny",
    word_timestamps: bool = True,
):
    """Download a video, transcribe its audio and return the segments.

    Progress is published in the module-level ``status`` mapping under the
    keys "model", "video", "transcription" and "language".  If a step
    raises, its entry is set to "Failed" and the exception propagates.  The
    temporary audio file is removed whether or not transcription succeeds.

    Args:
        video_url: Address of the video to download.
        beam_size: Forwarded to ``WhisperModel.transcribe``.
        model_size: Model identifier forwarded to ``WhisperModel``.
        word_timestamps: Forwarded to ``WhisperModel.transcribe``.

    Returns:
        A list with one dictionary per transcribed segment.
    """
    # The pipeline is blocking and CPU-heavy; run it in a worker thread so
    # the event loop can keep answering GET /status while it is in progress.
    return await run_in_threadpool(
        _transcribe_blocking, video_url, beam_size, model_size, word_timestamps
    )
@app.get("/status")
async def get_status():
    """Return the current progress entries of the processing pipeline."""
    return status
|