# Source code for pyseqlab.ho_crf

'''
@author: ahmed allam <ahmed.allam@yale.edu>

'''


from collections import OrderedDict
import numpy
from .ho_crf_ad import  HOCRFADModelRepresentation, HOCRFAD
from .utilities import vectorized_logsumexp

class HOCRFModelRepresentation(HOCRFADModelRepresentation):
    """Model representation that will hold data structures to be used in :class:`HOCRF` class

    In addition to whatever the parent class sets up, this class builds the
    structures needed by the backward pass of :class:`HOCRF`.

    Attributes:
        S_codebook: dict mapping every backward state to an integer code. A
            backward state is a ``"|"``-joined label string: each proper
            suffix of a pattern in ``Z_elems``, each single label in
            ``Y_codebook`` and the empty string ``""``.
        S_len: dict mapping each backward state to its number of labels
            (0 for the empty state).
        S_numchar: dict mapping each backward state to its number of characters
            (0 for the empty state).
        b_transition: dict ``si -> {ysk: sk}`` where ``ysk`` is label ``y``
            followed by backward state ``sk`` and ``si`` is the backward state
            selected by ``keep_longest_elems`` among the states that are
            prefixes of ``ysk``.
        ysk_codebook: dict mapping every ``ysk`` string found in
            ``b_transition`` to an integer code.
        si_ysk_map: dict ``si -> (ysk codes, sk codes)``, two parallel lists.
        z_ysk_map: dict mapping each pattern ``z`` in ``Z_codebook`` to the list
            of codes of the ``ysk`` strings that ``z`` is a prefix of.
    """

    def __init__(self):
        super().__init__()
        self.S_codebook = None
        self.S_len = None
        self.S_numchar = None
        self.b_transition = None
        self.ysk_codebook = None
        self.si_ysk_map = None
        self.z_ysk_map = None

    def setup_model(self, modelfeatures, states, L):
        """setup and create the model representation

        Creates all maps and codebooks needed by the :class:`HOCRF` class.
        The work is fully delegated to the parent implementation.

        Args:
            modelfeatures: set of features defining the model
            states: set of states (i.e. tags)
            L: length of longest segment
        """
        super().setup_model(modelfeatures, states, L)

    def generate_instance_properties(self):
        """generate instance properties that will be later used by :class:`HOCRF` class

        The order of the assignments matters: every step reads attributes
        populated by the previous ones.
        """
        super().generate_instance_properties()
        self.S_codebook = self.get_backward_states()
        self.S_len, self.S_numchar = self.get_S_info()
        self.b_transition = self.get_backward_transitions()
        self.ysk_codebook = self.get_ysk_codebook()
        self.si_ysk_map = self.get_si_ysk_map()
        self.z_ysk_map = self.map_z_ysk()

    @staticmethod
    def _has_label_prefix(label_seq, prefix):
        """check whether `prefix` is a prefix of `label_seq` label by label

        Both arguments are ``"|"``-joined label strings. The comparison is done
        on whole labels, so ``"NN"`` is a prefix of ``"NN|DT"`` but *not* of
        ``"NNS|DT"`` (a plain character-slice comparison would accept both).
        The empty string is a prefix of every sequence.

        Args:
            label_seq: ``"|"``-joined label string to be inspected
            prefix: ``"|"``-joined label string (possibly empty)

        Returns:
            True if `prefix` is a label-wise prefix of `label_seq`
        """
        if prefix == "":
            return True
        return label_seq == prefix or label_seq.startswith(prefix + "|")

    def get_backward_states(self):
        """build the codebook of backward states

        Collects every proper suffix of the patterns in ``Z_elems``, every label
        in ``Y_codebook`` and the empty state, then numbers them in order of
        first appearance.

        Returns:
            dict mapping each backward state string to its integer code
        """
        Z_elems = self.Z_elems
        Z_len = self.Z_len
        # a dict is used as an insertion-ordered set so that the codes are
        # assigned in order of first appearance
        states = {}
        for z_patt in Z_elems:
            elems = Z_elems[z_patt]
            # i starts at 1 -> only proper suffixes (the full pattern is skipped)
            for i in range(1, Z_len[z_patt]):
                states["|".join(elems[i:])] = 1
        for y in self.Y_codebook:
            states[y] = 1
        # empty element
        states[""] = 1
        return {s: code for code, s in enumerate(states)}

    def get_S_info(self):
        """compute the size of every backward state

        Returns:
            tuple ``(S_len, S_numchar)``: two dicts keyed by backward state
            holding respectively its number of labels and its number of
            characters (both 0 for the empty state)
        """
        S_len = {}
        S_numchar = {}
        for si in self.S_codebook:
            if si == "":
                S_len[si] = 0
                S_numchar[si] = 0
            else:
                S_len[si] = len(si.split("|"))
                S_numchar[si] = len(si)
        return (S_len, S_numchar)

    def get_backward_transitions(self):
        """build the backward transition map

        For every pair (backward state ``sk``, label ``y``) the sequence
        ``ysk`` (``y`` followed by ``sk``) is formed and all backward states
        that are label-wise prefixes of it are gathered. The inherited
        ``keep_longest_elems`` then selects one state ``si`` per pair.

        Returns:
            dict ``si -> {ysk: sk}``
        """
        Y_codebook = self.Y_codebook
        S_codebook = self.S_codebook

        # (sk, y) -> ysk string; the empty state contributes no separator
        ysk_of = {}
        for sk in S_codebook:
            for y in Y_codebook:
                ysk_of[(sk, y)] = y if sk == "" else y + "|" + sk

        # (sk, y) -> all backward states being a prefix of ysk
        # (the empty state always qualifies, so every pair gets an entry)
        sk_y_prefix = {}
        for s in S_codebook:
            for pair, ysk in ysk_of.items():
                if self._has_label_prefix(ysk, s):
                    sk_y_prefix.setdefault(pair, []).append(s)

        sk_y_prefix = self.keep_longest_elems(sk_y_prefix)

        b_transition = {}
        for (sk, y), si in sk_y_prefix.items():
            b_transition.setdefault(si, {})[ysk_of[(sk, y)]] = sk
        return b_transition

    def get_ysk_codebook(self):
        """assign an integer code to every ``ysk`` string found in ``b_transition``

        Returns:
            dict mapping each ``ysk`` string to its integer code
        """
        b_transition = self.b_transition
        ysk_codebook = {}
        counter = 0
        for si in b_transition:
            for ysk in b_transition[si]:
                ysk_codebook[ysk] = counter
                counter += 1
        return ysk_codebook

    def map_z_ysk(self):
        """map every pattern ``z`` to the ``ysk`` sequences it is a prefix of

        Returns:
            dict mapping each ``z`` in ``Z_codebook`` to the list of codes (as
            defined in ``ysk_codebook``) of the ``ysk`` strings having ``z`` as
            a label-wise prefix; patterns matching nothing are absent
        """
        Z_codebook = self.Z_codebook
        z_ysk = {}
        for ysk, ysk_c in self.ysk_codebook.items():
            for z in Z_codebook:
                # check prefix relation
                if self._has_label_prefix(ysk, z):
                    z_ysk.setdefault(z, []).append(ysk_c)
        return z_ysk

    def get_si_ysk_map(self):
        """encode ``b_transition`` with integer codes

        Returns:
            dict ``si -> (ysk codes, sk codes)`` where the two lists are
            parallel: position ``k`` of both lists refers to the same
            transition of ``b_transition[si]``
        """
        b_transition = self.b_transition
        ysk_codebook = self.ysk_codebook
        S_codebook = self.S_codebook
        si_ysk_map = {}
        for si, transitions in b_transition.items():
            ysk_codes = []
            sk_codes = []
            for ysk, sk in transitions.items():
                ysk_codes.append(ysk_codebook[ysk])
                sk_codes.append(S_codebook[sk])
            si_ysk_map[si] = (ysk_codes, sk_codes)
        return si_ysk_map
