"""
Tool to mute profanities in a song:
source separation -> speech recognition -> profanity detection -> mask profanities -> re-mix.

Run from the command line (see --help) or as a local web app: streamlit run main.py
"""
import os
import sys
import argparse

import whisper_timestamped as wt

from pprint import pprint as pp
from profanity_check import predict
from pydub import AudioSegment
from pydub.playback import play
from subprocess import Popen, PIPE


def parse_args():
    """
    Parse command-line arguments.
    """
    parser = argparse.ArgumentParser(
        description=('Tool to mute profanities in a song (source separation -> speech recognition -> profanity detection -> mask profanities -> re-mix)'),
        usage=('see <py main.py --help> or run as local web app with streamlit: <streamlit run main.py>')
    )

    parser.add_argument(
        '-i',
        '--input',
        default=None,
        nargs='?',
        help=("path to an mp3")
    )
    parser.add_argument(
        '-m',
        '--model',
        default='small',
        nargs='?',
        help=("model used by whisper for speech recognition: tiny, small (default) or medium")
    )
    parser.add_argument(
        '-p',
        '--play',
        default=False,
        action='store_true',
        help=("play output audio at the end")
    )
    parser.add_argument(
        '-v',
        '--verbose',
        default=True,
        action='store_true',
        help=("print transcribed text and detected profanities to screen")
    )
    return parser.parse_args()
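

# Example CLI invocation (the path below is just a placeholder):
#   python main.py --input path/to/song.mp3 --model small --play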


def main(args, input_file=None, model_size=None, verbose=False, play_output=False, skip_ss=False):
    """
    Run the full pipeline: source separation -> speech recognition -> profanity
    detection -> masking -> re-mix. Keyword arguments override the CLI arguments.
    """
    # fall back to the CLI arguments for anything not passed explicitly
    if not input_file:
        input_file = args.input
    if not model_size:
        model_size = args.model
    if not verbose:
        verbose = args.verbose
    if not play_output:
        play_output = args.play

    if not input_file or not os.path.isfile(input_file):
        print('Error: --input file not found')
        raise Exception

    print(f'\nProcessing input file: {input_file}')

    if not skip_ss:
        print('Running source separation')
        stems_dir = source_separation(input_file, use_demucs=False, use_spleeter=True)
        vocal_stem = os.path.join(stems_dir, 'vocals.wav')
        instr_stem = os.path.join(stems_dir, 'accompaniment.wav')
        print(f'Vocal stem written to: {vocal_stem}')
    else:
        vocal_stem = input_file
        instr_stem = None

    # transcribe the vocal stem with word-level timestamps
    audio = wt.load_audio(vocal_stem)
    model = wt.load_model(model_size, device='cpu')
    text = wt.transcribe(model, audio, language='en')

    if verbose:
        print('\nTranscribed text:')
        print(text['text'] + '\n')

    print('Run profanity detection on text')
    profanities = profanity_detection(text)
    if not profanities:
        print(f'No profanities found in {input_file} - exiting')
        return 'No profanities found', None, None

    if verbose:
        print('Profanities found in text:')
        pp(profanities)

    print('Mask profanities in vocal stem')
    vocals = mask_profanities(vocal_stem, profanities)

    print('Merge instrumentals stem and masked vocals stem')
    if not skip_ss:
        mix = AudioSegment.from_wav(instr_stem).overlay(vocals)
    else:
        mix = vocals

    outpath = input_file.replace('.mp3', '_masked.mp3').replace('.wav', '_masked.wav')
    if input_file.endswith('.wav'):
        mix.export(outpath, format="wav")
    elif input_file.endswith('.mp3'):
        mix.export(outpath, format="mp3")
    print(f'Mixed file written to: {outpath}')

    if play_output:
        print('\nPlaying output...')
        play(mix)

    return outpath, vocal_stem, instr_stem


def source_separation(inpath, use_demucs=False, use_spleeter=True):
    """
    Execute a shell command to run demucs or spleeter and pipe stdout/stderr back to python.
    """
    infile = os.path.basename(inpath)

    if use_demucs:
        cmd = f'demucs --two-stems=vocals --jobs 8 "{inpath}"'
        # Assumes demucs' default output layout (./separated/<model>/<track>/) with the
        # default htdemucs model; note that demucs names the accompaniment stem
        # 'no_vocals.wav' rather than 'accompaniment.wav', so main() expects spleeter.
        stems_dir = os.path.join('separated', 'htdemucs', os.path.splitext(infile)[0])
    elif use_spleeter:
        outdir = 'audio/separated'
        cmd = f'spleeter separate {inpath} -p spleeter:2stems -o {outdir}'
        stems_dir = os.path.join(outdir, os.path.splitext(infile)[0])

    stdout, stderr = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True, executable='/bin/bash').communicate()
    stdout = stdout.decode('utf8')

    # both tools log progress to stderr, so only fail on actual error messages
    if stderr:
        stderr = stderr.decode('utf8').lower()
        if 'error' in stderr or 'not exist' in stderr:
            print(stderr.split('\n')[0])
            raise Exception

    if not os.path.isdir(stems_dir):
        print(f'Error: output stem directory "{stems_dir}" not found')
        raise Exception

    return stems_dir
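

# For reference, wt.transcribe() returns a whisper-style dict with word-level timing,
# roughly of the form:
#   {'text': '...',
#    'segments': [{'words': [{'text': 'word', 'start': 12.3, 'end': 12.7, ...}, ...], ...}, ...]}
# The two functions below only rely on each word's 'text', 'start' and 'end' fields.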


def profanity_detection(text):
    """
    Scan the word-level transcription for profanities and return the matching word dicts.
    """
    profs = []
    for segment in text['segments']:
        for word in segment['words']:

            word_text = word['text'].replace('.', '').replace(',', '').lower()

            # whitelist: harmless words the classifier tends to flag
            if word_text in ['cancer', 'hell', 'junk', 'die', 'lame', 'freak', 'freaky', 'white', 'stink', 'shut', 'spit', 'mouth', 'orders', 'eat', 'clouds', 'ugly', 'dirty', 'wet']:
                continue

            # whisper sometimes censors profanities itself with asterisks
            if '**' in word_text:
                profs.append(word)
                continue

            # blacklist: profanities the classifier tends to miss
            if word_text in ['bitchy', 'puss']:
                profs.append(word)
                continue

            # otherwise defer to the profanity_check classifier
            if predict([word['text']])[0]:
                profs.append(word)

    return profs
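

# pydub gain changes are in dB: subtracting 50 dB scales the waveform amplitude by
# 10 ** (-50 / 20) ≈ 0.003, so a masked segment is effectively silenced.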
def mask_profanities(vocal_stem, profanities):
    """
    Attenuate each detected profanity in the vocal stem and return the edited audio.
    """
    vocals = AudioSegment.from_wav(vocal_stem)
    for prof in profanities:
        # pydub slices in milliseconds; whisper timestamps are in seconds
        start_ms = int(prof['start'] * 1000)
        end_ms = int(prof['end'] * 1000)

        # drop the profanity segment by 50 dB
        mask = vocals[start_ms:end_ms]
        mask -= 50

        # splice the attenuated segment back between the untouched parts
        vocals = vocals[:start_ms] + mask + vocals[end_ms:]

    return vocals
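

# Entry point: with CLI arguments the pipeline runs directly; when the script is
# launched via `streamlit run main.py` (no extra args), the web UI below is served.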
if __name__ == "__main__":
    args = parse_args()

    if len(sys.argv) > 1:
        main(args, skip_ss=False)
    else:
        import streamlit as st
        st.title('Saylss')
        with st.expander("About", expanded=False):
            st.markdown('''
This app processes an input audio track (.mp3 or .wav) to identify and mute profanities in the song.

A larger model takes longer to run and is more accurate, and vice-versa.

Simply select the model size and upload your file!
            ''')
        model = st.selectbox('Choose model size:', ('tiny', 'small', 'medium'), index=1)

        uploaded_file = st.file_uploader(
            "Choose input track:",
            type=[".mp3", ".wav"],
            accept_multiple_files=False,
        )

        if uploaded_file is not None:
            uploaded_file.name = uploaded_file.name.replace(' ', '_')
            ext = os.path.splitext(uploaded_file.name)[1]
            if ext == '.wav':
                st_format = 'audio/wav'
            elif ext == '.mp3':
                st_format = 'audio/mp3'

            # write the upload to disk so spleeter/whisper can read it from a path
            uploaded_file_content = uploaded_file.getvalue()
            with open(uploaded_file.name, 'wb') as f:
                f.write(uploaded_file_content)

            audio_bytes_input = uploaded_file_content
            st.audio(audio_bytes_input, format=st_format)

            with st.spinner('Processing input audio...'):
                inpath = os.path.abspath(uploaded_file.name)
                outpath, vocal_stem, instr_stem = main(args, input_file=inpath, model_size=model)

            if outpath == 'No profanities found':
                st.text(outpath + ' - Refresh the page and try a different song or model size')
                st.stop()

            st.text('\nOutput:')
            with open(outpath, 'rb') as audio_file:
                audio_bytes = audio_file.read()
            st.audio(audio_bytes, format=st_format)

            # clean up the temporary input, output and stem files
            if os.path.isfile(inpath):
                os.remove(inpath)
            if os.path.isfile(outpath):
                os.remove(outpath)
            if os.path.isfile(vocal_stem):
                os.remove(vocal_stem)
            if os.path.isfile(instr_stem):
                os.remove(instr_stem)
            sep_dir = os.path.split(instr_stem)[0]
            if os.path.isdir(sep_dir):
                os.rmdir(sep_dir)