import math
from copy import deepcopy

import librosa
import mido
import numpy as np
import pandas as pd
import pumpp
from scipy.ndimage import gaussian_filter1d

from cqfe_models import mask_voas_cnn_v2_model, late_deep_cnn_model

# Per-voice salience thresholds (soprano, alto, tenor, bass) used to gate the
# voice-assignment predictions before one-hot quantisation.
SATB_THRESHOLDS = [0.23, 0.17, 0.15, 0.17]


# Centre frequency (Hz) of each of the 360 CQT bins: six octaves starting at
# C1 (32.7 Hz), 60 bins per octave.
freqscale = librosa.cqt_frequencies(n_bins=360, fmin=32.7, bins_per_octave=60)


def bin_to_freq(bin_idx):
    """Map a CQT bin index to its centre frequency in Hz."""
    return freqscale[bin_idx]


vec_bin_to_freq = np.vectorize(bin_to_freq)
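# Quick sanity check (values follow from fmin=32.7 Hz and 60 bins/octave):
#     vec_bin_to_freq([0, 60, 120])  ->  array([ 32.7,  65.4, 130.8])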


def downsample_bins(voice):
    """Downsample a (T, 360) salience map to a one-hot (T, 69) matrix.

    The five 1/5th-semitone sub-bins around each semitone are summed, then
    each frame is quantised to its loudest semitone bin; bin 0 is reserved
    for 'unvoiced' and cleared.
    """
    # Split the 360 bins into the five interleaved 72-bin sub-grids.
    voice_0 = np.array(voice.T[0::5]).T
    voice_1 = np.array(voice.T[1::5]).T
    voice_2 = np.array(voice.T[2::5]).T
    voice_3 = np.array(voice.T[3::5]).T
    voice_4 = np.array(voice.T[4::5]).T

    # Align the sub-grids so each column covers one semitone, keeping 69 bins.
    voice_0 = voice_0.T[1:70].T
    voice_1 = voice_1.T[1:70].T
    voice_2 = voice_2.T[1:70].T
    voice_3 = voice_3.T[0:69].T
    voice_4 = voice_4.T[0:69].T

    voice_sums = voice_0 + voice_1 + voice_2 + voice_3 + voice_4

    # One-hot encode the per-frame argmax and silence the 'unvoiced' bin 0.
    voice_argmax = np.argmax(voice_sums, axis=1)
    threshold = np.zeros(voice_sums.shape)
    threshold[np.arange(voice_argmax.size), voice_argmax] = 1
    threshold[:, 0] = 0

    return threshold
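
# Shape sketch (illustrative): downsample_bins(np.random.rand(500, 360))
# returns a (500, 69) one-hot matrix (column 0 cleared, so frames whose
# argmax falls in bin 0 come out all-zero).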


def bin_matrix_to_freq(matrix):
    """Convert stacked (4, 360, T) SATB salience maps to per-frame F0s in Hz,
    returned as a (4, T) array in soprano/alto/tenor/bass order."""
    s_freqs = vec_bin_to_freq(np.argmax(matrix[0], axis=0)).reshape(-1, 1)
    a_freqs = vec_bin_to_freq(np.argmax(matrix[1], axis=0)).reshape(-1, 1)
    t_freqs = vec_bin_to_freq(np.argmax(matrix[2], axis=0)).reshape(-1, 1)
    b_freqs = vec_bin_to_freq(np.argmax(matrix[3], axis=0)).reshape(-1, 1)

    freqs = np.concatenate((s_freqs, a_freqs, t_freqs, b_freqs), axis=1).T
    return freqs


def create_midi(freq, write_path='./midi_track.mid', ticks_per_beat=58,
                tempo=90, save_to_file=True, program=53, channel=0):
    """Render a per-frame F0 track (Hz) to a single-track MIDI file."""

    def freq_to_list(freq):
        # Convert the frame-wise F0 curve into (pitch, velocity, delta-time)
        # events, emitting events only when the quantised pitch changes.
        T = freq.shape[0]
        midi_freqs = np.round(69 + 12 * np.log2(freq / 440)).squeeze().astype('int')
        t_last = 0
        pitch_tm1 = 20  # sentinel pitch, guaranteed to differ at t == 0
        list_event = []
        for t in range(T):
            pitch_t = midi_freqs[t]
            if pitch_t != pitch_tm1:
                velocity = 127
                if pitch_t == 24:
                    # CQT bin 0 (32.7 Hz) quantises to MIDI note 24, which is
                    # treated as 'unvoiced': emit a silent event instead.
                    pitch_t = 0
                    velocity = 0
                t_event = t - t_last
                t_last = t
                list_event.append((pitch_tm1, 0, t_event))
                list_event.append((pitch_t, velocity, 0))
                pitch_tm1 = pitch_t
        list_event.append((pitch_tm1, 0, T - t_last))
        return list_event

    microseconds_per_beat = mido.bpm2tempo(tempo)

    mid = mido.MidiFile()
    mid.ticks_per_beat = ticks_per_beat

    track = mid.add_track("Voice Aah")

    events = freq_to_list(freq)

    track.append(mido.MetaMessage('set_tempo', tempo=microseconds_per_beat))
    track.append(mido.MetaMessage('channel_prefix', channel=channel))
    track.append(mido.Message('program_change', program=program, channel=channel))

    notes_on_list = []
    for event in events:
        pitch, velocity, time = event
        if velocity == 0:
            # Note-off (a note-off for a pitch that is not sounding is harmless).
            track.append(mido.Message('note_off', note=pitch, velocity=0,
                                      time=time, channel=channel))
            if pitch in notes_on_list:
                notes_on_list.remove(pitch)
        else:
            if pitch in notes_on_list:
                # Re-triggered pitch: close the previous note first.
                track.append(mido.Message('note_off', note=pitch, velocity=0,
                                          time=time, channel=channel))
                notes_on_list.remove(pitch)
                time = 0
            track.append(mido.Message('note_on', note=pitch, velocity=velocity,
                                      time=time, channel=channel))
            notes_on_list.append(pitch)

    if save_to_file:
        mid.save(write_path)

    return mid
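
# Illustrative sketch: render a one-second A4 drone at the pipeline's frame
# rate of ~86 frames/s (hop 256 at 22050 Hz):
#     create_midi(np.full(86, 440.0), write_path='./a4_drone.mid')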


def song_to_midi(sop, alto, ten, bass):
    """Write the four (360, T) voice salience maps to a multi-track MIDI file."""
    savepath = './output.mid'

    bin_matrix = np.array([sop, alto, ten, bass])
    freq_matrix = bin_matrix_to_freq(bin_matrix)

    # One track per voice, each on its own channel.
    mid_sop = create_midi(freq_matrix[0], save_to_file=False, program=52, channel=0)
    mid_alto = create_midi(freq_matrix[1], save_to_file=False, program=52, channel=1)
    mid_ten = create_midi(freq_matrix[2], save_to_file=False, program=42, channel=2)
    mid_bass = create_midi(freq_matrix[3], save_to_file=False, program=52, channel=3)

    mid_mix = mido.MidiFile()
    mid_mix.ticks_per_beat = mid_sop.ticks_per_beat
    mid_mix.tracks = mid_sop.tracks + mid_alto.tracks + mid_ten.tracks + mid_bass.tracks
    mid_mix.save(savepath)

    return savepath


def song_to_dataframe(sop, alto, ten, bass):
    """Tabulate the per-frame F0s (Hz) of the four voices against time."""
    # One frame every 256/22050 ~= 0.011609977 s.
    timescale = np.arange(sop.shape[1]) * 0.011609977

    s_argmax = vec_bin_to_freq(np.argmax(sop, axis=0))
    a_argmax = vec_bin_to_freq(np.argmax(alto, axis=0))
    t_argmax = vec_bin_to_freq(np.argmax(ten, axis=0))
    b_argmax = vec_bin_to_freq(np.argmax(bass, axis=0))

    data = np.array([timescale, s_argmax, a_argmax, t_argmax, b_argmax], dtype=np.float32).T
    columns = ['Timestep', 'Soprano', 'Alto', 'Tenor', 'Bass']

    df = pd.DataFrame(data, columns=columns)
    return df


def prediction_postproc(input_array, argmax_and_threshold=True,
                        gaussian_blur=True,
                        threshold_value=0):
    """Flatten batched (N, 360, 256) predictions to (360, T) and post-process."""
    prediction = np.moveaxis(input_array, 0, 1).reshape(360, -1)
    thres_reference = deepcopy(prediction)
    if argmax_and_threshold:
        # Per-frame argmax, zeroed wherever the salience falls below the
        # threshold, then one-hot encoded.
        prediction = np.argmax(prediction, axis=0)
        prediction = np.array([prediction[i]
                               if thres_reference[prediction[i], i] >= threshold_value
                               else 0
                               for i in np.arange(prediction.size)])
        threshold = np.zeros((360, prediction.shape[0]))
        threshold[prediction, np.arange(prediction.size)] = 1
        prediction = threshold
    if gaussian_blur:
        # Soften the one-hot spikes along the frequency axis, then rescale to [0, 1].
        prediction = np.array(gaussian_filter1d(prediction, 1, axis=0, mode='wrap'))
        prediction = (prediction - np.min(prediction)) / (np.max(prediction) - np.min(prediction))
    return prediction


def get_hcqt_params():
    # HCQT configuration: six octaves at 5x semitone oversampling
    # (60 bins/octave, 360 bins total), 5 harmonics, hop of 256 at 22050 Hz.
    bins_per_octave = 60
    n_octaves = 6
    over_sample = 5
    harmonics = [1, 2, 3, 4, 5]
    sr = 22050
    fmin = 32.7
    hop_length = 256

    return bins_per_octave, n_octaves, harmonics, sr, fmin, hop_length, over_sample


def create_pump_object():
    (bins_per_octave, n_octaves, harmonics,
     sr, f_min, hop_length, over_sample) = get_hcqt_params()

    p_phdif = pumpp.feature.HCQTPhaseDiff(name='dphase', sr=sr, hop_length=hop_length,
                                          fmin=f_min, n_octaves=n_octaves,
                                          over_sample=over_sample, harmonics=harmonics,
                                          log=True)

    pump = pumpp.Pump(p_phdif)
    return pump
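
# Illustrative use: the pump computes the HCQT magnitude and phase-differential
# features consumed by get_mpe_prediction() below:
#     pump = create_pump_object()
#     data = pump(audio_f='./audio.wav')  # keys 'dphase/mag', 'dphase/dphase'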


def compute_pump_features(pump, audio_fpath):
    data = pump(audio_f=audio_fpath)
    return data


def get_mpe_prediction(model, audio_file=None):
    """Generate multiple-pitch-estimation output from a model for an audio file.

    Parts of this function are adapted from deepsalience.
    """
    split_value = 4000  # frames per chunk, to bound memory use

    if audio_file is not None:
        pump = create_pump_object()
        features = compute_pump_features(pump, audio_file)
        input_hcqt = features['dphase/mag'][0]
        input_dphase = features['dphase/dphase'][0]
    else:
        raise ValueError("An audio_file must be specified")

    input_hcqt = input_hcqt.transpose(1, 2, 0)[np.newaxis, :, :, :]
    input_dphase = input_dphase.transpose(1, 2, 0)[np.newaxis, :, :, :]

    n_t = input_hcqt.shape[3]
    t_slices = list(np.arange(0, n_t, split_value))
    output_list = []

    # Predict chunk by chunk along the time axis, then stitch the outputs back together.
    for t in t_slices:
        p = model.predict([np.transpose(input_hcqt[:, :, :, t:t + split_value], (0, 1, 3, 2)),
                           np.transpose(input_dphase[:, :, :, t:t + split_value], (0, 1, 3, 2))]
                          )[0, :, :]
        output_list.append(p)

    predicted_output = np.hstack(output_list).astype(np.float32)
    return predicted_output


def get_va_prediction(model, f0_matrix):
    """Run the voice-assignment model on a (360, T) multi-pitch salience map."""
    # Zero-pad the time axis up to a multiple of 256 frames, then cut it into
    # (360, 256) windows for the model.
    splits = f0_matrix.shape[1] // 256
    splits_diff = 256 - (f0_matrix.shape[1] - splits * 256)
    fill = np.zeros((360, splits_diff))
    mix_filled = np.concatenate((np.copy(f0_matrix), fill), axis=1)
    mix_filled = np.reshape(mix_filled, (360, -1, 256, 1)).transpose((1, 0, 2, 3))
    batches = math.ceil(mix_filled.shape[0] / 24)  # up to 24 windows per batch

    s_pred_result = np.zeros((0, 360, 256))
    a_pred_result = np.zeros((0, 360, 256))
    t_pred_result = np.zeros((0, 360, 256))
    b_pred_result = np.zeros((0, 360, 256))

    for i in range(batches):
        s_pred, a_pred, t_pred, b_pred = model.predict(mix_filled[i * 24:(i + 1) * 24])
        s_pred_result = np.append(s_pred_result, s_pred, axis=0)
        a_pred_result = np.append(a_pred_result, a_pred, axis=0)
        t_pred_result = np.append(t_pred_result, t_pred, axis=0)
        b_pred_result = np.append(b_pred_result, b_pred, axis=0)

    # Post-process each voice and trim the padding back off.
    s_pred_result = prediction_postproc(s_pred_result, threshold_value=SATB_THRESHOLDS[0])[:, :f0_matrix.shape[1]]
    a_pred_result = prediction_postproc(a_pred_result, threshold_value=SATB_THRESHOLDS[1])[:, :f0_matrix.shape[1]]
    t_pred_result = prediction_postproc(t_pred_result, threshold_value=SATB_THRESHOLDS[2])[:, :f0_matrix.shape[1]]
    b_pred_result = prediction_postproc(b_pred_result, threshold_value=SATB_THRESHOLDS[3])[:, :f0_matrix.shape[1]]

    return s_pred_result, a_pred_result, t_pred_result, b_pred_result


def cqfe(audiofile, mpe=late_deep_cnn_model(), va=mask_voas_cnn_v2_model()):
    """Run the full pipeline on an audio file: MIDI, CSV and HDF5 outputs plus a plot.

    Note: the default models are instantiated once, at import time.
    """
    savepath_csv = './output.csv'
    savepath_hdf5 = './output.hdf5'

    # Multiple-pitch estimation followed by SATB voice assignment.
    mpe_pred = get_mpe_prediction(mpe, audiofile)
    s_pred, a_pred, t_pred, b_pred = get_va_prediction(va, mpe_pred)

    output_midi = song_to_midi(s_pred, a_pred, t_pred, b_pred)

    output_df = song_to_dataframe(s_pred, a_pred, t_pred, b_pred)
    output_df.to_csv(savepath_csv, mode='w', header=True)
    output_df.to_hdf(savepath_hdf5, key='F0', mode='w', complevel=9,
                     complib='blosc', append=False, format='table')

    # Scatter all four voices on a single pair of axes.
    ax1 = output_df.plot.scatter(x='Timestep', y='Bass', s=1, color='#2f29e3', label='Bass')
    output_df.plot.scatter(x='Timestep', y='Tenor', s=1, color='#e36129', label='Tenor', ax=ax1)
    output_df.plot.scatter(x='Timestep', y='Alto', s=1, color='#29e35a', label='Alto', ax=ax1)
    output_df.plot.scatter(x='Timestep', y='Soprano', s=1, color='#d3d921', label='Soprano', ax=ax1)
    ax1.set_xlabel('Time (s)')
    ax1.set_ylabel('Freq (Hz)')
    fig = ax1.get_figure()
    fig.set_dpi(150)

    return [output_midi, savepath_csv, savepath_hdf5], fig
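

# Minimal usage sketch (assumes an input file './audio.wav' exists and that the
# default model constructors load their pretrained weights):
if __name__ == '__main__':
    output_files, figure = cqfe('./audio.wav')
    print('Wrote:', output_files)
    figure.savefig('./output.png')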