Source code for pyseqlab.ho_crf_ad

'''
@author: ahmed allam <ahmed.allam@yale.edu>

'''


import numpy
from .hosemi_crf_ad import HOSemiCRFADModelRepresentation, HOSemiCRFAD
from .utilities import HO_AStarSearcher, vectorized_logsumexp

[docs]class HOCRFADModelRepresentation(HOSemiCRFADModelRepresentation): """Model representation that will hold data structures to be used in :class:`HOCRFAD` class it includes all attributes in the :class:`HOSemiCRFADModelRepresentation` parent class """ def __init__(self): # call super class super().__init__()
[docs] def filter_activated_states(self, activated_states, accum_active_states, boundary): """filter/prune states and y features Args: activaed_states: dictionary containing possible active states/y features it has the form {patt_len:{patt_1, patt_2, ...}} accum_active_states: dictionary of only possible active states by position it has the form {pos_1:{state_1, state_2, ...}} boundary: tuple (u,v) representing the current boundary in the sequence """ Z_elems = self.Z_elems filtered_activestates = {} __, pos = boundary for z_len in activated_states: if(z_len == 1): continue start_pos = pos - z_len + 1 if((start_pos, start_pos) in accum_active_states): filtered_activestates[z_len] = set() for z_patt in activated_states[z_len]: check = True zelems = Z_elems[z_patt] for i in range(z_len): pos_bound = (start_pos+i, start_pos+i) if(pos_bound not in accum_active_states): check = False break if(zelems[i] not in accum_active_states[pos_bound]): check = False break if(check): filtered_activestates[z_len].add(z_patt) return(filtered_activestates)
[docs]class HOCRFAD(HOSemiCRFAD): """higher-order CRF model that uses algorithmic differentiation in gradient computation Args: model: an instance of :class:`HOCRFADModelRepresentation` class seqs_representer: an instance of :class:`SeqsRepresenter` class seqs_info: dictionary holding sequences info Keyword Arguments: load_info_fromdisk: integer from 0 to 5 specifying number of cached data to be kept in memory. 0 means keep everything while 5 means load everything from disk Attributes: model: an instance of :class:`HOCRFADModelRepresentation` class weights: a numpy vector representing feature weights seqs_representer: an instance of :class:`pyseqlab.feature_extraction.SeqsRepresenter` class seqs_info: dictionary holding sequences info beam_size: determines the size of the beam for state pruning fun_dict: a function map def_cached_entities: a list of the names of cached entities sorted (descending) based on estimated space required in memory """ def __init__(self, model, seqs_representer, seqs_info, load_info_fromdisk = 5): super().__init__(model, seqs_representer, seqs_info, load_info_fromdisk)
[docs] def compute_fpotential(self, w, active_features): """compute the potential of active features in a specified boundary Args: w: weight vector (numpy vector) active_features: dictionary of activated features in a specified boundary """ model = self.model pky_codebook = model.pky_codebook z_pky_map = model.z_pky_map f_potential = numpy.zeros(len(pky_codebook)) # to consider caching the w_indx and fval as in cached_pf for z in active_features: w_indx, f_val = active_features[z] potential = numpy.dot(w[w_indx], f_val) # get all pky's in coded format where z maintains a suffix relation with them pky_c_list = z_pky_map[z] f_potential[pky_c_list] += potential return(f_potential)
[docs] def compute_forward_vec(self, w, seq_id): """compute the forward matrix (alpha matrix) Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence .. note:: activefeatures need to be loaded first in :attr:`seqs.info` """ model = self.model pi_pky_map = model.pi_pky_map P_codebook = model.P_codebook P_len = model.P_len T = self.seqs_info[seq_id]["T"] active_features = self.seqs_info[seq_id]['activefeatures'] alpha = numpy.ones((T+1,len(P_codebook)), dtype='longdouble') * (-numpy.inf) alpha[0,P_codebook[""]] = 0 fpotential_perboundary = {} for j in range(1, T+1): boundary = (j, j) # compute f_potential f_potential = self.compute_fpotential(w, active_features[boundary]) fpotential_perboundary[boundary] = f_potential for pi in pi_pky_map: pi_c = P_codebook[pi] if(j >= P_len[pi]): pky_c_list, pk_c_list = pi_pky_map[pi] vec = f_potential[pky_c_list] + alpha[j-1, pk_c_list] alpha[j, pi_c] = vectorized_logsumexp(vec) self.seqs_info[seq_id]['fpotential'] = fpotential_perboundary return(alpha)
[docs] def compute_backward_vec(self, w, seq_id): """compute the backward matrix (beta matrix) Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence .. note:: fpotential per boundary dictionary should be available in :attr:`seqs.info` """ model = self.model pi_pky_map = model.pi_pky_map P_codebook = model.P_codebook len_P = len(P_codebook) T = self.seqs_info[seq_id]["T"] fpotential_perboundary = self.seqs_info[seq_id]['fpotential'] beta = numpy.ones((T+2, len(P_codebook)), dtype='longdouble') * (-numpy.inf) beta[T+1, :] = 0 for j in reversed(range(1, T+1)): track_comp = numpy.ones((len_P, len_P), dtype='longdouble') * (-numpy.inf) f_potential = fpotential_perboundary[j, j] for pi in pi_pky_map: pi_c = P_codebook[pi] pky_c_list, pk_c_list = pi_pky_map[pi] vec = f_potential[pky_c_list] + beta[j+1, pi_c] track_comp[pk_c_list, pi_c] = vec for p_c in P_codebook.values(): beta[j, p_c] = vectorized_logsumexp(track_comp[p_c, :]) return(beta)
[docs] def compute_marginals(self, seq_id): """ compute the marginal (i.e. probability of each y pattern at each position) Args: seq_id: integer representing unique id assigned to the sequence .. note:: - fpotential per boundary dictionary should be available in :attr:`seqs.info` - alpha matrix should be available in :attr:`seqs.info` - beta matrix should be available in :attr:`seqs.info` - Z (i.e. P(x)) should be available in :attr:`seqs.info` """ model = self.model Z_codebook = model.Z_codebook z_pi_piy = model.z_pi_piy_map T = self.seqs_info[seq_id]["T"] L = self.model.L alpha = self.seqs_info[seq_id]["alpha"] beta = self.seqs_info[seq_id]["beta"] Z = self.seqs_info[seq_id]["Z"] fpotential_perboundary = self.seqs_info[seq_id]['fpotential'] P_marginals = numpy.zeros((T+1, len(model.Z_codebook)), dtype='longdouble') for j in range(1, T+1): for d in range(L): u = j v = j + d if(v > T): break boundary = (u, v) f_potential = fpotential_perboundary[boundary] for z in Z_codebook: pi_c, piy_c, pk_c = z_pi_piy[z] numerator = alpha[u-1, pi_c] + f_potential[piy_c] + beta[v+1, pk_c] P_marginals[j, Z_codebook[z]] = numpy.exp(vectorized_logsumexp(numerator) - Z) return(P_marginals)
# def compute_marginals(self, w, seq_id): # model = self.model # P_codebook = model.P_codebook # len_P = len(P_codebook) # pi_z_pk = model.pi_z_pk # Z_codebook = model.Z_codebook # T = self.seqs_info[seq_id]["T"] # alpha = self.seqs_info[seq_id]['alpha'] # beta = self.seqs_info[seq_id]['beta'] # P_marginals = numpy.zeros((T+1, len(model.Z_codebook)), dtype='longdouble') # print("alpha ", alpha) # Z = self.seqs_info[seq_id]['Z'] # print("Z ", Z) # fpotential_perboundary = self.seqs_info[seq_id]['fpotential'] # print(pi_z_pk) # print(P_codebook) # f_transition = model.f_transition # pky_z = model.z_pky # pky_codebook = model.pky_codebook # print("pky_z ", pky_z) # for j in reversed(range(1, T+1)): # marginal_dict = {} # f_potential = fpotential_perboundary[j, j] # for pi in f_transition: # beta_pi = beta[j+1, P_codebook[pi]] # for pky in f_transition[pi]: # pk, y = f_transition[pi][pky] # accum = alpha[j-1, P_codebook[pk]] + f_potential[pky_codebook[pky]] + beta_pi # for z_patt in pky_z[pky]: # if(z_patt in marginal_dict): # marginal_dict[z_patt] = numpy.logaddexp(marginal_dict[z_patt], accum) # else: # marginal_dict[z_patt] = accum # print("j ", j) # print("marginal ", marginal_dict) # for z_patt in marginal_dict: # P_marginals[j, Z_codebook[z_patt]] = numpy.exp(marginal_dict[z_patt]-Z) # self.seqs_info[seq_id]['P_marginal'] = P_marginals # print(P_marginals) # return(P_marginals)
[docs] def compute_feature_expectation(self, seq_id, P_marginals, grad): """compute the features expectations (i.e. expected count of the feature based on learned model) Args: seq_id: integer representing unique id assigned to the sequence P_marginals: probability matrix for y patterns at each position in time grad: numpy vector with dimension equal to the weight vector. It represents the gradient that will be computed using the feature expectation and the global features of the sequence .. note:: - activefeatures (per boundary) dictionary should be available in :attr:`seqs.info` - P_marginal (marginal probability matrix) should be available in :attr:`seqs.info` """ activefeatures = self.seqs_info[seq_id]["activefeatures"] Z_codebook = self.model.Z_codebook for boundary, features_dict in activefeatures.items(): u, __ = boundary for z_patt in features_dict: w_indx, f_val = features_dict[z_patt] grad[w_indx] += f_val * P_marginals[u, Z_codebook[z_patt]]
[docs] def prune_states(self, j, delta, beam_size): """prune states that fall off the specified beam size Args: j: current position (integer) in the sequence delta: score matrix beam_size: specified size of the beam (integer) """ P_codebook_rev = self.model.P_codebook_rev P_elems = self.model.P_elems # pi_lendict = self.model.pi_lendict # # sort the pi in descending order of their score # indx_sorted_pi = numpy.argsort(delta[j,:])[::-1] # # identify states falling out of the beam # indx_falling_pi = indx_sorted_pi[beam_size:] # # identify top-k states/pi # indx_topk_pi = indx_sorted_pi[:beam_size] # # remove the effect of states/pi falling out of the beam # delta[j, indx_falling_pi] = -numpy.inf # using argpartition as better alternative to argsort indx_partitioned_pi = numpy.argpartition(-delta[j, :], beam_size) # identify top-k states/pi indx_topk_pi = indx_partitioned_pi[:beam_size] # # identify states falling out of the beam # indx_falling_pi = indx_partitioned_pi[beam_size:] # # remove the effect of states/pi falling out of the beam # delta[j, indx_falling_pi] = -numpy.inf # get topk states topk_pi = {P_codebook_rev[indx] for indx in indx_topk_pi} topk_states = set() for pi in topk_pi: topk_states.add(P_elems[pi][-1]) return(topk_states)
[docs] def viterbi(self, w, seq_id, beam_size, stop_off_beam = False, y_ref=[], K=1): """decode sequences using viterbi decoder Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence beam_size: integer representing the size of the beam Keyword Arguments: stop_off_beam: boolean indicating if to stop when the reference state \ falls off the beam (used in perceptron/search based learning) y_ref: reference sequence list of labels (used while learning) K: integer indicating number of decoded sequences required (i.e. top-k list) """ model = self.model P_elems = model.P_elems pi_pky_map = model.pi_pky_map P_codebook = model.P_codebook P_codebook_rev = model.P_codebook_rev len_P = len(P_codebook) P_len = model.P_len num_states = model.num_states T = self.seqs_info[seq_id]["T"] # records max score at every time step delta = numpy.ones((T+1,len_P), dtype='longdouble') * (-numpy.inf) # the score for the empty sequence at time 0 is 1 delta[0, P_codebook[""]] = 0 back_track = {} # records where violation occurs -- it is 1-based indexing viol_index = [] if(beam_size == num_states): # case of exact search and decoding l = {} l['activefeatures'] = (seq_id, ) self.check_cached_info(seq_id, l) active_features = self.seqs_info[seq_id]['activefeatures'] for j in range(1, T+1): boundary = (j, j) # vector of size len(pky) f_potential = self.compute_fpotential(w, active_features[boundary]) #^print("f_potential ", f_potential) for pi in pi_pky_map: pi_c = P_codebook[pi] pky_c_list, pk_c_list = pi_pky_map[pi] vec = f_potential[pky_c_list] + delta[j-1, pk_c_list] delta[j, pi_c] = numpy.max(vec) #print("max chosen ", delta[j, P_codebook[pi]]) argmax_ind = numpy.argmax(vec) #print("argmax chosen ", argmax_ind) pk_c_max = pk_c_list[argmax_ind] pk = P_codebook_rev[pk_c_max] y = P_elems[pk][-1] back_track[j, pi_c] = (pk_c_max, y) else: # case of inexact search and decoding l = {} l['seg_features'] = (seq_id, ) self.check_cached_info(seq_id, l) # tracks active states by boundary accum_activestates = {} for j in range(1, T+1): boundary = (j, j) active_features = self.identify_activefeatures(seq_id, boundary, accum_activestates) # vector of size len(pky) f_potential = self.compute_fpotential(w, active_features) #^print("f_potential ", f_potential) for pi in pi_pky_map: pi_c = P_codebook[pi] pky_c_list, pk_c_list = pi_pky_map[pi] vec = f_potential[pky_c_list] + delta[j-1, pk_c_list] delta[j, pi_c] = numpy.max(vec) #print("max chosen ", delta[j, P_codebook[pi]]) argmax_ind = numpy.argmax(vec) #print("argmax chosen ", argmax_ind) pk_c_max = pk_c_list[argmax_ind] pk = P_codebook_rev[pk_c_max] y = P_elems[pk][-1] back_track[j, pi_c] = (pk_c_max, y) topk_states = self.prune_states(j, delta, beam_size) # update tracked active states -- to consider renaming it accum_activestates[boundary] = accum_activestates[boundary].intersection(topk_states) #^print('delta[{},:] = {} '.format(j, delta[j,:])) #^print("topk_states ", topk_states) if(y_ref): if(y_ref[j-1] not in topk_states): viol_index.append(j) if(stop_off_beam): T = j break if(K == 1): # decoding the sequence Y_decoded = [] p_T_c = numpy.argmax(delta[T,:]) p_T = P_codebook_rev[p_T_c] y_T = P_elems[p_T][-1] Y_decoded.append((p_T_c,y_T)) t = T - 1 while t>0: p_tplus1_c = Y_decoded[-1][0] p_t_c, y_t = back_track[(t+1, p_tplus1_c)] Y_decoded.append((p_t_c, y_t)) t -= 1 Y_decoded.reverse() Y_decoded = [yt for __,yt in Y_decoded] # print("Y_decoded {}".format(Y_decoded)) # print('delta ', delta) # print('backtrack ', back_track) # print("P_codebook ", P_codebook) return(Y_decoded, viol_index) else: asearcher = HO_AStarSearcher(P_codebook_rev, P_elems) topK = asearcher.search(delta, back_track, T, K) # print('topk ', topK) return(topK, viol_index)
if __name__ == "__main__": pass