import asyncio
import os
import subprocess
import threading
import uuid
from typing import List

import azure.cognitiveservices.speech as speechsdk
import requests
import supabase
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
app = FastAPI()

# Configuration is read from the environment. The assignments this replaces
# bound each name to the literal variable *name* (for example the string
# "SUPABASE_URL") rather than looking the value up, so both clients were being
# configured with placeholder text.
#
# NOTE(review): a Supabase service-role key and an Azure Speech key used to be
# hard-coded at this point. They have been removed from the source; treat the
# old values as leaked and rotate them.
#
# Only non-secret values keep a default: the local Supabase URL and the Azure
# region that were previously written here as literals.
SUPABASE_URL = os.getenv("SUPABASE_URL", "http://127.0.0.1:54321")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
NEXT_PUBLIC_AZURE_SPEECH_KEY = os.getenv("NEXT_PUBLIC_AZURE_SPEECH_KEY")
NEXT_PUBLIC_AZURE_SPEECH_REGION = os.getenv("NEXT_PUBLIC_AZURE_SPEECH_REGION", "westeurope")

AZURE_SPEECH_KEY = NEXT_PUBLIC_AZURE_SPEECH_KEY
AZURE_SPEECH_REGION = NEXT_PUBLIC_AZURE_SPEECH_REGION

# Emit the status lines before creating the client so they still appear in the
# log if client construction raises because a value is missing.
print(f"SUPABASE_URL: {SUPABASE_URL}")
print(f"SUPABASE_SERVICE_ROLE_KEY: {'Exists' if SUPABASE_SERVICE_ROLE_KEY else 'Missing'}")
print(f"AZURE_SPEECH_KEY: {'Exists' if AZURE_SPEECH_KEY else 'Missing Azure Speech Key'}")
print(f"AZURE_SPEECH_REGION: {'Exists' if AZURE_SPEECH_REGION else 'Missing Azure Speech Region'}")

supabase_client = supabase.create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# Request body of the POST /transcribe endpoint.
class YouTubeData(BaseModel):
    # Video URLs; the endpoint downloads and transcribes each one in order.
    youtubeUrl: List[str]

    # Used as the leading path segment of the uploaded transcription object.
    userId: str
