import asyncio
import os
import subprocess
import threading
import uuid
from typing import List

import azure.cognitiveservices.speech as speechsdk
import requests
import supabase
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
app = FastAPI()

# Credentials and endpoints are read from the process environment instead of
# being committed to source control. Any key that was previously hard-coded in
# this file should be treated as leaked and rotated.
# NOTE(review): the environment variable names mirror the Python constant
# names used in this module -- confirm against the deployment configuration.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY")
NEXT_PUBLIC_AZURE_SPEECH_KEY = os.environ.get("NEXT_PUBLIC_AZURE_SPEECH_KEY")
# The rest of the module reads AZURE_SPEECH_KEY; fall back to the NEXT_PUBLIC_
# variant so either variable name configures the speech service.
AZURE_SPEECH_KEY = os.environ.get("AZURE_SPEECH_KEY") or NEXT_PUBLIC_AZURE_SPEECH_KEY
AZURE_SPEECH_REGION = os.environ.get("AZURE_SPEECH_REGION")

# Report which settings are present (never their values). This runs before the
# client is created so the diagnostics are visible even when start-up fails.
print(f"SUPABASE_SERVICE_ROLE_KEY: {'Exists' if SUPABASE_SERVICE_ROLE_KEY else 'Missing'}")
print(f"AZURE_SPEECH_KEY: {'Exists' if AZURE_SPEECH_KEY else 'Missing Azure Speech Key'}")
print(f"AZURE_SPEECH_REGION: {'Exists' if AZURE_SPEECH_REGION else 'Missing Azure Speech Region'}")

# Fail fast with an actionable message rather than an opaque client error.
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError(
        "SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY must be set in the environment"
    )

supabase_client = supabase.create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
class YouTubeData(BaseModel):
    """Request body accepted by the ``/transcribe`` endpoint."""

    # Video URLs to transcribe; the endpoint processes them in list order.
    youtubeUrl: List[str]
    # Used as the leading path segment of the uploaded transcription's
    # storage key.
    userId: str
async def download_and_save_audio(video_url: str, file_id: str) -> str:
    """Download a video's audio as WAV with yt-dlp and return the file path.

    Args:
        video_url: URL handed to yt-dlp.
        file_id: Identifier used to name the output file ``/tmp/<file_id>.wav``.

    Returns:
        The path the audio was written to.

    Raises:
        HTTPException: 500 when yt-dlp cannot be started or exits non-zero.
    """
    temp_file_path = f"/tmp/{file_id}.wav"
    command = [
        "yt-dlp",
        "--extract-audio",
        "--audio-format", "wav",
        "--output", temp_file_path,
        # "--" ends option parsing, so a caller-supplied value that starts
        # with "-" is treated as a URL and never as a yt-dlp option.
        "--",
        video_url,
    ]
    loop = asyncio.get_running_loop()
    try:
        # subprocess.run blocks until the download finishes; run it on a
        # worker thread so the event loop keeps serving other requests.
        await loop.run_in_executor(
            None, lambda: subprocess.run(command, check=True)
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable yt-dlp binary.
        raise HTTPException(status_code=500, detail=f"yt-dlp failed: {str(e)}") from e
    return temp_file_path
def _recognize_file_continuously(file_path: str) -> str:
    """Blocking helper: run continuous en-US recognition over a whole file."""
    speech_config = speechsdk.SpeechConfig(subscription=AZURE_SPEECH_KEY, region=AZURE_SPEECH_REGION)
    speech_config.speech_recognition_language = "en-US"
    audio_config = speechsdk.AudioConfig(filename=file_path)
    recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)

    finished = threading.Event()
    segments: List[str] = []
    errors: List[str] = []

    def on_recognized(evt):
        # One event per recognized utterance; keep only non-empty speech.
        result = evt.result
        if result.reason == speechsdk.ResultReason.RecognizedSpeech and result.text:
            segments.append(result.text)

    def on_canceled(evt):
        # Cancellation is also how the SDK signals end of file, so only an
        # Error reason is recorded as a failure.
        details = evt.cancellation_details
        if details.reason == speechsdk.CancellationReason.Error:
            errors.append(str(details.error_details))
        finished.set()

    def on_session_stopped(evt):
        finished.set()

    recognizer.recognized.connect(on_recognized)
    recognizer.canceled.connect(on_canceled)
    recognizer.session_stopped.connect(on_session_stopped)

    recognizer.start_continuous_recognition()
    try:
        # SDK callbacks fire on SDK-owned threads; this thread just waits.
        finished.wait()
    finally:
        recognizer.stop_continuous_recognition()

    if errors:
        raise HTTPException(status_code=500, detail=f"Speech recognition failed: {errors[0]}")
    if not segments:
        raise HTTPException(
            status_code=500,
            detail="Speech recognition failed: no speech could be recognized",
        )
    return " ".join(segments)


