import io
import wave
from collections import deque
from multiprocessing import Process, connection
from typing import Any, Deque, Dict, List, Optional

import pyaudio


class AudioRecorder:
    """Record microphone audio in a child process and stream it through a pipe.

    Audio is captured in segments of ``RECORD_SECONDS`` seconds. The most
    recent segments are kept in a rolling buffer; once the buffer is full,
    every newly recorded segment triggers the sending of one WAV payload
    (covering the whole buffer) through ``output_pipe``.
    """

    # Number of segments kept in the rolling buffer and sent per payload.
    _WINDOW_SEGMENTS: int = 2

    # Upper bound, in seconds, on how long stop_recording() waits for the
    # terminated child process to exit.
    _JOIN_TIMEOUT_SECONDS: float = 5.0

    def __init__(
        self,
        output_pipe: connection.Connection,
        input_device_index: Optional[int] = None,
    ):
        """Configure the recorder; no audio device is opened here.

        Args:
            output_pipe: Connection on which WAV payloads are sent.
            input_device_index: PyAudio index of the input device to record
                from; ``None`` is passed to PyAudio unchanged.
        """
        self.CHUNK: int = 1024  # frames requested per stream.read() call
        self.FORMAT: int = pyaudio.paInt16
        self.CHANNELS: int = 1
        self.RATE: int = 44100  # sampling rate in Hz
        self.RECORD_SECONDS: int = 1  # nominal duration of one segment
        self.recording_process: Optional[Process] = None
        # Rolling buffer of raw PCM segments (one bytes object per segment).
        self.audio_chunks: Deque[bytes] = deque(maxlen=self._WINDOW_SEGMENTS)
        self.output_pipe: connection.Connection = output_pipe
        self.input_device_index: Optional[int] = input_device_index

    @staticmethod
    def list_microphones() -> List[Dict[str, Any]]:
        """List all available input devices with their properties.

        Returns:
            The PyAudio device-info dictionaries of every device that reports
            at least one input channel.
        """
        p = pyaudio.PyAudio()
        try:
            all_devices = (
                p.get_device_info_by_index(i) for i in range(p.get_device_count())
            )
            # Only input devices
            return [info for info in all_devices if info["maxInputChannels"] > 0]
        finally:
            # Release PyAudio even if a device query raises.
            p.terminate()

    def create_wav_bytes(self, frames: List[bytes]) -> bytes:
        """Convert raw audio frames to WAV format in memory.

        Args:
            frames: Raw PCM byte strings, concatenated in order.

        Returns:
            A complete WAV file (header plus data) using the recorder's
            channel count, sample format and sampling rate.
        """
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b"".join(frames))
        return wav_buffer.getvalue()

    def _record_segment(self, audio: "pyaudio.PyAudio") -> bytes:
        """Record one segment of ``RECORD_SECONDS`` seconds as raw PCM bytes."""
        # NOTE(review): the stream is reopened for every segment, as in the
        # original design; this presumably leaves a short capture gap between
        # segments - consider keeping a single stream open if continuous
        # audio is required.
        stream = audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            input_device_index=self.input_device_index,
            frames_per_buffer=self.CHUNK,
        )
        frames: List[bytes] = []
        try:
            for _ in range(int(self.RATE / self.CHUNK * self.RECORD_SECONDS)):
                try:
                    frames.append(
                        stream.read(self.CHUNK, exception_on_overflow=False)
                    )
                except OSError as e:
                    # Best effort: skip the failed read and keep recording.
                    print(f"Warning: Audio input overflow occurred: {e}")
        finally:
            # Always release the device, even on an unexpected error.
            stream.stop_stream()
            stream.close()
        return b"".join(frames)

    def record_audio(self) -> None:
        """Record segments forever and send rolling-window WAV payloads.

        Intended to run as the target of the recording process. Each payload
        is a single WAV file built from all segments currently buffered, so
        consecutive payloads overlap by all but the newest segment.
        """
        audio = pyaudio.PyAudio()
        try:
            while True:
                self.audio_chunks.append(self._record_segment(audio))

                # Send through the pipe only once the window is full.
                if len(self.audio_chunks) == self._WINDOW_SEGMENTS:
                    # Build ONE WAV container over the buffered PCM data:
                    # concatenating separately wrapped WAV files would leave
                    # a second header in the middle of the payload, and the
                    # first header would only declare the first segment.
                    self.output_pipe.send(
                        self.create_wav_bytes(list(self.audio_chunks))
                    )
        finally:
            # Runs when the loop dies on an exception. It does not run when
            # the process is stopped with Process.terminate().
            audio.terminate()

    def start_recording(self) -> None:
        """Start recording in a separate process.

        Does nothing if a recording process is already running, so that two
        recorders never write to the same pipe.
        """
        current = self.recording_process
        if current is not None and current.is_alive():
            return
        self.recording_process = Process(target=self.record_audio)
        self.recording_process.start()

    def stop_recording(self) -> None:
        """Stop recording by terminating the recording process, if any."""
        process = self.recording_process
        if process is None:
            return
        self.recording_process = None
        process.terminate()
        # Reap the child so it does not linger as a zombie; bounded so a
        # stuck child cannot block the caller indefinitely.
        process.join(timeout=self._JOIN_TIMEOUT_SECONDS)