async def download_and_save_audio(video_url: str, file_id: str) -> str:
    """Download the audio track of a video as a WAV file using ``yt-dlp``.

    Args:
        video_url: URL of the video to fetch. This value comes from the API
            client and is therefore untrusted.
        file_id: Unique stem used for the temporary file name.

    Returns:
        The path the audio was written to, ``/tmp/<file_id>.wav``.

    Raises:
        HTTPException: Status 500 if ``yt-dlp`` cannot be launched or exits
            with a non-zero status.
    """
    temp_file_path = f"/tmp/{file_id}.wav"
    command = [
        "yt-dlp",
        "--extract-audio",
        "--audio-format", "wav",
        "--output", temp_file_path,
        # "--" ends option parsing, so a client-supplied value beginning with
        # "-" is treated as a URL and can never be read as a yt-dlp option
        # (such as --exec).
        "--",
        video_url,
    ]

    loop = asyncio.get_running_loop()
    try:
        # subprocess.run blocks until the download completes; running it in a
        # worker thread keeps the event loop free to serve other requests.
        await loop.run_in_executor(
            None, lambda: subprocess.run(command, check=True)
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the case where the yt-dlp executable is not found or
        # cannot be started, so it is reported the same way as a failed run.
        raise HTTPException(status_code=500, detail=f"yt-dlp failed: {str(e)}") from e

    return temp_file_path
async def transcribe_audio(file_path: str) -> str:
    """Transcribe a whole audio file with Azure Speech-to-Text (``en-US``).

    Continuous recognition is used because single-shot recognition
    (``recognize_once``) returns after the first utterance, which would drop
    everything after the opening sentence of a video.

    Args:
        file_path: Path of the WAV file to transcribe.

    Returns:
        The recognized utterances joined with single spaces.

    Raises:
        HTTPException: Status 500 if the service reports an error or if no
            speech was recognized in the file.
    """
    speech_config = speechsdk.SpeechConfig(
        subscription=AZURE_SPEECH_KEY, region=AZURE_SPEECH_REGION
    )
    speech_config.speech_recognition_language = "en-US"
    audio_config = speechsdk.AudioConfig(filename=file_path)
    recognizer = speechsdk.SpeechRecognizer(
        speech_config=speech_config, audio_config=audio_config
    )

    # The SDK calls below block, so they run in a worker thread to keep the
    # event loop responsive while the file is processed.
    loop = asyncio.get_running_loop()
    segments, errors = await loop.run_in_executor(
        None, _run_continuous_recognition, recognizer
    )

    if errors:
        raise HTTPException(status_code=500, detail=f"Speech recognition failed: {errors[0]}")
    if not segments:
        raise HTTPException(status_code=500, detail="Speech recognition failed: no speech recognized")
    return " ".join(segments)


def _run_continuous_recognition(recognizer):
    """Run *recognizer* until its input ends; return ``(segments, errors)``."""
    segments = []
    errors = []
    finished = threading.Event()

    def on_recognized(evt):
        result = evt.result
        if result.reason == speechsdk.ResultReason.RecognizedSpeech and result.text:
            segments.append(result.text)

    def on_canceled(evt):
        # Error text lives on the cancellation details, not on the result.
        # Reaching the end of the file is also delivered as a cancellation,
        # so only the Error reason is recorded as a failure.
        details = evt.cancellation_details
        if details.reason == speechsdk.CancellationReason.Error:
            errors.append(details.error_details)
        finished.set()

    def on_session_stopped(evt):
        finished.set()

    recognizer.recognized.connect(on_recognized)
    recognizer.canceled.connect(on_canceled)
    recognizer.session_stopped.connect(on_session_stopped)

    recognizer.start_continuous_recognition()
    try:
        finished.wait()
    finally:
        recognizer.stop_continuous_recognition()
    return segments, errors
async def upload_to_supabase(file_path: str, user_id: str, file_id: str):
    """Upload a local file to the ``files`` storage bucket as plain text.

    Args:
        file_path: Path of the local file whose bytes are uploaded.
        user_id: Leading path segment of the destination object.
        file_id: Stem of the destination object name.

    Returns:
        Whatever the storage client's ``upload`` call returns for the object
        ``<user_id>/<file_id>.txt``.
    """
    with open(file_path, "rb") as f:
        file_data = f.read()

    destination = f"{user_id}/{file_id}.txt"
    # The storage client's upload() accepts per-file settings as one
    # ``file_options`` mapping, not as content_type=/upsert= keyword
    # arguments. The entries are sent as HTTP headers, so the values are
    # strings; "x-upsert" is the header that allows overwriting an object.
    return supabase_client.storage.from_("files").upload(
        destination,
        file_data,
        file_options={"content-type": "text/plain", "x-upsert": "true"},
    )
@app.post("/transcribe")
async def transcribe_youtube_audio(data: YouTubeData):
    """Transcribe every URL in the request and upload the combined text.

    Each URL is downloaded and transcribed in order. The per-URL sections are
    concatenated, written to a temporary file and uploaded under the caller's
    ``userId``. Temporary files are removed even when a step raises.

    Args:
        data: Request body carrying the video URLs and the user id.

    Returns:
        ``{"success": True, "file": {"id": <file_id>}}`` where ``file_id`` is
        the generated identifier the transcription was uploaded under.
    """
    sections = []
    for url in data.youtubeUrl:
        audio_id = str(uuid.uuid4())
        temp_file_path = await download_and_save_audio(url, audio_id)
        try:
            transcription = await transcribe_audio(temp_file_path)
        finally:
            # Remove the audio even when transcription fails so that failed
            # requests do not leave WAV files behind.
            _remove_if_exists(temp_file_path)
        sections.append(f"\nTranscription for URL ({url}):\n{transcription}\n")
    combined_transcription = "".join(sections)

    file_id = str(uuid.uuid4())
    file_name = f"{file_id}-transcription.txt"
    temp_transcription_path = f"/tmp/{file_name}"

    try:
        with open(temp_transcription_path, "w", encoding="utf-8") as f:
            f.write(combined_transcription)
        await upload_to_supabase(temp_transcription_path, data.userId, file_id)
    finally:
        _remove_if_exists(temp_transcription_path)

    return {"success": True, "file": {"id": file_id}}


def _remove_if_exists(path: str) -> None:
    """Delete *path*; a file that is already gone is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # Cleanup runs in ``finally`` blocks; raising here would hide the
        # exception that actually caused the request to fail.
        pass