File size: 3,267 Bytes
6831f1f
 
726ec90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6831f1f
726ec90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from multiprocessing import Process, connection
from typing import Any, Dict, Optional, List, Deque
from collections import deque
import pyaudio
import io
import wave


class AudioRecorder:
    """Record microphone audio in a child process and push it through a pipe.

    Audio is captured in segments of ``RECORD_SECONDS`` seconds. Each segment
    is wrapped in its own in-memory WAV container and stored in a rolling
    buffer holding the two most recent segments. As soon as the buffer is
    full, its contents are sent over ``output_pipe`` after every new segment.
    """

    # Upper bound, in seconds, on how long stop_recording() waits for the
    # child to exit after terminate() before escalating to kill().
    _STOP_TIMEOUT_SECONDS: float = 5.0

    def __init__(
        self,
        output_pipe: connection.Connection,
        input_device_index: Optional[int] = None,
    ):
        """Store the capture settings; no audio device is opened here.

        Args:
            output_pipe: Connection on which recorded audio bytes are sent.
            input_device_index: PyAudio index of the input device to record
                from; passed unchanged to ``PyAudio.open`` (``None`` is
                forwarded as-is).
        """
        self.CHUNK: int = 1024  # frames requested per stream.read() call
        self.FORMAT: int = pyaudio.paInt16
        self.CHANNELS: int = 1
        self.RATE: int = 44100  # sampling rate in Hz
        self.RECORD_SECONDS: int = 1  # duration of one recorded segment
        self.recording_process: Optional[Process] = None
        # Rolling window: appending a third segment evicts the oldest one.
        self.audio_chunks: Deque[bytes] = deque(maxlen=2)
        self.output_pipe: connection.Connection = output_pipe
        self.input_device_index: Optional[int] = input_device_index

    @staticmethod
    def list_microphones() -> List[Dict[str, Any]]:
        """Return the PyAudio device-info dict of every input-capable device.

        A device is included when its ``maxInputChannels`` entry is greater
        than zero.
        """
        p = pyaudio.PyAudio()
        try:
            devices: List[Dict[str, Any]] = []
            for i in range(p.get_device_count()):
                device_info = p.get_device_info_by_index(i)
                if device_info["maxInputChannels"] > 0:  # Only input devices
                    devices.append(device_info)
            return devices
        finally:
            # Release the PyAudio instance even if a device query raised.
            p.terminate()

    def create_wav_bytes(self, frames: List[bytes]) -> bytes:
        """Wrap raw audio frames in a WAV container built in memory.

        Args:
            frames: Raw sample buffers, concatenated in order.

        Returns:
            The bytes of a complete WAV file (header plus data) using this
            recorder's channel count, sample width and sampling rate.
        """
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b"".join(frames))
        return wav_buffer.getvalue()

    def record_audio(self) -> None:
        """Record segments forever and send the rolling buffer over the pipe.

        This is the target of the process created by ``start_recording``. It
        only returns by raising; the stream and the PyAudio instance are
        released on the way out in that case.
        """
        p = pyaudio.PyAudio()
        # Number of CHUNK-sized reads that make up one segment.
        reads_per_segment = int(self.RATE / self.CHUNK * self.RECORD_SECONDS)

        try:
            while True:
                stream = p.open(
                    format=self.FORMAT,
                    channels=self.CHANNELS,
                    rate=self.RATE,
                    input=True,
                    input_device_index=self.input_device_index,
                    frames_per_buffer=self.CHUNK,
                )

                frames: List[bytes] = []
                try:
                    # Record for RECORD_SECONDS
                    for _ in range(reads_per_segment):
                        try:
                            data = stream.read(
                                self.CHUNK, exception_on_overflow=False
                            )
                        except OSError as e:
                            # Any OSError raised by the read lands here; the
                            # failed read is skipped, shortening the segment.
                            print(f"Warning: Audio input overflow occurred: {e}")
                            continue
                        frames.append(data)
                finally:
                    # Always give the device back, even on unexpected errors.
                    try:
                        stream.stop_stream()
                    finally:
                        stream.close()

                # Convert to WAV format and add to rolling buffer
                self.audio_chunks.append(self.create_wav_bytes(frames))

                # Send chunks through pipe once the rolling buffer is full.
                # NOTE(review): every buffered element is a complete WAV
                # container with its own header, so the payload is two WAV
                # files back to back rather than a single two-segment WAV.
                # Confirm the consumer on the other end of the pipe expects
                # that layout.
                if len(self.audio_chunks) == self.audio_chunks.maxlen:
                    self.output_pipe.send(b"".join(self.audio_chunks))
        finally:
            p.terminate()

    def start_recording(self) -> None:
        """Start recording in a separate process.

        Does nothing when a recorder process started earlier is still alive,
        so that it is not orphaned while it keeps writing to the same pipe.
        """
        current = self.recording_process
        if current is not None and current.is_alive():
            return
        self.recording_process = Process(target=self.record_audio)
        self.recording_process.start()

    def stop_recording(self) -> None:
        """Stop recording and wait for the recorder process to exit.

        Safe to call when no recording is in progress.
        """
        process = self.recording_process
        if process is None:
            return
        self.recording_process = None

        process.terminate()
        # Join so the child is reaped and is really gone when we return.
        process.join(timeout=self._STOP_TIMEOUT_SECONDS)
        if process.is_alive():
            # terminate() was not enough; force the process down.
            process.kill()
            process.join()