# Spaces: Running on A10G
import io | |
import json | |
from flask import Flask, send_from_directory, jsonify, request, abort | |
import os | |
import gunicorn.app.base | |
from flask_cors import CORS | |
from multiprocessing import Queue | |
import base64 | |
from typing import Any, Optional, List, Dict, Tuple | |
from queue import Queue | |
from server.AudioTranscriber import AudioTranscriber | |
from server.ActionProcessor import ActionProcessor | |
# Static assets are served from the container path unless DEBUG is "true",
# in which case the local ./server directory is used instead.
STATIC_DIR = "./server" if os.getenv("DEBUG") == "true" else "/app/server/static"

# Hand-off queues: process_data() fills audio_queue, the AudioTranscriber is
# given (audio_queue, text_queue), the ActionProcessor is given
# (text_queue, action_queue), and get_actions() drains action_queue.
audio_queue: "Queue[io.BytesIO]" = Queue()
text_queue: "Queue[str]" = Queue()
action_queue: "Queue[str]" = Queue()

app = Flask(__name__, static_folder=STATIC_DIR)

# Enable cross-origin requests from any origin for the listed methods/headers.
_ = CORS(
    app,
    allow_headers=["Content-Type", "Authorization"],
    methods=["GET", "POST", "OPTIONS"],
    origins=["*"],
)
def add_header(response):
    """Stamp permissive CORS and cross-origin isolation headers on a response.

    Args:
        response: Object exposing a mutable ``headers`` mapping.

    Returns:
        The same ``response`` object, after its headers have been set.
    """
    forced_headers = (
        # Permissive CORS: any origin and any request header are accepted.
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"),
        ("Access-Control-Allow-Headers", "*"),
        # Cross-origin isolation headers.
        ("Cross-Origin-Embedder-Policy", "require-corp"),
        ("Cross-Origin-Opener-Policy", "same-origin"),
        ("Cross-Origin-Resource-Policy", "cross-origin"),
    )
    for header_name, header_value in forced_headers:
        response.headers[header_name] = header_value
    return response
def serve_index():
    """Serve ``index.html``, or a JSON diagnostic when ``?logs=container``.

    With the query parameter ``logs=container`` the response is a JSON
    object describing the static folder (path, existence, file listing),
    the current working directory and the ``USER`` environment variable.
    Otherwise ``index.html`` is sent from the static folder with the
    cross-origin opener/embedder policy headers set.

    Raises:
        A 404 abort when sending ``index.html`` raises ``FileNotFoundError``.
    """
    static_dir = app.static_folder

    # Diagnostic mode: report on the static folder instead of serving it.
    # NOTE(review): no access check is visible here, so this listing appears
    # to be available to any caller - confirm that is intended.
    if request.args.get("logs") == "container":
        listing = os.listdir(static_dir) if os.path.exists(static_dir) else []
        diagnostics = {
            "static_folder": static_dir,
            "exists": os.path.exists(static_dir),
            "files": listing,
            "pwd": os.getcwd(),
            "user": os.getenv("USER"),
        }
        return jsonify(diagnostics)

    try:
        index_page = send_from_directory(static_dir, "index.html")
        index_page.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        index_page.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
    except FileNotFoundError:
        # abort() raises, so the return below is reached only on success.
        abort(
            404,
            description=f"Static folder or index.html not found. Static folder: {app.static_folder}",
        )
    return index_page
def get_data():
    """Return a fixed JSON payload with a message and a success status."""
    payload = {"message": "Voici vos données", "status": "success"}
    return jsonify(payload)
def _audio_error(message: str, status_code: int):
    """Build the ``(json_response, status_code)`` error pair for process_data."""
    return jsonify({"error": message, "status": "error"}), status_code


