# Spaces: Running on A10G
from multiprocessing import Process, connection | |
from typing import Any, Dict, Optional, List, Deque | |
from collections import deque | |
import pyaudio | |
import io | |
import wave | |
class AudioRecorder:
    """Record microphone audio in a child process and stream it over a pipe.

    Audio is captured in windows of ``RECORD_SECONDS`` seconds. Each window is
    encoded as WAV and kept in a two-slot rolling buffer; once two windows are
    available, they are merged into a single WAV payload and sent through
    ``output_pipe`` after every new window.
    """

    def __init__(
        self,
        output_pipe: connection.Connection,
        input_device_index: Optional[int] = None,
    ):
        """Store the capture settings; no audio device is opened here.

        Args:
            output_pipe: Connection on which WAV payloads are sent.
            input_device_index: Index of the input device handed to PyAudio,
                or ``None`` to pass no explicit index.
        """
        self.CHUNK: int = 1024  # frames requested per stream read
        self.FORMAT: int = pyaudio.paInt16
        self.CHANNELS: int = 1
        self.RATE: int = 44100  # sampling rate in Hz
        self.RECORD_SECONDS: int = 1  # length of one capture window
        self.recording_process: Optional[Process] = None
        # Rolling buffer holding the two most recent WAV-encoded windows.
        self.audio_chunks: Deque[bytes] = deque(maxlen=2)
        self.output_pipe: connection.Connection = output_pipe
        self.input_device_index: Optional[int] = input_device_index

    @staticmethod
    def list_microphones() -> List[Dict[str, Any]]:
        """List all available input devices with their properties.

        Declared as a static method so it can be called both on the class and
        on an instance (the function takes no ``self``).

        Returns:
            The PyAudio device-info dicts of every device that reports at
            least one input channel.
        """
        p = pyaudio.PyAudio()
        try:
            devices = []
            for i in range(p.get_device_count()):
                device_info = p.get_device_info_by_index(i)
                if device_info["maxInputChannels"] > 0:  # Only input devices
                    devices.append(device_info)
            return devices
        finally:
            # Release PortAudio even if a device query fails.
            p.terminate()

    @staticmethod
    def _encode_wav(pcm: bytes, channels: int, sample_width: int, rate: int) -> bytes:
        """Wrap raw PCM bytes in an in-memory WAV container."""
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, "wb") as wf:
            wf.setnchannels(channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(rate)
            wf.writeframes(pcm)
        return wav_buffer.getvalue()

    @staticmethod
    def _merge_wav_chunks(*chunks: bytes) -> bytes:
        """Merge several WAV byte strings into one valid WAV byte string.

        Concatenating complete WAV files byte-for-byte leaves extra headers in
        the middle of the data, so the PCM frames are extracted from every
        chunk and re-encoded under a single header instead.

        Raises:
            ValueError: If no chunk is given, or the chunks do not share the
                same channel count, sample width and frame rate.
        """
        if not chunks:
            raise ValueError("at least one WAV chunk is required")
        layout = None
        pcm_parts: List[bytes] = []
        for chunk in chunks:
            with wave.open(io.BytesIO(chunk), "rb") as wf:
                current = (wf.getnchannels(), wf.getsampwidth(), wf.getframerate())
                if layout is None:
                    layout = current
                elif current != layout:
                    raise ValueError("WAV chunks have mismatched audio parameters")
                pcm_parts.append(wf.readframes(wf.getnframes()))
        channels, sample_width, rate = layout
        return AudioRecorder._encode_wav(
            b"".join(pcm_parts), channels, sample_width, rate
        )

    def create_wav_bytes(self, frames: List[bytes]) -> bytes:
        """Convert raw audio frames to WAV format in memory.

        Args:
            frames: Raw PCM buffers, in capture order.

        Returns:
            WAV bytes using this recorder's channel count, sample format and
            rate.
        """
        return self._encode_wav(
            b"".join(frames),
            self.CHANNELS,
            pyaudio.get_sample_size(self.FORMAT),
            self.RATE,
        )

    def record_audio(self) -> None:
        """Capture audio forever and send rolling two-window WAV payloads.

        Runs until the process is terminated or an unhandled error occurs.
        ``OSError`` raised by an individual read is reported and that read is
        skipped. The stream and the PyAudio handle are always released when
        the loop is left through an exception.
        """
        reads_per_window = int(self.RATE / self.CHUNK * self.RECORD_SECONDS)
        p = pyaudio.PyAudio()
        try:
            while True:
                # The stream is reopened for every window (as before), so a
                # device that has gone away fails on open rather than looping
                # forever on failed reads.
                stream = p.open(
                    format=self.FORMAT,
                    channels=self.CHANNELS,
                    rate=self.RATE,
                    input=True,
                    input_device_index=self.input_device_index,
                    frames_per_buffer=self.CHUNK,
                )
                frames: List[bytes] = []
                try:
                    # Record for RECORD_SECONDS
                    for _ in range(reads_per_window):
                        try:
                            data = stream.read(
                                self.CHUNK, exception_on_overflow=False
                            )
                        except OSError as e:
                            print(f"Warning: Audio input overflow occurred: {e}")
                        else:
                            frames.append(data)
                finally:
                    stream.stop_stream()
                    stream.close()

                # Convert to WAV format and add to rolling buffer
                self.audio_chunks.append(self.create_wav_bytes(frames))

                # Send through the pipe once two windows are buffered. The
                # windows are merged into ONE WAV file; plain concatenation
                # would leave a second RIFF header inside the payload.
                if len(self.audio_chunks) == 2:
                    self.output_pipe.send(
                        self._merge_wav_chunks(*self.audio_chunks)
                    )
        finally:
            p.terminate()

    def start_recording(self) -> None:
        """Start recording in a separate process.

        Does nothing if a recording process is already alive, so that two
        recorders never write to the same pipe.
        """
        current = self.recording_process
        if current is not None and current.is_alive():
            return
        self.recording_process = Process(target=self.record_audio)
        self.recording_process.start()

    def stop_recording(self) -> None:
        """Stop recording, if a recording process exists."""
        process = self.recording_process
        if process is None:
            return
        process.terminate()
        # Reap the child so it does not linger as a zombie; the timeout keeps
        # the caller from blocking forever if the child fails to exit.
        process.join(timeout=5)
        self.recording_process = None