semabox / main.py
Tri4's picture
Update main.py
de34940 verified
#https://products.aspose.app/audio/voice-recorder/wav
from flask import Flask, request, jsonify
from datetime import datetime
import whisper
import time
import pytz
import os
app = Flask(__name__)
@app.route("/")
def hello():
return "Semabox, listens to you!"
# Load the Whisper model
print("Loading Whisper model...\n", flush=True)
model = whisper.load_model("tiny")
print("\nWhisper model loaded.\n", flush=True)
# Get time of request
def get_time():
nairobi_timezone = pytz.timezone('Africa/Nairobi')
current_time_nairobi = datetime.now(nairobi_timezone)
curr_day = current_time_nairobi.strftime('%A')
curr_date = current_time_nairobi.strftime('%Y-%m-%d')
curr_time = current_time_nairobi.strftime('%H:%M:%S')
full_date = f"{curr_day} | {curr_date} | {curr_time}"
return full_date, curr_time
# Convert file size from bytes to KB or MB
def convert_size(bytes):
if bytes < 1024:
return f"{bytes} bytes"
elif bytes < 1024**2:
return f"{bytes / 1024:.2f} KB"
else:
return f"{bytes / 1024**2:.2f} MB"
def transcribe(audio_path):
#print(f" Transcribing audio from: {audio_path}", flush=True)
# Load audio and pad/trim it to fit 30 seconds
#print(" Loading and processing audio...", flush=True)
audio = whisper.load_audio(audio_path)
audio = whisper.pad_or_trim(audio)
# Make log-Mel spectrogram and move to the same device as the model
#print(" Creating log-Mel spectrogram...", flush=True)
mel = whisper.log_mel_spectrogram(audio).to(model.device)
# Detect the spoken language
#print(" Detecting language...", flush=True)
_, probs = model.detect_language(mel)
language = max(probs, key=probs.get)
#print(f" Detected language: {language}", flush=True)
# Decode the audio
#print(" Decoding audio...", flush=True)
options = whisper.DecodingOptions(fp16=False)
result = whisper.decode(model, mel, options)
print(" Transcription complete.", flush=True)
return result.text, language, result
@app.route('/transcribe', methods=['POST'])
def transcribe_audio():
# Record the time when the request was received
request_received_time, _ = get_time()
print(f"Query:- {request_received_time}", flush=True)
if 'audio' not in request.files:
print("Error: No audio file provided", flush=True)
return jsonify({"error": "No audio file provided"}), 400
audio_file = request.files['audio']
audio_file_size_bytes = len(audio_file.read()) # Calculate the size of the file in bytes
audio_file.seek(0) # Reset the file pointer after reading
audio_file_size = convert_size(audio_file_size_bytes) # Convert file size to KB or MB
# Save the uploaded audio file
audio_path = os.path.join("temp_audio", audio_file.filename)
os.makedirs("temp_audio", exist_ok=True)
audio_file.save(audio_path)
print(f" Audio file saved to: {audio_path} (Size: {audio_file_size})", flush=True)
# Record the time before starting transcription
transcription_start_time = time.time()
# Transcribe the audio
try:
transcription, language, srt = transcribe(audio_path)
except Exception as e:
print(f" Error during transcription: {str(e)}", flush=True)
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
# Calculate the time taken for transcription
transcription_end_time = time.time()
transcription_duration = round(transcription_end_time - transcription_start_time, 2)
# Clean up the saved file
os.remove(audio_path)
print(f" Audio file removed from: {audio_path}\n", flush=True)
# Record the time when the response is being sent
response_sent_time, _ = get_time()
# Return the transcription, detected language, and timing information
#print(f" Transcription: {transcription}, Language: {language}, Processing Time: {transcription_duration}\n", flush=True)
print(f" \033[92mTranscription: {transcription}, Language: {language}, Processing Time: {transcription_duration}\033[0m\n", flush=True)
#print(srt, flush=True)
return jsonify({
"transcription": transcription,
"language": language,
"request_received_time": request_received_time,
"transcription_duration_seconds": transcription_duration,
"response_sent_time": response_sent_time,
"audio_file_size": audio_file_size
}), 200
@app.route('/healthcheck', methods=['GET'])
def healthcheck():
print("Received request at /healthcheck\n", flush=True)
return jsonify({"status": "API is running"}), 200
if __name__ == '__main__':
print("Starting Flask app...\n", flush=True)
app.run(host="0.0.0.0", port=5000)