import os
import time
from datetime import datetime

import pytz
import whisper
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename  # sanitizes client-supplied filenames before saving

app = Flask(__name__)

@app.route("/") |
|
def hello(): |
|
return "Semabox, listens to you!" |
|
|
|
|
|
print("Loading Whisper model...\n", flush=True) |
|
model = whisper.load_model("tiny") |
|
print("\nWhisper model loaded.\n", flush=True) |
|
|
|
|
|
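# Note: other Whisper checkpoints ("base", "small", "medium", "large") trade speed for
# accuracy; "tiny" keeps startup time and per-request latency low on modest hardware.
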
def get_time():
    """Return the current Nairobi time as ("<Day> | <YYYY-MM-DD> | <HH:MM:SS>", "<HH:MM:SS>")."""
    nairobi_timezone = pytz.timezone('Africa/Nairobi')
    current_time_nairobi = datetime.now(nairobi_timezone)

    curr_day = current_time_nairobi.strftime('%A')
    curr_date = current_time_nairobi.strftime('%Y-%m-%d')
    curr_time = current_time_nairobi.strftime('%H:%M:%S')

    full_date = f"{curr_day} | {curr_date} | {curr_time}"
    return full_date, curr_time

def convert_size(size_bytes):
    """Format a byte count for logging, e.g. convert_size(1536) -> "1.50 KB"."""
    if size_bytes < 1024:
        return f"{size_bytes} bytes"
    elif size_bytes < 1024**2:
        return f"{size_bytes / 1024:.2f} KB"
    else:
        return f"{size_bytes / 1024**2:.2f} MB"

def transcribe(audio_path):
    # Load the audio and pad/trim it to the 30-second window Whisper decodes in one pass.
    audio = whisper.load_audio(audio_path)
    audio = whisper.pad_or_trim(audio)

    # Compute the log-Mel spectrogram and move it to the same device as the model.
    mel = whisper.log_mel_spectrogram(audio).to(model.device)

    # Detect the spoken language from the spectrogram.
    _, probs = model.detect_language(mel)
    language = max(probs, key=probs.get)

    # Decode; fp16=False keeps inference in float32, which also works on CPU-only hosts.
    options = whisper.DecodingOptions(fp16=False)
    result = whisper.decode(model, mel, options)

    print(" Transcription complete.", flush=True)
    return result.text, language, result

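# Hedged alternative (not wired into the route): Whisper's high-level model.transcribe()
# chunks longer audio internally, so it is not limited to the ~30-second window imposed by
# pad_or_trim() above. The name transcribe_full is illustrative, not part of the original app.
def transcribe_full(audio_path):
    result = model.transcribe(audio_path, fp16=False)
    return result["text"], result["language"], result
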
@app.route('/transcribe', methods=['POST'])
def transcribe_audio():
    request_received_time, _ = get_time()
    print(f"Query:- {request_received_time}", flush=True)

    if 'audio' not in request.files:
        print("Error: No audio file provided", flush=True)
        return jsonify({"error": "No audio file provided"}), 400

    # Measure the upload size, then rewind the stream so it can still be saved.
    audio_file = request.files['audio']
    audio_file_size_bytes = len(audio_file.read())
    audio_file.seek(0)
    audio_file_size = convert_size(audio_file_size_bytes)

    # Save the upload to a temporary location, sanitizing the client-supplied filename.
    os.makedirs("temp_audio", exist_ok=True)
    filename = secure_filename(audio_file.filename) or "upload"
    audio_path = os.path.join("temp_audio", filename)
    audio_file.save(audio_path)
    print(f" Audio file saved to: {audio_path} (Size: {audio_file_size})", flush=True)

    transcription_start_time = time.time()

    try:
        transcription, language, _ = transcribe(audio_path)
    except Exception as e:
        print(f" Error during transcription: {str(e)}", flush=True)
        return jsonify({"error": f"An error occurred: {str(e)}"}), 500
    finally:
        # Clean up the temporary file whether or not transcription succeeded.
        os.remove(audio_path)
        print(f" Audio file removed from: {audio_path}\n", flush=True)

    transcription_end_time = time.time()
    transcription_duration = round(transcription_end_time - transcription_start_time, 2)

    response_sent_time, _ = get_time()

    print(f" \033[92mTranscription: {transcription}, Language: {language}, Processing Time: {transcription_duration}\033[0m\n", flush=True)

    return jsonify({
        "transcription": transcription,
        "language": language,
        "request_received_time": request_received_time,
        "transcription_duration_seconds": transcription_duration,
        "response_sent_time": response_sent_time,
        "audio_file_size": audio_file_size
    }), 200

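# Example client call (illustrative; host, port, and file name are assumptions):
#   curl -X POST -F "audio=@sample.wav" http://localhost:5000/transcribe
# The JSON response includes "transcription", "language", timing fields, and the upload size.
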
@app.route('/healthcheck', methods=['GET'])
def healthcheck():
    print("Received request at /healthcheck\n", flush=True)
    return jsonify({"status": "API is running"}), 200

if __name__ == '__main__':
    print("Starting Flask app...\n", flush=True)
    app.run(host="0.0.0.0", port=5000)
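
# For production use, a WSGI server is typically preferred over app.run(); for example
# (assuming this module is saved as app.py -- an assumption, not stated in the source):
#   gunicorn --bind 0.0.0.0:5000 app:app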