class HOCRF(HOCRFAD):
    """higher-order CRF model

    - currently it supports *only* search-based training methods such as
      `COLLINS-PERCEPTRON` or `SAPO`
    - it implements the model discussed in:
      https://papers.nips.cc/paper/3815-conditional-random-fields-with-high-order-features-for-sequence-labeling.pdf

    Args:
        model: model representation; the methods below read its
            ``ysk_codebook``, ``z_ysk_map``, ``si_ysk_map``, ``S_codebook``,
            ``patts_len`` and ``Z_len`` attributes
        seqs_representer: forwarded unchanged to the parent constructor
        seqs_info: forwarded unchanged to the parent constructor
        load_info_fromdisk: forwarded unchanged to the parent constructor
    """

    def __init__(self, model, seqs_representer, seqs_info, load_info_fromdisk = 5):
        super().__init__(model, seqs_representer, seqs_info, load_info_fromdisk)

    def compute_bpotential(self, w, active_features):
        """compute the potential of every ``ysk`` given the active features

        Args:
            w: weight vector, indexed by the ``w_indx`` of each active feature
            active_features: dict mapping a pattern ``z`` to a tuple
                ``(w_indx, f_val)`` (weight indices and feature values)

        Returns:
            numpy vector of size ``len(model.ysk_codebook)`` where entry ``c``
            is the sum of the potentials of all active patterns that are a
            prefix of the ``ysk`` coded ``c``
        """
        model = self.model
        z_ysk = model.z_ysk_map
        b_potential = numpy.zeros(len(model.ysk_codebook))
        # to consider caching the w_indx and fval as in cached_pf
        for z, (w_indx, f_val) in active_features.items():
            potential = numpy.dot(w[w_indx], f_val)
            # get all ysk's in coded format where z maintains a prefix relation with them
            b_potential[z_ysk[z]] += potential
        return b_potential

    def compute_backward_vec(self, w, seq_id):
        """compute the backward matrix (beta) in log space

        Args:
            w: weight vector
            seq_id: id of the sequence; ``seqs_info[seq_id]`` must already hold
                the entries ``"T"`` and ``"activefeatures"``

        Returns:
            numpy matrix of shape ``(T+2, len(model.S_codebook))`` with dtype
            ``longdouble``. Rows are positions, columns are backward state
            codes. Every entry starts at ``-inf`` except the empty state at
            position ``T+1`` which is 0; row 0 is never updated.
        """
        model = self.model
        si_ysk_map = model.si_ysk_map
        S_codebook = model.S_codebook
        patts_len = model.patts_len
        Z_len = model.Z_len
        num_ysk = len(model.ysk_codebook)
        T = self.seqs_info[seq_id]["T"]
        activefeatures_perboundary = self.seqs_info[seq_id]['activefeatures']

        beta = numpy.ones((T+2, len(S_codebook)), dtype='longdouble') * (-numpy.inf)
        beta[T+1, S_codebook[""]] = 0

        for j in reversed(range(1, T+1)):
            # the potential vector depends only on the position j (not on the
            # backward state), hence it is computed once per position
            b_potential = numpy.zeros(num_ysk)
            for z_len in patts_len:
                # patterns of length z_len starting at j end at position b
                b = j + z_len - 1
                if b <= T:
                    active_features = activefeatures_perboundary[(b, b)]
                    features = {z: active_features[z]
                                for z in active_features
                                if Z_len[z] == z_len}
                    # compute b_potential vector
                    b_potential += self.compute_bpotential(w, features)

            for si, (ysk_list_c, sk_list_c) in si_ysk_map.items():
                vec = b_potential[ysk_list_c] + beta[j+1, sk_list_c]
                beta[j, S_codebook[si]] = vectorized_logsumexp(vec)
        return beta

    def compute_seq_gradient(self, w, seq_id, grad):
        """sequence gradient computation

        .. warning::

           the :class:`HOCRF` currently **does not support** gradient based training.
           Use search based training methods such as `COLLINS-PERCEPTRON` or `SAPO`

           this class is used for demonstration of the computation of the backward matrix using
           suffix relation as outlined in:
           https://papers.nips.cc/paper/3815-conditional-random-fields-with-high-order-features-for-sequence-labeling.pdf

        The method only prints a message explaining the limitation; `grad` is
        left untouched and ``None`` is returned.

        Args:
            w: weight vector (unused)
            seq_id: id of the sequence (unused)
            grad: gradient vector (unused)
        """
        # NOTE(review): the previous version reached this print by raising a str,
        # which is itself a TypeError that was then caught. The message is
        # printed directly instead; consider raising NotImplementedError once
        # callers are known to handle it.
        print("this model can only be trained using search-based methods. Use HOCRFAD if you want gradient based training")

    def validate_forward_backward_pass(self, w, seq_id):
        """compare the partition function obtained from the forward and the backward pass

        The cached info of the sequence is cleared before and after the check.
        Intermediate values are printed.

        Args:
            w: weight vector
            seq_id: id of the sequence

        Returns:
            tuple ``(raw_diff, rel_diff)``: absolute difference between the two
            log-space estimates and that difference divided by their sum
        """
        self.clear_cached_info([seq_id])
        # this will compute alpha and beta matrices and save them in seqs_info dict
        # (an ordered dict, so the entries are requested in this exact order)
        requested = OrderedDict()
        requested['activefeatures'] = (seq_id, )
        requested['alpha'] = (w, seq_id)
        requested['beta'] = (w, seq_id)
        self.check_cached_info(seq_id, requested)

        alpha = self.seqs_info[seq_id]["alpha"]
        beta = self.seqs_info[seq_id]["beta"]

        Z_alpha = vectorized_logsumexp(alpha[-1,:])
        Z_beta = vectorized_logsumexp(beta[1, :])
        raw_diff = numpy.abs(Z_alpha - Z_beta)

        print("alpha[-1,:] = {}".format(alpha[-1,:]))
        print("beta[1,:] = {}".format(beta[1,:]))
        print("Z_alpha : {}".format(Z_alpha))
        print("Z_beta : {}".format(Z_beta))
        print("Z_aplha - Z_beta {}".format(raw_diff))

        # NOTE(review): both estimates are logsumexp outputs, so their sum can
        # presumably be negative or zero, which would make rel_diff negative
        # or non-finite -- confirm whether that matters to callers
        rel_diff = raw_diff/(Z_alpha + Z_beta)
        print("rel_diff : {}".format(rel_diff))

        self.clear_cached_info([seq_id])
        return (raw_diff, rel_diff)
if __name__ == "__main__":
    # the module defines classes only; there is nothing to execute as a script
    pass