ParentalControl / server /AudioRecorder.py
GitLab CI
Update game build from GitLab CI
6831f1f
raw
history blame
3.27 kB
from multiprocessing import Process, connection
from typing import Any, Dict, Optional, List, Deque
from collections import deque
import pyaudio
import io
import wave
class AudioRecorder:
    """Capture microphone audio in a child process and stream it over a pipe.

    The recorder captures fixed-length segments (``RECORD_SECONDS`` each),
    keeps the most recent segments in a rolling window and, once the window
    is full, sends the whole window through ``output_pipe`` as a single WAV
    file held in a ``bytes`` object. Consecutive messages therefore overlap
    by one segment.
    """

    # Grace period given to the recording process to exit after SIGTERM
    # before it is forcibly killed.
    _STOP_TIMEOUT_SECONDS: float = 2.0

    def __init__(
        self,
        output_pipe: connection.Connection,
        input_device_index: Optional[int] = None,
    ):
        """Configure the recorder without touching the audio device.

        Args:
            output_pipe: Write end of a multiprocessing connection; every
                completed window is sent through it as WAV ``bytes``.
            input_device_index: PyAudio input device index, or ``None`` to
                let PyAudio pick its default input device.
        """
        self.CHUNK: int = 1024  # frames requested per stream read
        self.FORMAT: int = pyaudio.paInt16
        self.CHANNELS: int = 1
        self.RATE: int = 44100  # sampling rate in Hz
        self.RECORD_SECONDS: int = 1  # duration of one captured segment
        self.recording_process: Optional[Process] = None
        # Rolling window of the most recent *raw PCM* segments. Raw audio is
        # stored (rather than finished WAV files) so that the window can be
        # wrapped in one valid WAV container when it is sent.
        self.audio_chunks: Deque[bytes] = deque(maxlen=2)
        self.output_pipe: connection.Connection = output_pipe
        self.input_device_index: Optional[int] = input_device_index

    @staticmethod
    def list_microphones() -> List[Dict[str, Any]]:
        """Return the PyAudio device-info dict of every input-capable device."""
        p = pyaudio.PyAudio()
        try:
            devices: List[Dict[str, Any]] = []
            for index in range(p.get_device_count()):
                device_info = p.get_device_info_by_index(index)
                # Output-only devices report zero input channels.
                if device_info["maxInputChannels"] > 0:
                    devices.append(device_info)
            return devices
        finally:
            # Release PortAudio even if a device query raises.
            p.terminate()

    def create_wav_bytes(self, frames: List[bytes]) -> bytes:
        """Wrap raw audio frames in a WAV container built in memory.

        Args:
            frames: Raw PCM byte strings, concatenated in order. They must
                match the recorder's ``CHANNELS``, ``FORMAT`` and ``RATE``.

        Returns:
            The bytes of a complete WAV file (header plus audio data).
        """
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b"".join(frames))
        return wav_buffer.getvalue()

    def _record_segment(self, audio: "pyaudio.PyAudio", reads: int) -> bytes:
        """Capture one segment of raw PCM audio from the input device.

        Opens a fresh input stream, performs ``reads`` reads of ``CHUNK``
        frames each and always closes the stream again, even on error.
        """
        stream = audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            input_device_index=self.input_device_index,
            frames_per_buffer=self.CHUNK,
        )
        frames: List[bytes] = []
        try:
            for _ in range(reads):
                try:
                    frames.append(
                        stream.read(self.CHUNK, exception_on_overflow=False)
                    )
                except OSError as e:
                    # Best effort: drop this read and keep recording.
                    print(f"Warning: Audio input overflow occurred: {e}")
        finally:
            try:
                stream.stop_stream()
            finally:
                stream.close()
        return b"".join(frames)

    def record_audio(self) -> None:
        """Record forever, sending each full rolling window through the pipe.

        Intended to run as the target of the recording process; it only
        returns by raising (for example when the pipe is closed) or by the
        process being terminated.
        """
        reads_per_segment = int(self.RATE / self.CHUNK * self.RECORD_SECONDS)
        p = pyaudio.PyAudio()
        try:
            while True:
                self.audio_chunks.append(
                    self._record_segment(p, reads_per_segment)
                )
                # Send only once the rolling window is full.
                if len(self.audio_chunks) == self.audio_chunks.maxlen:
                    # Build ONE WAV file for the whole window. Joining two
                    # finished WAV files byte-wise would leave a header that
                    # describes only the first segment, so readers would
                    # ignore or misinterpret the second one.
                    self.output_pipe.send(
                        self.create_wav_bytes(list(self.audio_chunks))
                    )
        finally:
            p.terminate()

    def start_recording(self) -> None:
        """Start recording in a separate process.

        Does nothing if a recording process is already running, so repeated
        calls cannot leave an orphaned recorder writing to the same pipe.
        """
        running = self.recording_process
        if running is not None and running.is_alive():
            return
        # daemon=True: the endless recording loop must not keep the parent
        # process from exiting if stop_recording() is never called.
        self.recording_process = Process(target=self.record_audio, daemon=True)
        self.recording_process.start()

    def stop_recording(self) -> None:
        """Stop the recording process and wait until it has exited.

        Safe to call when no recording is in progress.
        """
        process = self.recording_process
        if process is None:
            return
        self.recording_process = None
        process.terminate()
        # Reap the child so it does not linger as a zombie and is guaranteed
        # to be gone when this method returns.
        process.join(timeout=self._STOP_TIMEOUT_SECONDS)
        if process.is_alive():
            # SIGTERM was not enough (e.g. blocked in a native call).
            process.kill()
            process.join()