ParentalControl / server /AudioRecorder.py
GitLab CI
Update game build from GitLab CI
726ec90
raw
history blame
4.96 kB
from multiprocessing import Process, Pipe, connection
from typing import Optional, List, Deque
from collections import deque
import pyaudio
import time
import io
import wave
class AudioRecorder:
    """Record microphone audio in a child process and stream it over a pipe.

    Audio is captured in windows of ``RECORD_SECONDS`` seconds. The raw PCM
    of the most recent windows is kept in a rolling buffer, and every time
    the buffer is full its content is wrapped in a single WAV container and
    sent through ``output_pipe``.
    """

    def __init__(
        self,
        output_pipe: connection.Connection,
        input_device_index: Optional[int] = None,
    ):
        """Store the capture settings; no audio device is opened here.

        Args:
            output_pipe: Connection on which WAV payloads are sent.
            input_device_index: Device index handed to PyAudio when the
                input stream is opened; ``None`` is passed through as-is.
        """
        self.CHUNK: int = 1024  # frames requested per stream.read() call
        self.FORMAT: int = pyaudio.paInt16
        self.CHANNELS: int = 1
        self.RATE: int = 44100  # sample rate in Hz
        self.RECORD_SECONDS: int = 1  # duration of one capture window
        self.recording_process: Optional[Process] = None
        # Rolling buffer of raw PCM, one entry per capture window. Raw PCM
        # (not finished WAV files) is stored so that several windows can be
        # merged under one valid WAV header before being sent.
        self.audio_chunks: Deque[bytes] = deque(maxlen=2)
        self.output_pipe: connection.Connection = output_pipe
        self.input_device_index: Optional[int] = input_device_index

    @staticmethod
    def list_microphones() -> List[dict]:
        """List all available input devices with their properties.

        Returns:
            The PyAudio device-info dicts of every device whose
            ``maxInputChannels`` is greater than zero.
        """
        p = pyaudio.PyAudio()
        try:
            all_devices = (
                p.get_device_info_by_index(i) for i in range(p.get_device_count())
            )
            # Only input devices
            return [info for info in all_devices if info["maxInputChannels"] > 0]
        finally:
            # Release PortAudio even if a device query fails.
            p.terminate()

    def create_wav_bytes(self, frames: List[bytes]) -> bytes:
        """Convert raw audio frames to WAV format in memory.

        Args:
            frames: Raw PCM byte strings, concatenated in order.

        Returns:
            A complete WAV file (header + data) built with the recorder's
            channel count, sample width and sample rate.
        """
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b"".join(frames))
        return wav_buffer.getvalue()

    def _record_window(self, p: "pyaudio.PyAudio") -> bytes:
        """Capture one ``RECORD_SECONDS`` window and return its raw PCM bytes."""
        stream = p.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            input_device_index=self.input_device_index,
            frames_per_buffer=self.CHUNK,
        )
        frames: List[bytes] = []
        try:
            # Record for RECORD_SECONDS
            for _ in range(int(self.RATE / self.CHUNK * self.RECORD_SECONDS)):
                try:
                    frames.append(
                        stream.read(self.CHUNK, exception_on_overflow=False)
                    )
                except OSError as e:
                    # Best effort: drop the failed read and keep recording.
                    print(f"Warning: Audio input overflow occurred: {e}")
        finally:
            # Always release the stream, even if reading failed unexpectedly.
            try:
                stream.stop_stream()
            finally:
                stream.close()
        return b"".join(frames)

    def record_audio(self) -> None:
        """Record forever, sending the rolling audio buffer after each window.

        Intended to run as the target of the recording process. Once the
        rolling buffer is full, every new window triggers one ``send`` on
        ``output_pipe`` containing a single WAV file that covers all
        buffered windows (oldest first).
        """
        p = pyaudio.PyAudio()
        try:
            while True:
                self.audio_chunks.append(self._record_window(p))
                # Send through the pipe only once we have enough data.
                if len(self.audio_chunks) == self.audio_chunks.maxlen:
                    # Build ONE WAV from the buffered PCM. Concatenating
                    # finished WAV files would leave a header that only
                    # declares the first window's length.
                    self.output_pipe.send(
                        self.create_wav_bytes(list(self.audio_chunks))
                    )
        finally:
            p.terminate()

    def start_recording(self) -> None:
        """Start recording in a separate process.

        Does nothing if a recording process is already running, so that two
        recorders never write to the same pipe.
        """
        current = self.recording_process
        if current is not None and current.is_alive():
            return
        self.recording_process = Process(target=self.record_audio)
        self.recording_process.start()

    def stop_recording(self) -> None:
        """Stop the recording process, if any, and wait for it to exit."""
        proc = self.recording_process
        if proc is None:
            return
        self.recording_process = None
        proc.terminate()
        # Reap the child so it does not linger as a zombie process.
        proc.join(timeout=2)
        if proc.is_alive():
            # The child did not exit on terminate(); force it.
            proc.kill()
            proc.join()
def transcription_process(input_pipe: connection.Connection) -> None:
    """Receive audio payloads from a pipe and print their transcription.

    Runs until the pipe reports end-of-file. Each received payload is
    wrapped in an in-memory buffer and handed to
    ``AudioTranscriber.transcribe_audio_bytes`` with ``language="fr"``;
    the resulting segments are printed through the transcriber.

    Args:
        input_pipe: Connection from which audio byte payloads are received.
    """
    from server.tts import AudioTranscriber  # Import here to avoid thread locks

    transcriber = AudioTranscriber()
    while True:
        try:
            audio_data = input_pipe.recv()
            # A fresh BytesIO already starts at position 0.
            segments, _info = transcriber.transcribe_audio_bytes(
                io.BytesIO(audio_data), language="fr"
            )
            transcriber.print_segments(segments)
        except EOFError:
            # The sending side is closed and nothing is left to read; every
            # further recv() would fail the same way, so stop instead of
            # looping on the error forever.
            break
        except Exception as e:
            # Best effort: a failure on one payload must not stop the loop.
            print(f"Transcription error: {e}")
if __name__ == "__main__":
    # Interactive entry point: pick a microphone, then run the recorder and
    # the transcriber as two child processes until Ctrl+C is pressed.

    # List available microphones
    microphones = AudioRecorder.list_microphones()
    print("\nAvailable microphones:")
    for i, device in enumerate(microphones):
        print(f"{i}: {device['name']}")

    # Ask for microphone selection; the position in the printed list is
    # mapped back to the device index stored in the device info.
    selected_index = int(input("\nSelect microphone index: "))
    device_index = microphones[selected_index]["index"]

    # Create pipe for communication between processes
    recorder_conn, transcriber_conn = Pipe()

    # Create the recorder with the selected device (no process started yet)
    recorder = AudioRecorder(recorder_conn, input_device_index=device_index)

    # Create and start transcription process
    transcription_proc = Process(target=transcription_process, args=(transcriber_conn,))
    transcription_proc.start()

    print("Début de l'enregistrement... Appuyez sur Ctrl+C pour arrêter")
    try:
        try:
            recorder.start_recording()
            while True:
                time.sleep(1)
        finally:
            # Always shut the children down, whatever ended the loop;
            # otherwise the non-daemon transcription process would keep the
            # interpreter from exiting.
            recorder.stop_recording()
            transcription_proc.terminate()
            transcription_proc.join()
    except KeyboardInterrupt:
        print("\nEnregistrement arrêté")