Source code for pyseqlab.fo_crf

'''
@author: ahmed allam <ahmed.allam@yale.edu>

'''

from collections import OrderedDict
import numpy
from .linear_chain_crf import LCRFModelRepresentation, LCRF
from .utilities import FO_AStarSearcher, vectorized_logsumexp

[docs]class FirstOrderCRFModelRepresentation(LCRFModelRepresentation): """Model representation that will hold data structures to be used in :class:`FirstOrderCRF` class it includes all attributes in the :class:`LCRFModelRepresentation` parent class Attributes: Y_codebook_rev: reversed codebook (dictionary) of :attr:`Y_codebook` startstate_flag: boolean indicating if to use an edge/boundary state (i.e. __START__ state) """ def __init__(self): super().__init__() self.Y_codebook_rev = None self.startstate_flag = None
[docs] def setup_model(self, modelfeatures, states, L): """setup and create the model representation Creates all maps and codebooks needed by the :class:`FirstOrderCRF` class Args: modelfeatures: set of features defining the model states: set of states (i.e. tags) L: length of longest segment """ super().setup_model(modelfeatures, states, L)
[docs] def generate_instance_properties(self): """generate instance properties that will be later used by :class:`FirstOrderCRF` class """ super().generate_instance_properties() self.Y_codebook_rev = self.get_Y_codebook_reversed()
[docs] def get_modelstates_codebook(self, states): """create states codebook by mapping each state to a unique code/number Args: states: set of tags identified in training sequences Example:: states = {'B-PP', 'B-NP', ...} """ start_state = '__START__' if(start_state in states): del states[start_state] Y_codebook = {s:i+1 for (i, s) in enumerate(states)} Y_codebook[start_state] = 0 states[start_state] = 1 self.startstate_flag = True else: Y_codebook = {s:i for (i, s) in enumerate(states)} self.startstate_flag = False return(Y_codebook)
[docs] def get_Y_codebook_reversed(self): """generate reversed codebook of :attr:`Y_codebook` """ Y_codebook = self.Y_codebook return({code:state for state, code in Y_codebook.items()})
[docs]class FirstOrderCRF(LCRF): """first-order CRF model Args: model: an instance of :class:`FirstOrderCRFModelRepresentation` class seqs_representer: an instance of :class:`SeqsRepresenter` class seqs_info: dictionary holding sequences info Keyword Arguments: load_info_fromdisk: integer from 0 to 5 specifying number of cached data to be kept in memory. 0 means keep everything while 5 means load everything from disk Attributes: model: an instance of :class:`FirstOrderCRFModelRepresentation` class weights: a numpy vector representing feature weights seqs_representer: an instance of :class:`pyseqlab.feature_extraction.SeqsRepresenter` class seqs_info: dictionary holding sequences info beam_size: determines the size of the beam for state pruning fun_dict: a function map def_cached_entities: a list of the names of cached entities sorted (descending) based on estimated space required in memory """ def __init__(self, model, seqs_representer, seqs_info, load_info_fromdisk=5): super().__init__(model, seqs_representer, seqs_info, load_info_fromdisk)
[docs] def cached_entitites(self, load_info_fromdisk): """construct list of names of cached entities in memory """ def_cached_entities = super().cached_entitites(load_info_fromdisk) inmemory_info = ["alpha", "Z", "beta", "potentialmat_perboundary"] def_cached_entities += inmemory_info return(def_cached_entities)
# def compute_psi_potential(self, w, seq_id): # """ assumes that activefeatures_matrix has been already generated and saved in self.seqs_info dictionary """ # Y_codebook = self.model.Y_codebook # Z_lendict = self.model.Z_lendict # Z_elems = self.model.Z_elems # # T is the length of the sequence # T = self.seqs_info[seq_id]["T"] # # number of possible states including the __START__ and __STOP__ states # M = self.model.num_states # # get activefeatures_matrix # activefeatures = self.seqs_info[seq_id]["activefeatures"] # potential_matrix = numpy.zeros((T+1,M,M), dtype='longdouble') # for boundary in activefeatures: # t = boundary[0] # for y_patt in activefeatures[boundary]: # f_val = list(activefeatures[boundary][y_patt].values()) # w_indx = list(activefeatures[boundary][y_patt].keys()) # # potential = numpy.dot(w[w_indx], f_val) # if(Z_lendict[y_patt] == 1): # y_c = Z_elems[y_patt][0] # potential_matrix[t, :, Y_codebook[y_c]] += potential # else: # # case of len(parts) = 2 # y_p = Z_elems[y_patt][0] # y_c = Z_elems[y_patt][1] # potential_matrix[t, Y_codebook[y_p], Y_codebook[y_c]] += potential # # print("potential_matrix {}".format(potential_matrix)) # return(potential_matrix)
[docs] def compute_potential(self, w, active_features): """compute the potential matrix of active features in a specified boundary Args: w: weight vector (numpy vector) active_features: dictionary of activated features in a specified boundary """ model = self.model Y_codebook = model.Y_codebook Z_len = model.Z_len Z_elems = model.Z_elems # number of possible states including the __START__ and __STOP__ states M = model.num_states # get activefeatures_matrix potential_matrix = numpy.zeros((M,M), dtype='longdouble') for y_patt in active_features: w_indx, f_val = active_features[y_patt] potential = numpy.dot(w[w_indx], f_val) if(Z_len[y_patt] == 1): y_c = Z_elems[y_patt][0] potential_matrix[:, Y_codebook[y_c]] += potential else: # case of len(parts) = 2 y_p = Z_elems[y_patt][0] y_c = Z_elems[y_patt][1] potential_matrix[Y_codebook[y_p], Y_codebook[y_c]] += potential # print("potential_matrix {}".format(potential_matrix)) return(potential_matrix)
[docs] def compute_forward_vec(self, w, seq_id): """compute the forward matrix (alpha matrix) Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence .. note:: activefeatures need to be loaded first in :attr:`seqs.info` """ model = self.model # T is the length of the sequence T = self.seqs_info[seq_id]["T"] # number of possible states including the __START__ and __STOP__ states M = model.num_states startstate_flag = model.startstate_flag active_features = self.seqs_info[seq_id]['activefeatures'] potentialmat_perboundary = {} alpha = numpy.ones((T+1, M), dtype='longdouble') * (-numpy.inf) if(startstate_flag): alpha[0,0] = 0 # corner case at t = 1 t = 1; i = 0 boundary = (t,t) potential_matrix = self.compute_potential(w, active_features[boundary]) potentialmat_perboundary[boundary] = potential_matrix alpha[t, :] = potential_matrix[i, :] for t in range(1, T): boundary = (t+1, t+1) potential_matrix = self.compute_potential(w, active_features[boundary]) potentialmat_perboundary[boundary] = potential_matrix for j in range(M): alpha[t+1, j] = vectorized_logsumexp(alpha[t, :] + potential_matrix[:, j]) self.seqs_info[seq_id]['potentialmat_perboundary'] = potentialmat_perboundary return(alpha)
[docs] def compute_backward_vec(self, w, seq_id): """compute the backward matrix (beta matrix) Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence .. note:: potential matrix per boundary dictionary should be available in :attr:`seqs.info` """ # length of the sequence without the appended states __START__ and __STOP__ T = self.seqs_info[seq_id]["T"] # number of possible states including the __START__ and __STOP__ states M = self.model.num_states beta = numpy.ones((T+1, M), dtype = 'longdouble') * (-numpy.inf) beta[T, :] = 0 # get the potential matrix potentialmat_perboundary = self.seqs_info[seq_id]["potentialmat_perboundary"] for t in reversed(range(1, T+1)): potential_matrix = potentialmat_perboundary[t,t] for i in range(M): beta[t-1, i] = vectorized_logsumexp(potential_matrix[i, :] + beta[t, :]) return(beta)
[docs] def compute_marginals(self, seq_id): """ compute the marginal (i.e. probability of each y pattern at each position) Args: seq_id: integer representing unique id assigned to the sequence .. note:: - potential matrix per boundary dictionary should be available in :attr:`seqs.info` - alpha matrix should be available in :attr:`seqs.info` - beta matrix should be available in :attr:`seqs.info` - Z (i.e. P(x)) should be available in :attr:`seqs.info` """ model = self.model Y_codebook = model.Y_codebook Z_codebook = model.Z_codebook Z_len = model.Z_len Z_elems = model.Z_elems T = self.seqs_info[seq_id]["T"] alpha = self.seqs_info[seq_id]["alpha"] beta = self.seqs_info[seq_id]["beta"] Z = self.seqs_info[seq_id]["Z"] potentialmat_perboundary = self.seqs_info[seq_id]["potentialmat_perboundary"] P_marginals = numpy.zeros((T+1, len(Z_codebook)), dtype='longdouble') for j in range(1, T+1): potential_matrix = potentialmat_perboundary[j, j] for y_patt in Z_codebook: # print("y_patt {}".format(y_patt)) if(Z_len[y_patt] == 1): y_c = Y_codebook[Z_elems[y_patt][0]] accumulator = alpha[j, y_c] + beta[j, y_c] - Z else: # case of len(parts) = 2 y_b = Y_codebook[Z_elems[y_patt][0]] y_c = Y_codebook[Z_elems[y_patt][1]] accumulator = alpha[j-1, y_b] + potential_matrix[y_b, y_c] + beta[j, y_c] - Z P_marginals[j, Z_codebook[y_patt]] = numpy.exp(accumulator) return(P_marginals)
[docs] def compute_feature_expectation(self, seq_id, P_marginals, grad): """compute the features expectations (i.e. expected count of the feature based on learned model) Args: seq_id: integer representing unique id assigned to the sequence P_marginals: probability matrix for y patterns at each position in time grad: numpy vector with dimension equal to the weight vector. It represents the gradient that will be computed using the feature expectation and the global features of the sequence .. note:: - activefeatures (per boundary) dictionary should be available in :attr:`seqs.info` - P_marginal (marginal probability matrix) should be available in :attr:`seqs.info` """ activefeatures = self.seqs_info[seq_id]["activefeatures"] Z_codebook = self.model.Z_codebook for boundary, features_dict in activefeatures.items(): t = boundary[0] for z_patt in features_dict: w_indx, f_val = features_dict[z_patt] grad[w_indx] += f_val * P_marginals[t, Z_codebook[z_patt]]
[docs] def prune_states(self, j, score_mat, beam_size): """prune states that fall off the specified beam size Args: j: current position (integer) in the sequence score_mat: score matrix beam_size: specified size of the beam (integer) """ Y_codebook_rev = self.model.Y_codebook_rev # using argpartition as better alternative to argsort indx_partitioned_y = numpy.argpartition(-score_mat[j, :], beam_size) # identify top-k states/pi indx_topk_y = indx_partitioned_y[:beam_size] # # identify states falling out of the beam indx_falling_y = indx_partitioned_y[beam_size:] # remove the effect of states/pi falling out of the beam score_mat[j, indx_falling_y] = -numpy.inf # get topk states topk_y = {Y_codebook_rev[indx] for indx in indx_topk_y} return(topk_y)
[docs] def viterbi(self, w, seq_id, beam_size, stop_off_beam = False, y_ref=[], K=1): """decode sequences using viterbi decoder Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence beam_size: integer representing the size of the beam Keyword Arguments: stop_off_beam: boolean indicating if to stop when the reference state \ falls off the beam (used in perceptron/search based learning) y_ref: reference sequence list of labels (used while learning) K: integer indicating number of decoded sequences required (i.e. top-k list) """ model = self.model # number of possible states M = model.num_states T = self.seqs_info[seq_id]['T'] Y_codebook_rev = model.Y_codebook_rev Y_codebook = model.Y_codebook score_mat = numpy.ones((T+1, M), dtype='longdouble') * -numpy.inf score_mat[0,0] = 0 # back pointer to hold the index of the state that achieved highest score while decoding backpointer = numpy.ones((T+1, M), dtype='int') * (-1) viol_index = [] if(beam_size == M): # case of exact search and decoding l = {} l['activefeatures'] = (seq_id, ) self.check_cached_info(seq_id, l) active_features = self.seqs_info[seq_id]['activefeatures'] # corner case at t = 1 t = 1; i = 0 potential_matrix = self.compute_potential(w, active_features[t,t]) score_mat[t, :] = potential_matrix[i, :] backpointer[t, :] = 0 for t in range(2, T+1): potential_matrix = self.compute_potential(w, active_features[t,t]) for j in range(M): vec = score_mat[t-1, :] + potential_matrix[:, j] score_mat[t, j] = numpy.max(vec) backpointer[t, j] = numpy.argmax(vec) else: # case of inexact search and decoding l = {} l['seg_features'] = (seq_id, ) self.check_cached_info(seq_id, l) accum_activestates = {} for t in range(1, T+1): boundary = (t, t) active_features = self.identify_activefeatures(seq_id, boundary, accum_activestates) potential_matrix = self.compute_potential(w, active_features) for j in range(M): vec = score_mat[t-1, :] + potential_matrix[:, j] score_mat[t, j] = numpy.max(vec) backpointer[t, j] = numpy.argmax(vec) topk_states = self.prune_states(t, score_mat, beam_size) # update tracked active states -- to consider renaming it accum_activestates[t,t] = accum_activestates[t,t].intersection(topk_states) #^print('score_mat[{},:] = {} '.format(j, score_mat[j,:])) #^print("topk_states ", topk_states) if(y_ref): if(y_ref[t-1] not in topk_states): viol_index.append(t) if(stop_off_beam): T = t break if(K == 1): # decoding the sequence y_c_T = numpy.argmax(score_mat[T:]) Y_decoded = [y_c_T] counter = 0 for t in reversed(range(2, T+1)): Y_decoded.append(backpointer[t, Y_decoded[counter]]) counter += 1 Y_decoded.reverse() Y_decoded = [Y_codebook_rev[y_code] for y_code in Y_decoded] return(Y_decoded, viol_index) else: asearcher = FO_AStarSearcher(Y_codebook_rev) topK = asearcher.search(score_mat, backpointer, T, K) return(topK, viol_index)
[docs] def perstate_posterior_decoding(self, w, seq_id): """decode sequences using posterior probability (per state) decoder Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence """ Y_codebook_rev = self.model.Y_codebook_rev # get alpha, beta and Z l = OrderedDict() l['activefeatures'] = (seq_id, ) l['alpha'] = (w, seq_id) l['beta'] = (w, seq_id) self.check_cached_info(seq_id, l) alpha = self.seqs_info[seq_id]["alpha"] beta = self.seqs_info[seq_id]["beta"] Z = self.seqs_info[seq_id]["Z"] # print("alpha \n {}".format(alpha)) # print("beta \n {}".format(beta)) score_mat = alpha + beta - Z # print("score mat is \n {}".format(score_mat)) # remove the corner cases t=0 and t=T+1 score_mat_ = score_mat[:,1:-1] max_indices = list(numpy.argmax(score_mat_, axis = 0)) # print("max indices \n {}".format(max_indices)) Y_decoded = max_indices Y_decoded = [Y_codebook_rev[y_code] for y_code in Y_decoded] return(Y_decoded)
[docs] def validate_forward_backward_pass(self, w, seq_id): """check the validity of the forward backward pass Args: w: weight vector (numpy vector) seq_id: integer representing unique id assigned to the sequence """ self.clear_cached_info([seq_id]) # this will compute alpha and beta matrices and save them in seqs_info dict l = OrderedDict() l['activefeatures'] = (seq_id, ) l['alpha'] = (w, seq_id) l['beta'] = (w, seq_id) self.check_cached_info(seq_id, l) alpha = self.seqs_info[seq_id]["alpha"] beta = self.seqs_info[seq_id]["beta"] Z_alpha = vectorized_logsumexp(alpha[-1,:]) Z_beta = numpy.max(beta[0, :]) raw_diff = numpy.abs(Z_alpha - Z_beta) print("alpha[-1,:] = {}".format(alpha[-1,:])) print("beta[0,:] = {}".format(beta[0,:])) print("Z_alpha : {}".format(Z_alpha)) print("Z_beta : {}".format(Z_beta)) print("Z_aplha - Z_beta {}".format(raw_diff)) rel_diff = raw_diff/(Z_alpha + Z_beta) print("rel_diff : {}".format(rel_diff)) self.clear_cached_info([seq_id]) #print("seqs_info {}".format(self.seqs_info)) return((raw_diff, rel_diff))
if __name__ == "__main__": pass