Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
# encoding: utf-8 | |
# The MIT License (MIT) | |
# Copyright (c) 2014-2016 CNRS | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
# AUTHORS | |
# Hervé BREDIN - http://herve.niderb.fr | |
from __future__ import unicode_literals | |
import six.moves | |
import numpy as np | |
import itertools | |
# Values accepted in the `constraint` matrix of `viterbi_decoding`.
VITERBI_CONSTRAINT_NONE = 0  # no constraint for this (sample, state) pair
VITERBI_CONSTRAINT_FORBIDDEN = 1  # the state is forbidden at this sample
VITERBI_CONSTRAINT_MANDATORY = 2  # the state is forced at this sample
# Finite stand-in for log(0): marks (near-)impossible events in log-domain.
LOG_ZERO = np.log(1e-200)
# Handling 'consecutive' (minimum-duration) constraints is achieved by
# duplicating states; the following helper functions support that process.
def _update_transition(transition, consecutive):
    """Create the transition matrix accounting for duplicated states.

    Original state ``i`` is replaced by a chain of ``consecutive[i]`` copies.
    Within a chain, the only allowed move is to the next copy (log-prob 0).
    The last copy of a chain may either stay where it is (original
    self-transition) or jump to the first copy of another state's chain
    (original cross-state transition). This enforces a *minimum* number of
    consecutive samples per state while still allowing any longer duration.

    Parameters
    ----------
    transition : array-like of shape (n_states, n_states)
        transition[i, j] is the log-probability of going from state i to j.
    consecutive : int array-like of shape (n_states, )
        Number of copies of each original state.

    Returns
    -------
    new_transition : array of shape (sum(consecutive), sum(consecutive))
        Log-transition matrix between duplicated states; entries that are
        not explicitly allowed are set to LOG_ZERO.
    """
    transition = np.asarray(transition)
    consecutive = np.asarray(consecutive, dtype=int)
    n_new_states = int(consecutive.sum())

    # everything is forbidden by default...
    new_transition = np.full((n_new_states, n_new_states), LOG_ZERO)

    # ... except moving forward along the +1 diagonal: np.log(1) == 0
    chain = np.arange(n_new_states - 1)
    new_transition[chain, chain + 1] = 0.0

    # first and last copy of every original state
    boundary = np.hstack(([0], np.cumsum(consecutive)))
    first = boundary[:-1]
    last = boundary[1:] - 1

    # leaving state i for state j: last copy of i --> first copy of j
    new_transition[np.ix_(last, first)] = transition

    # staying in state i once the minimum duration is reached must be a
    # self-loop on the last copy. Going back to the first copy would only
    # allow durations that are exact multiples of consecutive[i].
    new_transition[last, first] = LOG_ZERO
    new_transition[last, last] = np.diagonal(transition)

    return new_transition
def _update_initial(initial, consecutive):
    """Create the initial log-probabilities accounting for duplicated states.

    A sequence may only start in the first copy of a state: that copy
    inherits the original initial log-probability, while every other copy
    gets LOG_ZERO.

    Parameters
    ----------
    initial : array-like of shape (n_states, )
        initial[i] is the initial log-probability of original state i.
    consecutive : int array-like of shape (n_states, )
        Number of copies of each original state.

    Returns
    -------
    new_initial : array of shape (sum(consecutive), )
    """
    consecutive = np.asarray(consecutive, dtype=int)

    # index of the first copy of each original state
    first = np.cumsum(consecutive) - consecutive

    new_initial = np.full((int(consecutive.sum()), ), LOG_ZERO)
    new_initial[first] = initial
    return new_initial
def _update_emission(emission, consecutive):
    """Create the emission matrix accounting for duplicated states.

    Parameters
    ----------
    emission : array-like of shape (n_samples, n_states)
        emission[t, i] is the emission log-probability of sample t at state i.
    consecutive : int array-like of shape (n_states, )
        Number of copies of each original state.

    Returns
    -------
    new_emission : array of shape (n_samples, sum(consecutive))
        Column i of `emission` repeated consecutive[i] times. This is always
        a new array, so it can be modified without touching `emission`.
    """
    # np.repeat replaces the former np.vstack(<generator>) call, which is
    # rejected by recent NumPy versions (a sequence is required).
    return np.repeat(np.asarray(emission),
                     np.asarray(consecutive, dtype=int),
                     axis=1)
def _update_constraint(constraint, consecutive):
    """Create the constraint matrix accounting for duplicated states.

    Parameters
    ----------
    constraint : array-like of shape (n_samples, n_states)
        constraint[t, i] is the constraint applied to state i at sample t.
    consecutive : int array-like of shape (n_states, )
        Number of copies of each original state.

    Returns
    -------
    new_constraint : array of shape (n_samples, sum(consecutive))
        Column i of `constraint` repeated consecutive[i] times, i.e. every
        copy of a state inherits the constraint of the original state.
    """
    # np.repeat replaces the former np.vstack(<generator>) call, which is
    # rejected by recent NumPy versions (a sequence is required).
    return np.repeat(np.asarray(constraint),
                     np.asarray(consecutive, dtype=int),
                     axis=1)
