from pydub import AudioSegment
from tqdm import tqdm
from .utils import run_command
from .logging_setup import logger
import numpy as np


class Mixer:
    def __init__(self):
        self.parts = []

    def __len__(self):
        # Mix length in milliseconds: the end of whichever part reaches
        # furthest.
        parts = self._sync()
        seg = parts[0][1]
        frame_count = max(offset + seg.frame_count() for offset, seg in parts)
        return int(1000.0 * frame_count / seg.frame_rate)

    def overlay(self, sound, position=0):
        self.parts.append((position, sound))
        return self

    def _sync(self):
        positions, segs = zip(*self.parts)

        frame_rate = segs[0].frame_rate

        # Convert millisecond positions to frame offsets
        # (e.g. 1500 ms at 44100 Hz -> int(44100 * 1500 / 1000) = 66150
        # frames), then let pydub equalize frame rate, channels and sample
        # width across all segments.
        offsets = [int(frame_rate * pos / 1000.0) for pos in positions]
        segs = AudioSegment.empty()._sync(*segs)
        return list(zip(offsets, segs))

    def append(self, sound):
        self.overlay(sound, position=len(self))

    def to_audio_segment(self):
        parts = self._sync()
        seg = parts[0][1]
        channels = seg.channels

        frame_count = max(offset + seg.frame_count() for offset, seg in parts)
        sample_count = int(frame_count * seg.channels)

        # Accumulate in int32 so overlapping parts can sum without wrapping.
        output = np.zeros(sample_count, dtype="int32")
        for offset, seg in parts:
            sample_offset = offset * channels
            samples = np.frombuffer(
                seg.get_array_of_samples(), dtype=seg.array_type
            ).astype(np.int32)
            start = sample_offset
            end = start + len(samples)
            output[start:end] += samples

        return seg._spawn(
            output, overrides={"sample_width": 4}).normalize(headroom=0.0)
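

# Illustrative Mixer usage (a sketch; the file name is a placeholder):
#
#     mixer = Mixer()
#     mixer.overlay(AudioSegment.silent(duration=5000))  # 5 s silent bed
#     mixer.overlay(AudioSegment.from_file("clip.ogg"), position=1500)
#     mixed = mixer.to_audio_segment()  # one normalized AudioSegment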


def create_translated_audio(
    result_diarize, audio_files, final_file, concat=False, avoid_overlap=False,
):
    total_duration = result_diarize["segments"][-1]["end"]  # in seconds

    if concat:
        # list.txt layout expected by ffmpeg's concat demuxer:
        #     file .\audio\1.ogg
        #     file .\audio\2.ogg
        #     file .\audio\3.ogg
        #     file .\audio\4.ogg
        #     ...
        # One "file <path>" entry per line; the final entry gets no newline.
        with open("list.txt", "w") as file:
            for i, audio_file in enumerate(audio_files):
                if i == len(audio_files) - 1:
                    file.write(f"file {audio_file}")
                else:
                    file.write(f"file {audio_file}\n")

        # -safe 0 lets the concat demuxer accept arbitrary paths in list.txt.
        command = (
            f"ffmpeg -f concat -safe 0 -i list.txt -c:a pcm_s16le {final_file}"
        )
        run_command(command)

    else:
        # Start from a silent bed spanning the whole duration (standard
        # 44.1 kHz rate), then overlay each translated segment on top of it.
        base_audio = AudioSegment.silent(
            duration=int(total_duration * 1000), frame_rate=44100
        )
        combined_audio = Mixer()
        combined_audio.overlay(base_audio)

        logger.debug(
            f"Audio duration: {int(total_duration // 60)} "
            f"minutes and {int(total_duration % 60)} seconds"
        )

        last_end_time = 0
        previous_speaker = ""
        for line, audio_file in tqdm(
            zip(result_diarize["segments"], audio_files)
        ):
            start = float(line["start"])

            try:
                audio = AudioSegment.from_file(audio_file)

                if avoid_overlap:
                    speaker = line["speaker"]
                    if (last_end_time - 0.500) > start:
                        # The previous clip runs past this segment's start:
                        # delay this clip, allowing up to 0.5 s of crosstalk
                        # when the speaker changes and 0.2 s otherwise; pull
                        # it 0.3 s earlier again if the overlap was long.
                        overlap_time = last_end_time - start
                        if previous_speaker and previous_speaker != speaker:
                            start = (last_end_time - 0.500)
                        else:
                            start = (last_end_time - 0.200)
                        if overlap_time > 2.5:
                            start = start - 0.3
                        logger.info(
                            f"Avoid overlap for {str(audio_file)} "
                            f"with {str(start)}"
                        )

                    previous_speaker = speaker

                    duration_tts_seconds = len(audio) / 1000.0
                    last_end_time = (start + duration_tts_seconds)
                start_time = start * 1000  # Mixer positions are milliseconds
                combined_audio = combined_audio.overlay(
                    audio, position=start_time
                )
            except Exception as error:
                logger.debug(str(error))
                logger.error(f"Error audio file {audio_file}")

        # Render the mix and write the final audio track.
        combined_audio_data = combined_audio.to_audio_segment()
        combined_audio_data.export(
            final_file, format="wav"
        )
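

# Sketch of a typical call (illustrative only; the diarization dict and the
# file paths below are placeholders, not values produced by this module):
#
#     create_translated_audio(
#         result_diarize={"segments": [
#             {"start": 0.0, "end": 2.5, "speaker": "SPEAKER_00"},
#         ]},
#         audio_files=["audio/1.ogg"],
#         final_file="final.wav",
#     )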