import io
import logging
import sys
import threading
from multiprocessing import Queue
from queue import Empty
from typing import List

from faster_whisper import WhisperModel

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Get a logger for your app
logger = logging.getLogger(__name__)


class AudioTranscriber(threading.Thread):
    """Daemon thread that transcribes queued audio chunks with faster-whisper.

    Every chunk read from ``audio_queue`` is appended to a sliding window of the
    most recent ``max_buffer_size`` chunks. The whole window is then transcribed.
    For each resulting segment, the thread puts one item on ``text_queue``: the
    segment text when ``segment.no_speech_prob <= confidence_threshold``,
    otherwise an empty string.
    """

    def __init__(
        self,
        audio_queue: "Queue[io.BytesIO]",
        text_queue: "Queue[str]",
        language: str = "en",
        confidence_threshold: float = 0.5,
        max_buffer_size: int = 4,
        model_size: str = "large",
        device: str = "cuda",
        compute_type: str = "int8",
    ):
        """Create the transcriber thread and load the Whisper model.

        Args:
            audio_queue: Source of audio chunks, each wrapped in a ``BytesIO``.
            text_queue: Destination for transcribed text (``""`` for segments
                rejected by the no-speech test).
            language: Language code passed to ``WhisperModel.transcribe``.
            confidence_threshold: Segments whose ``no_speech_prob`` is at most
                this value are published as text.
            max_buffer_size: Number of most recent chunks transcribed together.
            model_size: Model name passed to ``WhisperModel``.
            device: Device passed to ``WhisperModel``.
            compute_type: Compute type passed to ``WhisperModel``.

        Raises:
            ValueError: If ``max_buffer_size`` is less than 1.
        """
        super().__init__()
        if max_buffer_size < 1:
            raise ValueError("max_buffer_size must be at least 1")
        self.audio_queue = audio_queue
        # Attribute name retained for compatibility; it holds ``text_queue``.
        self.action_queue = text_queue
        self.daemon = True  # Thread will exit when main program exits
        self.max_buffer_size = max_buffer_size
        self.language = language
        self.confidence_threshold = confidence_threshold
        self.buffer: List[io.BytesIO] = []
        self.transcriber = WhisperModel(
            model_size,
            device=device,
            compute_type=compute_type,
        )

    def run(self):
        """Consume audio chunks forever, transcribing the sliding window each time.

        Polls ``audio_queue`` with a 1-second timeout. An empty queue simply
        loops again. Any other exception is logged with its traceback, and the
        loop keeps running.
        """
        while True:
            try:
                # Wait for 1 second before timing out and checking again
                audio_chunk = self.audio_queue.get(timeout=1)
                self._process_chunk(audio_chunk)
            except Empty:
                continue  # If queue is empty, continue waiting
            except Exception as e:
                logger.exception("Error processing audio chunk: %s", e)

    def _process_chunk(self, audio_chunk: io.BytesIO) -> None:
        """Add *audio_chunk* to the window, transcribe the window, publish results."""
        self.buffer.append(audio_chunk)
        # Keep at most max_buffer_size chunks, always including the newest one.
        # The max(..., 1) guards against the attribute being changed to < 1.
        excess = len(self.buffer) - max(self.max_buffer_size, 1)
        if excess > 0:
            del self.buffer[:excess]

        # NOTE(review): joining raw bytes only yields decodable audio if the
        # chunk format tolerates concatenation (e.g. headerless streams); with a
        # container header per chunk (e.g. WAV) only the first header survives —
        # confirm what producers put on audio_queue.
        joined_buffer = io.BytesIO(b"".join(chunk.getvalue() for chunk in self.buffer))
        segments, _info = self.transcriber.transcribe(
            joined_buffer, language=self.language
        )

        # Put the transcription results in the output queue
        for segment in segments:
            if segment.no_speech_prob <= self.confidence_threshold:
                self.action_queue.put(segment.text)
                # Still print for debugging
                logger.info(
                    "[%.2fs -> %.2fs] %s", segment.start, segment.end, segment.text
                )
            else:
                # Segment judged non-speech: publish an empty placeholder.
                self.action_queue.put("")