def _update_states(states, consecutive):
    """Convert a sequence of duplicated states back to original states.

    Parameters
    ----------
    states : int array-like
        Indices of duplicated states.
    consecutive : int array-like of shape (n_states, )
        Number of copies of each original state.

    Returns
    -------
    new_states : int array, same shape as `states`
        new_states[t] is the index i of the original state whose copies
        cover states[t], i.e. start[i] <= states[t] < end[i].
    """
    # end[i] is one past the last copy of original state i
    end = np.cumsum(np.asarray(consecutive, dtype=int))

    # counting how many chains end at or before a duplicated state gives the
    # index of its original state, as an integer usable for indexing.
    return np.searchsorted(end, np.asarray(states), side='right')
def viterbi_decoding(emission, transition,
                     initial=None, consecutive=None, constraint=None):
    """(Constrained) Viterbi decoding

    Parameters
    ----------
    emission : array of shape (n_samples, n_states)
        E[t, i] is the emission log-probability of sample t at state i.
    transition : array of shape (n_states, n_states)
        T[i, j] is the transition log-probability from state i to state j.
    initial : optional, array of shape (n_states, )
        I[i] is the initial log-probability of state i.
        Defaults to equal log-probabilities.
    consecutive : optional, int or int array of shape (n_states, )
        C[i] is the minimum-consecutive-states constraint for state i.
        C[i] = 1 is equivalent to no constraint (default).
        Values lower than 1 are replaced by 1.
    constraint : optional, array of shape (n_samples, n_states)
        K[t, i] = 1 forbids state i at time t.
        K[t, i] = 2 forces state i at time t.
        Use K[t, i] = 0 for no constraint (default).

    Returns
    -------
    states : array of shape (n_samples, )
        Most probable state sequence (indices of original states).
        Empty when `emission` contains no sample.
    """

    # ~~ INITIALIZATION ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    emission = np.asarray(emission)
    n_samples, n_states = emission.shape

    # nothing to decode
    if n_samples == 0:
        return np.empty((0, ), dtype=int)

    if consecutive is None:
        # no minimum-consecutive-states constraints
        consecutive = np.ones((n_states, ), dtype=int)
    else:
        consecutive = np.asarray(consecutive, dtype=int)
        if consecutive.ndim == 0:
            # same value for all states (Python or NumPy integer scalar)
            consecutive = np.full((n_states, ), consecutive, dtype=int)
        else:
            # (potentially) different values per state
            consecutive = consecutive.reshape((n_states, ))

    # at least one sample
    consecutive = np.maximum(1, consecutive)

    # balance initial probabilities when they are not provided
    if initial is None:
        initial = np.full((n_states, ), np.log(1.0 / n_states))

    # no constraint?
    if constraint is None:
        constraint = np.full((n_samples, n_states), VITERBI_CONSTRAINT_NONE)

    # artificially create new states to account for 'consecutive' constraints
    # (emission is turned into a private float copy because it is masked
    # in place below and the caller's array must be left untouched)
    emission = np.array(_update_emission(emission, consecutive), dtype=float)
    transition = _update_transition(transition, consecutive)
    initial = _update_initial(initial, consecutive)
    constraint = np.asarray(_update_constraint(constraint, consecutive))

    T, K = emission.shape  # number of observations x number of new states
    states = np.arange(K)  # states 0 to K-1

    # set emission probability to zero for forbidden states
    emission[constraint == VITERBI_CONSTRAINT_FORBIDDEN] = LOG_ZERO

    # set emission probability to zero for all states but the mandatory one.
    # All copies of a mandatory state are themselves marked as mandatory, so
    # the mask is built per time step: masking "every state but this one"
    # copy after copy would end up wiping every state at that time step.
    mandatory = constraint == VITERBI_CONSTRAINT_MANDATORY
    has_mandatory = mandatory.any(axis=1)
    emission[has_mandatory[:, np.newaxis] & ~mandatory] = LOG_ZERO

    # ~~ FORWARD PASS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    # V[t, k] is the log-probability of the most probable state sequence for
    # the first t observations that has k as its final state.
    V = np.empty((T, K))
    V[0, :] = emission[0, :] + initial

    # P[t, k] remembers which state was used to get from time t-1 to time t
    # at state k
    P = np.empty((T, K), dtype=int)
    P[0, :] = states

    for t in range(1, T):

        # scores[k, k'] is the log-probability of the most probable path
        # leading to state k at time t - 1, plus the log-probability of
        # transitioning from state k to state k' (at time t)
        scores = V[t - 1, :, np.newaxis] + transition

        # optimal path to state k at t comes from state P[t, k] at t - 1
        # (find among all possible states at this time t)
        P[t, :] = np.argmax(scores, axis=0)

        # update V for time t
        V[t, :] = emission[t, :] + scores[P[t, :], states]

    # ~~ BACK-TRACKING ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    X = np.empty((T, ), dtype=int)
    X[-1] = np.argmax(V[-1, :])
    for t in range(T - 1, 0, -1):
        X[t - 1] = P[t, X[t]]

    # ~~ CONVERT BACK TO ORIGINAL STATES ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    return _update_states(X, consecutive)