Spaces:
Running
on
A10G
Running
on
A10G
File size: 3,267 Bytes
6831f1f 726ec90 6831f1f 726ec90 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
from multiprocessing import Process, connection
from typing import Any, Dict, Optional, List, Deque
from collections import deque
import pyaudio
import io
import wave
class AudioRecorder:
def __init__(
self,
output_pipe: connection.Connection,
input_device_index: Optional[int] = None,
):
self.CHUNK: int = 1024
self.FORMAT: int = pyaudio.paInt16
self.CHANNELS: int = 1
self.RATE: int = 44100
self.RECORD_SECONDS: int = 1
self.recording_process: Optional[Process] = None
self.audio_chunks: Deque[bytes] = deque(maxlen=2)
self.output_pipe: connection.Connection = output_pipe
self.input_device_index: Optional[int] = input_device_index
@staticmethod
def list_microphones() -> List[Dict[str, Any]]:
"""List all available input devices with their properties"""
p = pyaudio.PyAudio()
devices = []
for i in range(p.get_device_count()):
device_info = p.get_device_info_by_index(i)
if device_info["maxInputChannels"] > 0: # Only input devices
devices.append(device_info)
p.terminate()
return devices
def create_wav_bytes(self, frames: List[bytes]) -> bytes:
"""Convert raw audio frames to WAV format in memory"""
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, "wb") as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(b"".join(frames))
return wav_buffer.getvalue()
def record_audio(self) -> None:
p = pyaudio.PyAudio()
while True:
stream = p.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
input_device_index=self.input_device_index,
frames_per_buffer=self.CHUNK,
)
frames: List[bytes] = []
# Record for RECORD_SECONDS
for _ in range(0, int(self.RATE / self.CHUNK * self.RECORD_SECONDS)):
try:
data = stream.read(self.CHUNK, exception_on_overflow=False)
frames.append(data)
except OSError as e:
print(f"Warning: Audio input overflow occurred: {e}")
continue
stream.stop_stream()
stream.close()
# Convert to WAV format and add to rolling buffer
wav_bytes = self.create_wav_bytes(frames)
self.audio_chunks.append(wav_bytes)
# Send chunks through pipe if we have enough data
if len(self.audio_chunks) == 2:
self.output_pipe.send(b"".join(self.audio_chunks))
def start_recording(self) -> None:
"""Démarre l'enregistrement dans un processus séparé"""
self.recording_process = Process(target=self.record_audio)
self.recording_process.start()
def stop_recording(self) -> None:
"""Arrête l'enregistrement"""
if self.recording_process:
self.recording_process.terminate()
self.recording_process = None
|