File size: 2,109 Bytes
f46c80d
4672515
 
9cdc5fa
 
4672515
 
 
 
f46c80d
 
da90409
 
 
 
 
 
 
 
 
4672515
 
 
 
f46c80d
9cdc5fa
 
 
 
 
f46c80d
9cdc5fa
 
4672515
a6dcf15
4672515
9cdc5fa
 
f115402
4672515
f115402
4672515
 
9cdc5fa
a6dcf15
4672515
 
a9f9972
9cdc5fa
4672515
a6dcf15
4672515
 
48b0c57
9cdc5fa
4672515
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import uuid
from typing import Dict

import yt_dlp
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from faster_whisper import WhisperModel

from video import download_convert_video_to_audio

# ASGI application object; the route decorators further down register on it.
app = FastAPI()

# CORS policy: every origin, HTTP method and request header is accepted, and
# credentialed requests are enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive combination - confirm this is intended for the deployment.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Shared, module-level progress board for the download/transcription pipeline.
# The endpoints overwrite its entries as work advances and GET /status returns
# it. Keys name a pipeline stage ("model", "video", "transcription",
# "language"); values are human-readable status messages.
# NOTE(review): there is a single board for the whole process, so it is not
# separated per request.
status: Dict[str, str] = dict()

def segment_to_dict(segment):
    """Convert a transcription segment into a plain, JSON-friendly dict.

    Args:
        segment: Any object exposing ``_asdict()`` (e.g. a namedtuple-style
            segment). Its ``"words"`` entry, when present and not ``None``,
            is expected to be an iterable of word records.

    Returns:
        The dict produced by ``segment._asdict()`` with every word record
        converted to a dict as well.

    Word records that are already dicts are kept as they are. This matters
    when ``segment._asdict()`` converts nested records itself (as a
    ``dataclasses.asdict``-based implementation does): calling ``_asdict()``
    on those dicts would raise ``AttributeError``.
    """
    result = segment._asdict()
    words = result.get("words")
    if words is not None:
        result["words"] = [
            word if isinstance(word, dict) else word._asdict()
            for word in words
        ]
    return result

@app.post("/video")
async def download_video(video_url: str):
    """Download the video at ``video_url`` and convert it to audio.

    A random hex name under ``/home/user/`` is passed to
    ``download_convert_video_to_audio`` as the output location, and progress
    is published under ``status["video"]``.

    The download helper is a synchronous call, so it is executed in a worker
    thread. Running it directly inside this coroutine would block the event
    loop, and ``/status`` could not be served until the download had finished.

    Args:
        video_url: URL handed to ``download_convert_video_to_audio``.

    Raises:
        Exception: Any error from the download helper is re-raised after
            ``status["video"]`` has been set to ``"Failed"``, so the status
            board does not keep claiming that a download is in progress.
    """
    status["video"] = "Downloading..."
    try:
        await run_in_threadpool(
            download_convert_video_to_audio,
            yt_dlp,
            video_url,
            f"/home/user/{uuid.uuid4().hex}",
        )
    except Exception:
        status["video"] = "Failed"
        raise
    status["video"] = "Downloaded"
    # NOTE(review): the generated file name is neither returned nor removed
    # here, so the caller cannot locate the result - confirm this is intended.

def _transcribe_to_dicts(model, audio_path, beam_size, word_timestamps):
    """Transcribe ``audio_path`` and return ``(segment_dicts, info)``.

    The segments are consumed here so that the caller can run the whole
    iteration in a worker thread (faster-whisper documents ``segments`` as a
    lazy generator, i.e. the actual work happens while iterating).
    """
    segments, info = model.transcribe(audio_path, beam_size=beam_size, word_timestamps=word_timestamps)
    return [segment_to_dict(segment) for segment in segments], info


@app.post("/transcribe")
async def transcribe_video(video_url: str, beam_size: int = 5, model_size: str = "tiny", word_timestamps: bool = True):
    """Download a video's audio and transcribe it with a Whisper model.

    Progress is published in ``status`` under the keys ``"model"``,
    ``"video"``, ``"transcription"`` and, on success, ``"language"``.

    Every blocking step (model load, download, transcription) runs in a
    worker thread so the event loop stays free and ``/status`` can be polled
    while the work is in progress. ``status`` itself is only written from
    this coroutine, never from the worker threads.

    Args:
        video_url: URL handed to ``download_convert_video_to_audio``.
        beam_size: Forwarded to ``WhisperModel.transcribe``.
        model_size: Model identifier forwarded to ``WhisperModel``.
        word_timestamps: Forwarded to ``WhisperModel.transcribe``.

    Returns:
        A list with one dict per segment (see ``segment_to_dict``).

    Raises:
        Exception: Any error from a pipeline step is re-raised after the
            status entry of the failing stage has been set to ``"Failed"``.
    """
    audio_base = f"/home/user/{uuid.uuid4().hex}"
    audio_path = f"{audio_base}.mp3"
    stage = "model"  # status key of the step currently running
    try:
        status["model"] = "Loading..."
        model = await run_in_threadpool(WhisperModel, model_size, device="cpu", compute_type="int8")
        status["model"] = "Loaded"

        stage = "video"
        status["video"] = "Downloading..."
        await run_in_threadpool(download_convert_video_to_audio, yt_dlp, video_url, audio_base)
        status["video"] = "Downloaded"

        stage = "transcription"
        status["transcription"] = "Transcribing..."
        segments, info = await run_in_threadpool(
            _transcribe_to_dicts, model, audio_path, beam_size, word_timestamps
        )
    except Exception:
        status[stage] = "Failed"
        raise
    finally:
        # Always drop the temporary audio file, including when a step failed.
        # It may not exist yet if the failure happened before the download
        # produced it, so a missing file is not an error.
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            pass

    status["transcription"] = "Completed"
    status["language"] = f"Detected language '{info.language}' with probability {info.language_probability:f}"
    return segments

@app.get("/status")
async def get_status():
    """Report the current pipeline progress.

    Returns:
        The module-level ``status`` mapping itself (not a copy), keyed by
        pipeline stage with a human-readable message per stage.
    """
    current_status = status
    return current_status