import os
import math
import mido
import pumpp
import librosa
import numpy as np
import pandas as pd
from scipy.ndimage import gaussian_filter1d
from cqfe_models import mask_voas_cnn_v2_model, late_deep_cnn_model
############################################################
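# Frequency axis of the salience representations: 360 CQT bins starting at
# 32.7 Hz (C1), 60 bins per octave (20-cent resolution over 6 octaves).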
freqscale = librosa.cqt_frequencies(n_bins=360, fmin=32.7, bins_per_octave=60)
def bin_to_freq(bin_index):
    return freqscale[bin_index]
vec_bin_to_freq = np.vectorize(bin_to_freq)
############################################################
def downsample_bins(voice):
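    """Collapse a (time, 360) salience map (5 bins per semitone) into 69
    semitone-wide bins by summing groups of 5 adjacent bins, then keep only
    the per-frame argmax as a one-hot value; bin 0 is cleared, so frames
    whose maximum falls there produce no active note."""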
voice_0 = np.array(voice.T[0::5]).T
voice_1 = np.array(voice.T[1::5]).T
voice_2 = np.array(voice.T[2::5]).T
voice_3 = np.array(voice.T[3::5]).T
voice_4 = np.array(voice.T[4::5]).T
voice_0 = voice_0.T[1:70].T
voice_1 = voice_1.T[1:70].T
voice_2 = voice_2.T[1:70].T
voice_3 = voice_3.T[0:69].T
voice_4 = voice_4.T[0:69].T
voice_sums = voice_0 + voice_1 + voice_2 + voice_3 + voice_4
voice_argmax = np.argmax(voice_sums, axis=1)
threshold = np.zeros(voice_sums.shape)
threshold[np.arange(voice_argmax.size), voice_argmax] = 1
threshold[:, 0] = 0
voice_sums = threshold
return voice_sums
############################################################
def create_midi(pr, write_path='./midi_track.mid', ticks_per_beat=58,
tempo=90, save_to_file=True, program=53, channel=0):
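    """Convert a binary piano-roll (time, pitch bins) into a mido.MidiFile.
    Each time frame advances the MIDI clock by one tick. For rolls with at
    most 72 pitch bins, bin n maps to MIDI note 25 + n; for wider (e.g.
    360-bin) rolls, bin n maps to 24 + round(n / 5). Velocities are
    binarised to 127 or 0."""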
def pr_to_list(pr):
# List event = (pitch, velocity, time)
T, N = pr.shape
t_last = 0
pr_tm1 = np.zeros(N)
list_event = []
for t in range(T):
pr_t = pr[t]
mask = (pr_t != pr_tm1)
            if mask.any():
for n in range(0, N):
if mask[n]:
if(N <= 72):
pitch = 25 + n
else:
pitch = 24 + round(n/5)
if int(pr_t[n] * 127) >= 50:
velocity = 127
else:
velocity = 0
# Time is incremented since last event
t_event = t - t_last
t_last = t
list_event.append((pitch, velocity, t_event))
pr_tm1 = pr_t
list_event.append((0, 0, T - t_last))
return list_event
# Tempo
microseconds_per_beat = mido.bpm2tempo(tempo)
# Write a pianoroll in a midi file
mid = mido.MidiFile()
mid.ticks_per_beat = ticks_per_beat
# Add a new track with the instrument name to the midi file
track = mid.add_track("Voice Aah")
    # Transform the matrix into a list of (pitch, velocity, time)
events = pr_to_list(pr)
#print(events)
# Tempo
track.append(mido.MetaMessage('set_tempo', tempo=microseconds_per_beat))
track.append(mido.MetaMessage('channel_prefix', channel=channel))
# Add the program_change
    # Choir Aahs = 53, Voice Oohs (or Doos) = 54, Synth Choir = 55
track.append(mido.Message('program_change', program=program, channel=channel))
    # notes_on_list tracks the notes currently sounding, so a note whose
    # intensity changes is switched off exactly once before being re-triggered.
    # Example:
    # (60,20,0)
    # (60,40,10)
    # (60,0,15)
notes_on_list = []
# Write events in the midi file
for event in events:
pitch, velocity, time = event
if velocity == 0:
            # Velocity 0: close the note on this channel
track.append(mido.Message('note_off', note=pitch, velocity=0, time=time, channel=channel))
if(pitch in notes_on_list):
notes_on_list.remove(pitch)
else:
if pitch in notes_on_list:
track.append(mido.Message('note_off', note=pitch, velocity=0, time=time, channel=channel))
notes_on_list.remove(pitch)
time = 0
track.append(mido.Message('note_on', note=pitch, velocity=velocity, time=time, channel=channel))
notes_on_list.append(pitch)
if save_to_file:
mid.save(write_path)
return mid
############################################################
def song_to_midi(sop, alto, ten, bass):
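    """Write the four voice predictions to a single multi-track MIDI file
    ('./output.mid') and return its path. Each voice is downsampled to
    semitone bins and rendered on its own track, channel and program."""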
savepath = './output.mid'
down_sop = downsample_bins(sop.T)
down_alto = downsample_bins(alto.T)
down_ten = downsample_bins(ten.T)
down_bass = downsample_bins(bass.T)
mid_sop = create_midi(down_sop, save_to_file=False, program=52, channel=0)
mid_alto = create_midi(down_alto, save_to_file=False, program=53, channel=1)
mid_ten = create_midi(down_ten, save_to_file=False, program=49, channel=2)
mid_bass = create_midi(down_bass, save_to_file=False, program=50, channel=3)
mid_mix = mido.MidiFile()
mid_mix.ticks_per_beat = mid_sop.ticks_per_beat
mid_mix.tracks = mid_sop.tracks + mid_alto.tracks + mid_ten.tracks + mid_bass.tracks
mid_mix.save(savepath)
return savepath
############################################################
def song_to_tables(sop, alto, ten, bass):
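    """Export the per-frame argmax F0 trajectory (in Hz) of each voice,
    together with a timestamp column, to './output.csv' and './output.hdf5',
    and return both paths."""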
savepath_csv = './output.csv'
savepath_hdf5 = './output.hdf5'
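    # One frame every hop_length / sr = 256 / 22050 ≈ 0.011609977 s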
timescale = np.arange(0, 0.011609977 * (sop.shape[1]), 0.011609977)[:sop.shape[1]]
s_argmax = vec_bin_to_freq(np.argmax(sop, axis=0))
a_argmax = vec_bin_to_freq(np.argmax(alto, axis=0))
t_argmax = vec_bin_to_freq(np.argmax(ten, axis=0))
b_argmax = vec_bin_to_freq(np.argmax(bass, axis=0))
data = np.array([timescale, s_argmax, a_argmax, t_argmax, b_argmax], dtype=np.float32).T
columns = ['Timestep', 'Soprano', 'Alto', 'Tenor', 'Bass']
df = pd.DataFrame(data, columns=columns)
df.to_csv(savepath_csv, mode='w', header=True)
df.to_hdf(savepath_hdf5, key='F0', mode='w', complevel=9, complib='blosc', append=False, format='table')
return savepath_csv, savepath_hdf5
############################################################
def prediction_postproc(input_array, argmax_and_threshold=True, gaussian_blur=True):
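    """Flatten a batch of (360, 256) predictions into a single (360, time)
    matrix; optionally keep only the per-frame argmax as a one-hot column
    (argmax values above bin 357 are mapped to bin 0), apply a light Gaussian
    blur along the frequency axis, and rescale the result to [0, 1]."""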
prediction = np.moveaxis(input_array, 0, 1).reshape(360, -1)
if(argmax_and_threshold):
prediction = np.argmax(prediction, axis=0)
prediction = np.array([i if i <= 357 else 0 for i in prediction])
threshold = np.zeros((360, prediction.shape[0]))
threshold[prediction, np.arange(prediction.size)] = 1
prediction = threshold
if(gaussian_blur):
prediction = np.array(gaussian_filter1d(prediction, 1, axis=0, mode='wrap'))
prediction = (prediction - np.min(prediction))/(np.max(prediction)-np.min(prediction))
return prediction
############################################################
def get_hcqt_params():
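    """Return the HCQT analysis parameters: bins per octave, number of
    octaves, harmonics, sample rate, fmin, hop length and oversampling."""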
bins_per_octave = 60
n_octaves = 6
over_sample = 5
harmonics = [1, 2, 3, 4, 5]
sr = 22050
fmin = 32.7
hop_length = 256
return bins_per_octave, n_octaves, harmonics, sr, fmin, hop_length, over_sample
############################################################
def create_pump_object():
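    """Build a pumpp Pump that extracts HCQT magnitude and phase-differential
    features with the parameters from get_hcqt_params()."""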
(bins_per_octave, n_octaves, harmonics,
sr, f_min, hop_length, over_sample) = get_hcqt_params()
p_phdif = pumpp.feature.HCQTPhaseDiff(name='dphase', sr=sr, hop_length=hop_length,
fmin=f_min, n_octaves=n_octaves, over_sample=over_sample, harmonics=harmonics, log=True)
pump = pumpp.Pump(p_phdif)
return pump
############################################################
def compute_pump_features(pump, audio_fpath):
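    """Run the pump on an audio file and return its feature dictionary."""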
data = pump(audio_f=audio_fpath)
return data
############################################################
def get_mpe_prediction(model, audio_file=None):
"""Generate output from a model given an input numpy file.
Part of this function is part of deepsalience
"""
split_value = 4000
if audio_file is not None:
pump = create_pump_object()
features = compute_pump_features(pump, audio_file)
input_hcqt = features['dphase/mag'][0]
input_dphase = features['dphase/dphase'][0]
else:
        raise ValueError("An audio_file must be specified")
input_hcqt = input_hcqt.transpose(1, 2, 0)[np.newaxis, :, :, :]
input_dphase = input_dphase.transpose(1, 2, 0)[np.newaxis, :, :, :]
n_t = input_hcqt.shape[3]
t_slices = list(np.arange(0, n_t, split_value))
output_list = []
for t in t_slices:
p = model.predict([np.transpose(input_hcqt[:, :, :, t:t+split_value], (0, 1, 3, 2)),
np.transpose(input_dphase[:, :, :, t:t+split_value], (0, 1, 3, 2))]
)[0, :, :]
output_list.append(p)
predicted_output = np.hstack(output_list).astype(np.float32)
return predicted_output
############################################################
def get_va_prediction(model, f0_matrix):
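    """Zero-pad the (360, time) multiple-F0 salience matrix to a multiple of
    256 frames, split it into 256-frame chunks, run the voice-assignment
    model in batches of 24 chunks, and return the post-processed soprano,
    alto, tenor and bass matrices trimmed back to the input length."""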
splits = f0_matrix.shape[1]//256
splits_diff = 256 - (f0_matrix.shape[1] - splits * 256)
fill = np.zeros((360, splits_diff))
mix_filled = np.concatenate((np.copy(f0_matrix), fill), axis=1)
mix_filled = np.reshape(mix_filled, (360, -1, 256, 1)).transpose((1, 0, 2, 3))
batches = math.ceil(mix_filled.shape[0]/24)
s_pred_result = np.zeros((0, 360, 256))
a_pred_result = np.zeros((0, 360, 256))
t_pred_result = np.zeros((0, 360, 256))
b_pred_result = np.zeros((0, 360, 256))
for i in range(batches):
s_pred, a_pred, t_pred, b_pred = model.predict(mix_filled[i*24:(i+1)*24])
s_pred_result = np.append(s_pred_result, s_pred, axis=0)
a_pred_result = np.append(a_pred_result, a_pred, axis=0)
t_pred_result = np.append(t_pred_result, t_pred, axis=0)
b_pred_result = np.append(b_pred_result, b_pred, axis=0)
s_pred_result = prediction_postproc(s_pred_result)[:, :f0_matrix.shape[1]]
a_pred_result = prediction_postproc(a_pred_result)[:, :f0_matrix.shape[1]]
t_pred_result = prediction_postproc(t_pred_result)[:, :f0_matrix.shape[1]]
b_pred_result = prediction_postproc(b_pred_result)[:, :f0_matrix.shape[1]]
return s_pred_result, a_pred_result, t_pred_result, b_pred_result
############################################################
def cqfe(audiofile, mpe=late_deep_cnn_model(), va=mask_voas_cnn_v2_model()):
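    """Run the full pipeline on an audio file: multiple-F0 estimation (mpe),
    voice assignment (va) into soprano, alto, tenor and bass, then MIDI, CSV
    and HDF5 export. Returns [midi_path, csv_path, hdf5_path]. Note that the
    default models are instantiated once, when this module is imported."""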
mpe_pred = get_mpe_prediction(mpe, audiofile)
s_pred, a_pred, t_pred, b_pred = get_va_prediction(va, mpe_pred)
output_midi = song_to_midi(s_pred, a_pred, t_pred, b_pred)
output_csv, output_hdf5 = song_to_tables(s_pred, a_pred, t_pred, b_pred)
return [output_midi, output_csv, output_hdf5]
############################################################
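# Minimal usage sketch (the audio path below is hypothetical; running this
# also requires the pretrained MPE and VA model weights used by cqfe_models):
if __name__ == '__main__':
    midi_path, csv_path, hdf5_path = cqfe('./example_quartet.wav')
    print('Outputs written to:', midi_path, csv_path, hdf5_path)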