def process_data():
    """Accept one base64-encoded audio chunk and queue it for processing.

    Where the chunk is read from depends on the request ``Content-Type``:

    * contains ``application/json``: the ``audio_chunk`` field of the JSON
      object in the body;
    * contains ``multipart/form-data``: the ``audio_chunk`` form field;
    * anything else: the raw request body, decoded as UTF-8 text.

    The decoded bytes are wrapped in ``io.BytesIO`` and put on
    ``audio_queue``.

    Returns:
        A JSON success response (default status) once the chunk is queued;
        a ``(response, 400)`` pair when the chunk is missing, the body is
        malformed, or the base64 payload cannot be decoded; a
        ``(response, 500)`` pair on any other unexpected failure.
    """
    print("Processing data")
    try:
        content_type = request.headers.get("Content-Type", "")

        if "application/json" in content_type:
            # silent=True yields None for an unparsable body instead of
            # raising, so malformed JSON is reported as a 400 below rather
            # than being swallowed by the generic handler and turned into 500.
            payload = request.get_json(silent=True)
            # Valid JSON may still be a list/string/number with no fields.
            audio_base64 = (
                payload.get("audio_chunk") if isinstance(payload, dict) else None
            )
        elif "multipart/form-data" in content_type:
            audio_base64 = request.form.get("audio_chunk")
        else:
            # Fall back to treating the whole body as the base64 text.
            try:
                audio_base64 = request.get_data().decode("utf-8")
            except UnicodeDecodeError as e:
                # A non-UTF-8 body is a client error, not a server fault.
                return _audio_error(f"Failed to decode audio chunk: {str(e)}", 400)

        if not audio_base64:
            return _audio_error("Missing audio_chunk in request", 400)

        try:
            audio_chunk = base64.b64decode(audio_base64)
        except (ValueError, TypeError) as e:
            # binascii.Error (bad padding/characters) is a ValueError;
            # TypeError covers a non-string value such as a JSON number.
            return _audio_error(f"Failed to decode audio chunk: {str(e)}", 400)

        # Hand the chunk to the consumer side as an in-memory file object.
        audio_queue.put(io.BytesIO(audio_chunk))
        return jsonify(
            {
                "message": "Audio chunk received and queued for processing",
                "status": "success",
            }
        )
    except Exception as e:
        # Last-resort boundary: report anything unexpected as a server error.
        return _audio_error(f"Failed to process request: {str(e)}", 500)
def get_actions() -> Tuple[Any, int]:
    """Retrieve and clear all pending actions from the queue.

    Returns:
        A ``(response, 200)`` pair. The response JSON has ``status`` set to
        ``"success"`` and ``actions`` set to a JSON-encoded *string* holding
        the list of drained items; the list is therefore serialized twice
        and consumers must parse the ``actions`` string themselves.
    """
    # Local import: the module's import block only brings in Queue.
    from queue import Empty

    actions: List[Any] = []
    # Pop until the queue reports Empty. This replaces an empty() pre-check
    # (which can be stale by the time get_nowait() runs) and a catch-all
    # except that would also have hidden unrelated errors.
    while True:
        try:
            actions.append(action_queue.get_nowait())
        except Empty:
            break
    return jsonify({"actions": json.dumps(actions), "status": "success"}), 200
def serve_static(path: str):
    """Send the file at ``path`` from the application's static folder.

    Args:
        path: File path relative to the static folder.

    Raises:
        A 404 abort when sending the file raises ``FileNotFoundError``.
    """
    try:
        served_file = send_from_directory(app.static_folder, path)
    except FileNotFoundError:
        # abort() raises, so the return below is reached only on success.
        abort(404, description=f"File {path} not found in static folder")
    return served_file
class StandaloneApplication(gunicorn.app.base.BaseApplication):
    """Gunicorn application that serves an already-constructed Flask app.

    Lets the server be launched programmatically with an options dict.
    """

    def __init__(self, app: Flask, options: Optional[Dict[str, Any]] = None):
        """Store the app and options, then initialise the gunicorn base.

        Args:
            app: The Flask application to serve.
            options: Gunicorn settings keyed by name; ``None`` means none.
        """
        # Assigned before super().__init__().
        # NOTE(review): presumably the base constructor triggers
        # load_config(), which reads self.options - confirm against gunicorn.
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        """Apply every stored option to ``self.cfg`` under its lower-cased key."""
        for option_name, option_value in self.options.items():
            self.cfg.set(option_name.lower(), option_value)

    def load(self):
        """Return the wrapped application object."""
        return self.application
if __name__ == "__main__":
    # Report the state of the static folder, then make sure it exists.
    print(f"Static folder path: {app.static_folder}")
    print(f"Static folder exists: {os.path.exists(app.static_folder)}")
    if os.path.exists(app.static_folder):
        print(f"Static folder contents: {os.listdir(app.static_folder)}")
    os.makedirs(app.static_folder, exist_ok=True)

    # Start the consumer that is handed the audio and text queues.
    transcriber = AudioTranscriber(audio_queue, text_queue)
    transcriber.start()

    # Start the consumer that is handed the text and action queues.
    action_processor = ActionProcessor(text_queue, action_queue)
    action_processor.start()

    # NOTE(review): the queues are in-process queue.Queue objects and the
    # consumers are started here, while gunicorn is asked for 3 sync workers.
    # Confirm that requests handled in worker processes actually reach these
    # queues/consumers.
    options: Any = dict(
        bind="0.0.0.0:7860",
        workers=3,
        worker_class="sync",
        timeout=120,
        forwarded_allow_ips="*",
    )
    server = StandaloneApplication(app, options)
    server.run()