async def transcribe_audio(file_path: str) -> str:
    """Transcribe an entire audio file with Azure Speech (en-US).

    Uses continuous recognition so the whole file is transcribed; single-shot
    ``recognize_once`` stops after the first utterance. The blocking SDK work
    runs on a worker thread so the event loop is not stalled.

    Args:
        file_path: Path of the audio file to transcribe.

    Returns:
        The recognized utterances joined with single spaces.

    Raises:
        HTTPException: 500 when the service reports an error or no speech is
            recognized.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _recognize_file_continuously, file_path)
async def upload_to_supabase(file_path: str, user_id: str, file_id: str):
    """Upload a local file to the ``files`` bucket as ``<user_id>/<file_id>.txt``.

    Args:
        file_path: Local file whose bytes are uploaded.
        user_id: Leading path segment of the storage key.
        file_id: Base name of the stored object.

    Returns:
        Whatever the storage client's ``upload`` call returns.
    """
    with open(file_path, "rb") as f:
        file_data = f.read()

    destination = f"{user_id}/{file_id}.txt"
    # The storage client takes upload settings as a ``file_options`` dict of
    # string values, not as keyword arguments.
    file_options = {"content-type": "text/plain", "upsert": "true"}
    bucket = supabase_client.storage.from_("files")

    # The storage client is synchronous; keep its HTTP round-trip off the
    # event loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        lambda: bucket.upload(destination, file_data, file_options=file_options),
    )
@app.post("/transcribe")
async def transcribe_youtube_audio(data: YouTubeData):
    """Transcribe each requested video and upload the combined text.

    For every URL the audio is downloaded, transcribed and the temporary audio
    file removed. The per-URL sections are then written to one temporary text
    file, uploaded for ``data.userId`` and the temporary file removed.

    Returns:
        ``{"success": True, "file": {"id": <file_id>}}`` where ``file_id`` is
        the identifier the transcription was uploaded under.

    Raises:
        HTTPException: 400 when no URLs are supplied; errors raised by the
            download, transcription and upload steps propagate unchanged.
    """
    if not data.youtubeUrl:
        raise HTTPException(status_code=400, detail="youtubeUrl must contain at least one URL")

    sections = []
    for url in data.youtubeUrl:
        audio_path = await download_and_save_audio(url, str(uuid.uuid4()))
        try:
            transcription = await transcribe_audio(audio_path)
        finally:
            # Remove the audio even when transcription fails so /tmp does not
            # fill up with orphaned WAV files.
            if os.path.exists(audio_path):
                os.remove(audio_path)
        sections.append(f"\nTranscription for URL ({url}):\n{transcription}\n")

    file_id = str(uuid.uuid4())
    temp_transcription_path = f"/tmp/{file_id}-transcription.txt"
    try:
        with open(temp_transcription_path, "w", encoding="utf-8") as f:
            f.write("".join(sections))
        await upload_to_supabase(temp_transcription_path, data.userId, file_id)
    finally:
        if os.path.exists(temp_transcription_path):
            os.remove(temp_transcription_path)

    return {"success": True, "file": {"id": file_id}}