'''
@author: ahmed allam <ahmed.allam@yale.edu>
'''
import numpy
from .linear_chain_crf import LCRFModelRepresentation, LCRF
from .utilities import HOSemi_AStarSearcher, vectorized_logsumexp, generate_partitions, generate_partition_boundaries
class HOSemiCRFADModelRepresentation(LCRFModelRepresentation):
    r"""model representation holding the data structures used by the :class:`HOSemiCRFAD` class

       Attributes:
           P_codebook: set of proper prefixes of the elements in the set of patterns :attr:`Z_codebook`
                       e.g. {'':0, 'P':1, 'L':2, 'O':3, 'L|O':4, ...}
           P_codebook_rev: reversed codebook of :attr:`P_codebook`
                           e.g. {0:'', 1:'P', 2:'L', 3:'O', 4:'L|O', ...}
           P_len: dictionary comprising the length (i.e. number of elements) of elements in :attr:`P_codebook`
                  e.g. {'':0, 'P':1, 'L':1, 'O':1, 'L|O':2, ...}
           P_elems: dictionary comprising the composing elements of every prefix in :attr:`P_codebook`
                    e.g. {'':('',), 'P':('P',), 'L':('L',), 'O':('O',), 'L|O':('L','O'), ...}
           P_numchar: dictionary comprising the number of characters for every prefix in :attr:`P_codebook`
                      e.g. {'':0, 'P':1, 'L':1, 'O':1, 'L|O':3, ...}
           f_transition: a dictionary representing forward transition data structure having the form:
                         {pi:{pky, (pk, y)}} where pi represents the longest prefix element in :attr:`P_codebook`
                         for pky (representing the concatenation of elements in :attr:`P_codebook` and :attr:`Y_codebook`)
           pky_codebook: codebook for the elements of the set PY (the product of set P and Y)
           pi_pky_map: a map between P elements and PY elements
           z_pky_map: a map between elements of the Z set and PY set
                      it has the form/template {ypattern:[pky_elements]}
           z_pi_piy_map: a map between elements of the Z set and PY set
                         it has the form/template {ypattern:(pk, pky, pi)}
    """

    def __init__(self):
        # initialize the base LCRF representation attributes first
        super().__init__()
        self.P_codebook = None
        self.P_codebook_rev = None
        self.P_len = None
        self.P_elems = None
        self.P_numchar = None
        self.f_transition = None
        self.pky_codebook = None
        self.pi_pky_map = None
        self.z_pky_map = None
        self.z_pi_piy_map = None

    def setup_model(self, modelfeatures, states, L):
        """setup and create the model representation

           Creates all maps and codebooks needed by the :class:`HOSemiCRFAD` class

           Args:
               modelfeatures: set of features defining the model
               states: set of states (i.e. tags)
               L: length of longest segment
        """
        super().setup_model(modelfeatures, states, L)

    def generate_instance_properties(self):
        """generate instance properties that will be later used by :class:`HOSemiCRFAD` class"""
        super().generate_instance_properties()
        self.P_codebook = self.get_forward_states()
        self.P_codebook_rev = self.get_P_codebook_rev()
        self.P_len, self.P_elems, self.P_numchar = self.get_P_info()
        self.f_transition = self.get_forward_transition()
        self.pky_codebook = self.get_pky_codebook()
        self.pi_pky_map = self.get_pi_pky_map()
        self.z_pky_map, self.z_pi_piy_map = self.map_pky_z()

    def get_forward_states(self):
        """create the set of forward states (referred to as set P) and map each element to a unique code

           P is the set of proper prefixes of the elements in the :attr:`Z_codebook` set
        """
        Y_codebook = self.Y_codebook
        Z_elems = self.Z_elems
        Z_len = self.Z_len
        P = {}
        for z_patt in Z_elems:
            elems = Z_elems[z_patt]
            z_len = Z_len[z_patt]
            # every proper prefix (i.e. excluding the full pattern) is a forward state
            for i in range(z_len - 1):
                P["|".join(elems[:i + 1])] = 1
        # single labels are always forward states
        for y in Y_codebook:
            P[y] = 1
        # empty element (prefix of everything)
        P[""] = 1
        P_codebook = {s: i for (i, s) in enumerate(P)}
        return P_codebook

    def get_P_codebook_rev(self):
        """generate reversed codebook of :attr:`P_codebook` (code -> prefix string)"""
        P_codebook = self.P_codebook
        P_codebook_rev = {code: pi for pi, code in P_codebook.items()}
        return P_codebook_rev

    def get_P_info(self):
        """get the properties of the P set (proper prefixes)

           Returns:
               tuple (P_len, P_elems, P_numchar) -- see class attributes for their templates
        """
        P_codebook = self.P_codebook
        P_len = {}
        P_numchar = {}
        P_elems = {}
        for pi in P_codebook:
            elems = pi.split("|")
            P_elems[pi] = elems
            if pi == "":
                # the empty prefix has zero elements/characters by convention
                P_len[pi] = 0
                P_numchar[pi] = 0
            else:
                P_len[pi] = len(elems)
                P_numchar[pi] = len(pi)
        return (P_len, P_elems, P_numchar)

    def get_forward_transition(self):
        """generate forward transition data structure (:attr:`f_transition`)

           Main tasks:
               - create a set PY from the product of P and Y sets
               - for each element in PY, determine the longest suffix existing in set P
               - include all this info in :attr:`f_transition` dictionary
        """
        Y_codebook = self.Y_codebook
        P_codebook = self.P_codebook
        P_numchar = self.P_numchar
        Z_numchar = self.Z_numchar

        pk_y = {(p, y) for p in P_codebook for y in Y_codebook}
        # map each (pk, y) to all P elements that are suffixes of pk|y
        pk_y_suffix = {}
        for p in P_codebook:
            if p != "":
                len_p = P_numchar[p]
                for (pk, y) in pk_y:
                    ref_str = pk + "|" + y
                    # number of characters in pk|y; +1 accounts for the '|' separator
                    if pk == "":
                        len_ref = Z_numchar[y] + 1
                    else:
                        len_ref = P_numchar[pk] + Z_numchar[y] + 1
                    start_pos = len_ref - len_p
                    if start_pos >= 0:
                        # character-level suffix check (cheaper than splitting into elements)
                        if ref_str[start_pos:] == p:
                            if (pk, y) in pk_y_suffix:
                                pk_y_suffix[(pk, y)].append(p)
                            else:
                                pk_y_suffix[(pk, y)] = [p]
        # keep only the longest suffix pi for every (pk, y)
        pk_y_suffix = self.keep_longest_elems(pk_y_suffix)
        f_transition = {}
        for (pk, y), pi in pk_y_suffix.items():
            if pk == "":
                elmkey = y
            else:
                elmkey = pk + "|" + y
            if pi in f_transition:
                f_transition[pi][elmkey] = (pk, y)
            else:
                f_transition[pi] = {elmkey: (pk, y)}
        return f_transition

    def get_pky_codebook(self):
        """generate a codebook for the elements of the set PY (the product of set P and Y)"""
        f_transition = self.f_transition
        pky_codebook = {}
        counter = 0
        for pi in f_transition:
            for pky in f_transition[pi]:
                pky_codebook[pky] = counter
                counter += 1
        return pky_codebook

    def map_pky_z(self):
        """generate a map between elements of the Z set and the PY set

           Returns:
               tuple (z_pky, z_pi_piy) corresponding to :attr:`z_pky_map` and :attr:`z_pi_piy_map`
        """
        f_transition = self.f_transition
        Z_codebook = self.Z_codebook
        # given that we demand to have unigram label features, Z set will always contain Y elems
        Z_numchar = self.Z_numchar
        P_numchar = self.P_numchar
        pky_codebook = self.pky_codebook
        P_codebook = self.P_codebook
        z_pi_piy = {}
        z_pky = {}
        for pi in f_transition:
            for pky, pk_y_tup in f_transition[pi].items():
                pk, y = pk_y_tup
                # number of characters in pky; +1 is for the separator '|'
                if pk == "":
                    len_pky = Z_numchar[y]
                else:
                    len_pky = P_numchar[pk] + Z_numchar[y] + 1
                for z in Z_codebook:
                    len_z = Z_numchar[z]
                    # suffix relation: z must match the tail of pky
                    start_pos = len_pky - len_z
                    if start_pos >= 0:
                        if pky[start_pos:] == z:
                            pky_c = pky_codebook[pky]
                            pk_c = P_codebook[pk]
                            if z in z_pky:
                                z_pky[z].append(pky_c)
                                z_pi_piy[z][0].append(pk_c)
                                z_pi_piy[z][1].append(pky_c)
                                z_pi_piy[z][2].append(P_codebook[pi])
                            else:
                                z_pky[z] = [pky_c]
                                z_pi_piy[z] = ([pk_c], [pky_c], [P_codebook[pi]])
        return (z_pky, z_pi_piy)

    def get_pi_pky_map(self):
        """generate a map between P elements and PY elements

           Main tasks:
               - for every element in PY, determine the longest suffix in P
               - determine the two components in PY (i.e. the p and y elements)
               - represent this info in a dictionary to be used for the forward/alpha matrix
        """
        f_transition = self.f_transition
        pky_codebook = self.pky_codebook
        P_codebook = self.P_codebook
        pi_pky_map = {}
        for pi in f_transition:
            # parallel lists: codes of pky elements and codes of their pk components
            pi_pky_map[pi] = [[], []]
            for pky, (pk, __) in f_transition[pi].items():
                pi_pky_map[pi][0].append(pky_codebook[pky])
                pi_pky_map[pi][1].append(P_codebook[pk])
        return pi_pky_map

    def filter_activated_states(self, activated_states, accum_active_states, curr_boundary):
        """filter/prune states and y features

           Args:
               activated_states: dictionary containing possible active states/y features
                                 it has the form {patt_len:{patt_1, patt_2, ...}}
               accum_active_states: dictionary of only possible active states by position
                                    it has the form {pos_1:{state_1, state_2, ...}}
               curr_boundary: tuple (u,v) representing the current boundary in the sequence
        """
        Z_elems = self.Z_elems
        filtered_activestates = {}
        # generate partition boundaries of the current segment
        depth_node_map = {}
        generate_partitions(curr_boundary, self.L, self.max_patt_len, {}, depth_node_map, None)
        partition_boundaries = generate_partition_boundaries(depth_node_map)
        for z_len in activated_states:
            if z_len == 1:
                # unigram patterns are not pruned here
                continue
            if z_len in partition_boundaries:
                partitions = partition_boundaries[z_len]
                filtered_activestates[z_len] = set()
                for partition in partitions:
                    for z_patt in activated_states[z_len]:
                        # keep the pattern only if all its elements are active on the partition
                        check = True
                        zelems = Z_elems[z_patt]
                        for i in range(z_len):
                            bound = partition[i]
                            if zelems[i] not in accum_active_states[bound]:
                                check = False
                                break
                        if check:
                            filtered_activestates[z_len].add(z_patt)
        return filtered_activestates
class HOSemiCRFAD(LCRF):
    """higher-order semi-CRF model that uses algorithmic differentiation in gradient computation

       Args:
           model: an instance of :class:`HOSemiCRFADModelRepresentation` class
           seqs_representer: an instance of :class:`SeqsRepresenter` class
           seqs_info: dictionary holding sequences info

       Keyword Arguments:
           load_info_fromdisk: integer from 0 to 5 specifying number of cached data
                               to be kept in memory. 0 means keep everything while
                               5 means load everything from disk

       Attributes:
           model: an instance of :class:`HOSemiCRFADModelRepresentation` class
           weights: a numpy vector representing feature weights
           seqs_representer: an instance of :class:`pyseqlab.feature_extraction.SeqsRepresenter` class
           seqs_info: dictionary holding sequences info
           beam_size: determines the size of the beam for state pruning
           fun_dict: a function map
           def_cached_entities: a list of the names of cached entities sorted (descending)
                                based on estimated space required in memory
    """

    def __init__(self, model, seqs_representer, seqs_info, load_info_fromdisk=5):
        super().__init__(model, seqs_representer, seqs_info, load_info_fromdisk)

    # NOTE(review): method name keeps the historical misspelling ("entitites") because it
    # overrides/extends the identically named method of the base LCRF class.
    def cached_entitites(self, load_info_fromdisk):
        """construct list of names of cached entities in memory"""
        def_cached_entities = super().cached_entitites(load_info_fromdisk)
        inmemory_info = ["alpha", "Z", "beta", "fpotential"]
        def_cached_entities += inmemory_info
        return def_cached_entities

    def compute_fpotential(self, w, active_features):
        """compute the potential of active features in a specified boundary

           Args:
               w: weight vector (numpy vector)
               active_features: dictionary of activated features in a specified boundary
        """
        model = self.model
        pky_codebook = model.pky_codebook
        z_pky_map = model.z_pky_map
        f_potential = numpy.zeros(len(pky_codebook))
        # TODO: consider caching the w_indx and fval as in cached_pf
        for z in active_features:
            w_indx, f_val = active_features[z]
            potential = numpy.dot(w[w_indx], f_val)
            # add the potential to all pky's (coded) for which z is a suffix
            pky_c_list = z_pky_map[z]
            f_potential[pky_c_list] += potential
        return f_potential

    def compute_forward_vec(self, w, seq_id):
        """compute the forward matrix (alpha matrix)

           Args:
               w: weight vector (numpy vector)
               seq_id: integer representing unique id assigned to the sequence

           .. note::
              activefeatures need to be loaded first in :attr:`seqs.info`
        """
        model = self.model
        pi_pky_map = model.pi_pky_map
        P_len = model.P_len
        P_codebook = model.P_codebook
        T = self.seqs_info[seq_id]["T"]
        L = self.model.L
        activefeatures = self.seqs_info[seq_id]['activefeatures']
        # log-space: -inf represents probability zero
        alpha = numpy.ones((T + 1, len(P_codebook)), dtype='longdouble') * (-numpy.inf)
        alpha[0, P_codebook[""]] = 0
        # cache f_potential per boundary for reuse in the backward pass
        fpotential_perboundary = {}
        for j in range(1, T + 1):
            # accumulator[pi, d] holds the alpha contribution of segments of length d+1 ending at j
            accumulator = numpy.ones((len(P_codebook), L), dtype='longdouble') * -numpy.inf
            for d in range(L):
                u = j - d
                if u <= 0:
                    break
                v = j
                f_potential = self.compute_fpotential(w, activefeatures[u, v])
                fpotential_perboundary[u, v] = f_potential
                for pi in pi_pky_map:
                    # a prefix pi can only be realized once enough positions were consumed
                    if j >= P_len[pi]:
                        pi_c = P_codebook[pi]
                        pky_c_list, pk_c_list = pi_pky_map[pi]
                        vec = f_potential[pky_c_list] + alpha[u - 1, pk_c_list]
                        accumulator[pi_c, d] = vectorized_logsumexp(vec)
            # combine over all segment lengths
            for pi in pi_pky_map:
                if j >= P_len[pi]:
                    pi_c = P_codebook[pi]
                    if L > 1:
                        alpha[j, pi_c] = vectorized_logsumexp(accumulator[pi_c, :])
                    else:
                        alpha[j, pi_c] = accumulator[pi_c, 0]
        self.seqs_info[seq_id]['fpotential'] = fpotential_perboundary
        return alpha

    def compute_backward_vec(self, w, seq_id):
        """compute the backward matrix (beta matrix)

           Args:
               w: weight vector (numpy vector)
               seq_id: integer representing unique id assigned to the sequence

           .. note::
              fpotential per boundary dictionary should be available in :attr:`seqs.info`
        """
        model = self.model
        pi_pky_map = model.pi_pky_map
        P_codebook = model.P_codebook
        len_P = len(P_codebook)
        T = self.seqs_info[seq_id]["T"]
        L = model.L
        fpotential_perboundary = self.seqs_info[seq_id]['fpotential']
        beta = numpy.ones((T + 2, len(P_codebook)), dtype='longdouble') * (-numpy.inf)
        beta[T + 1, :] = 0
        for j in reversed(range(1, T + 1)):
            # accum_mat[p, d] holds the beta contribution of segments of length d+1 starting at j
            accum_mat = numpy.ones((len_P, L), dtype='longdouble') * (-numpy.inf)
            for d in range(L):
                # track_comp[pk, pi] collects log-scores of transitions pk -> pi over this segment
                track_comp = numpy.ones((len_P, len_P), dtype='longdouble') * (-numpy.inf)
                u = j
                v = j + d
                if v > T:
                    break
                f_potential = fpotential_perboundary[u, v]
                for pi in pi_pky_map:
                    pi_c = P_codebook[pi]
                    pky_c_list, pk_c_list = pi_pky_map[pi]
                    vec = f_potential[pky_c_list] + beta[v + 1, pi_c]
                    track_comp[pk_c_list, pi_c] = vec
                for p_c in P_codebook.values():
                    accum_mat[p_c, d] = vectorized_logsumexp(track_comp[p_c, :])
            for p_c in P_codebook.values():
                beta[u, p_c] = vectorized_logsumexp(accum_mat[p_c, :])
        return beta

    def compute_marginals(self, seq_id):
        """compute the marginal (i.e. probability of each y pattern at each position)

           Args:
               seq_id: integer representing unique id assigned to the sequence

           .. note::
              - fpotential per boundary dictionary should be available in :attr:`seqs.info`
              - alpha matrix should be available in :attr:`seqs.info`
              - beta matrix should be available in :attr:`seqs.info`
              - Z (i.e. P(x)) should be available in :attr:`seqs.info`
        """
        model = self.model
        Z_codebook = model.Z_codebook
        z_pi_piy = model.z_pi_piy_map
        T = self.seqs_info[seq_id]["T"]
        L = self.model.L
        alpha = self.seqs_info[seq_id]["alpha"]
        beta = self.seqs_info[seq_id]["beta"]
        Z = self.seqs_info[seq_id]["Z"]
        fpotential_perboundary = self.seqs_info[seq_id]['fpotential']
        P_marginals = numpy.zeros((L, T + 1, len(self.model.Z_codebook)), dtype='longdouble')
        for j in range(1, T + 1):
            for d in range(L):
                u = j
                v = j + d
                if v > T:
                    break
                boundary = (u, v)
                f_potential = fpotential_perboundary[boundary]
                for z in Z_codebook:
                    # z_pi_piy stores parallel code lists: (pk codes, pky codes, pi codes)
                    pk_c, pky_c, pi_c = z_pi_piy[z]
                    numerator = alpha[u - 1, pk_c] + f_potential[pky_c] + beta[v + 1, pi_c]
                    P_marginals[d, j, Z_codebook[z]] = numpy.exp(vectorized_logsumexp(numerator) - Z)
        return P_marginals

    def compute_feature_expectation(self, seq_id, P_marginals, grad):
        """compute the features expectations (i.e. expected count of the feature based on learned model)

           Args:
               seq_id: integer representing unique id assigned to the sequence
               P_marginals: probability matrix for y patterns at each position in time
               grad: numpy vector with dimension equal to the weight vector. It represents the gradient
                     that will be computed using the feature expectation and the global features of the sequence

           .. note::
              - activefeatures (per boundary) dictionary should be available in :attr:`seqs.info`
              - P_marginal (marginal probability matrix) should be available in :attr:`seqs.info`
        """
        activefeatures = self.seqs_info[seq_id]["activefeatures"]
        Z_codebook = self.model.Z_codebook
        for boundary, features_dict in activefeatures.items():
            u, v = boundary
            d = v - u
            for z_patt in features_dict:
                w_indx, f_val = features_dict[z_patt]
                grad[w_indx] += f_val * P_marginals[d, u, Z_codebook[z_patt]]

    def prune_states(self, score_vec, beam_size):
        """prune states that fall off the specified beam size

           Args:
               score_vec: score matrix
               beam_size: specified size of the beam (integer)
        """
        P_codebook_rev = self.model.P_codebook_rev
        P_elems = self.model.P_elems
        # argpartition is O(n) -- cheaper than a full argsort for top-k selection
        indx_partitioned_pi = numpy.argpartition(-score_vec, beam_size)
        # identify top-k states/pi
        indx_topk_pi = indx_partitioned_pi[:beam_size]
        # map the surviving prefixes to their last label (the state)
        topk_pi = {P_codebook_rev[indx] for indx in indx_topk_pi}
        topk_states = {P_elems[pi][-1] for pi in topk_pi}
        return topk_states

    def viterbi(self, w, seq_id, beam_size, stop_off_beam=False, y_ref=None, K=1):
        """decode sequences using viterbi decoder

           Args:
               w: weight vector (numpy vector)
               seq_id: integer representing unique id assigned to the sequence
               beam_size: integer representing the size of the beam

           Keyword Arguments:
               stop_off_beam: boolean indicating if to stop when the reference state \
                              falls off the beam (used in perceptron/search based learning)
               y_ref: reference sequence list of labels (used while learning)
               K: integer indicating number of decoded sequences required (i.e. top-k list)
                  A* searcher with viterbi will be used to generate k-decoded list
        """
        # avoid mutable default argument; None behaves identically to [] (both falsy)
        if y_ref is None:
            y_ref = []
        model = self.model
        P_elems = model.P_elems
        pi_pky_map = model.pi_pky_map
        P_codebook = model.P_codebook
        P_codebook_rev = model.P_codebook_rev
        L = model.L
        len_P = len(P_codebook)
        num_states = model.num_states
        T = self.seqs_info[seq_id]["T"]
        # records max score at every time step
        delta = numpy.ones((T + 1, len(P_codebook)), dtype='longdouble') * (-numpy.inf)
        pi_mat = numpy.ones((len_P, L), dtype='longdouble') * (-numpy.inf)
        # the score for the empty sequence at time 0 is 1 (log-score 0)
        delta[0, P_codebook[""]] = 0
        back_track = {}
        # records where violation occurs -- it is 1-based indexing
        viol_index = []
        if beam_size == num_states:
            # case of exact search and decoding
            l = {}
            l['activefeatures'] = (seq_id,)
            self.check_cached_info(seq_id, l)
            active_features = self.seqs_info[seq_id]['activefeatures']
            for j in range(1, T + 1):
                # reset pi_mat at every loop
                pi_mat.fill(-numpy.inf)
                backpointer = {}
                for d in range(L):
                    u = j - d
                    if u <= 0:
                        break
                    v = j
                    boundary = (u, v)
                    # vector of size len(pky)
                    f_potential = self.compute_fpotential(w, active_features[boundary])
                    for pi in pi_pky_map:
                        pi_c = P_codebook[pi]
                        pky_c_list, pk_c_list = pi_pky_map[pi]
                        vec = f_potential[pky_c_list] + delta[u - 1, pk_c_list]
                        pi_mat[pi_c, d] = numpy.max(vec)
                        argmax_indx = numpy.argmax(vec)
                        pk_c_max = pk_c_list[argmax_indx]
                        pk = P_codebook_rev[pk_c_max]
                        y = P_elems[pk][-1]
                        backpointer[d, pi_c] = (pk_c_max, y)
                # get the max for each pi across all segment lengths
                for pi in pi_pky_map:
                    pi_c = P_codebook[pi]
                    delta[j, pi_c] = numpy.max(pi_mat[pi_c, :])
                    argmax_indx = numpy.argmax(pi_mat[pi_c, :])
                    pk_c, y = backpointer[argmax_indx, pi_c]
                    back_track[j, pi_c] = (argmax_indx, pk_c, y)
        else:
            # case of inexact search and decoding (beam pruning)
            l = {}
            l['seg_features'] = (seq_id,)
            self.check_cached_info(seq_id, l)
            # tracks active states by boundary
            accum_activestates = {}
            for j in range(1, T + 1):
                # reset pi_mat at every loop
                pi_mat.fill(-numpy.inf)
                backpointer = {}
                for d in range(L):
                    u = j - d
                    if u <= 0:
                        break
                    v = j
                    boundary = (u, v)
                    active_features = self.identify_activefeatures(seq_id, boundary, accum_activestates)
                    # vector of size len(pky)
                    f_potential = self.compute_fpotential(w, active_features)
                    for pi in pi_pky_map:
                        pi_c = P_codebook[pi]
                        pky_c_list, pk_c_list = pi_pky_map[pi]
                        vec = f_potential[pky_c_list] + delta[u - 1, pk_c_list]
                        pi_mat[pi_c, d] = numpy.max(vec)
                        argmax_indx = numpy.argmax(vec)
                        pk_c_max = pk_c_list[argmax_indx]
                        pk = P_codebook_rev[pk_c_max]
                        y = P_elems[pk][-1]
                        backpointer[d, pi_c] = (pk_c_max, y)
                    topk_states = self.prune_states(pi_mat[:, d], beam_size)
                    # update tracked active states -- to consider renaming it
                    accum_activestates[boundary] = accum_activestates[boundary].intersection(topk_states)
                # get the max for each pi across all segment lengths
                for pi in pi_pky_map:
                    pi_c = P_codebook[pi]
                    delta[j, pi_c] = numpy.max(pi_mat[pi_c, :])
                    argmax_indx = numpy.argmax(pi_mat[pi_c, :])
                    pk_c, y = backpointer[argmax_indx, pi_c]
                    back_track[j, pi_c] = (argmax_indx, pk_c, y)
                # in case we are using viterbi for learning
                if y_ref:
                    topk_states = self.prune_states(delta[j, :], beam_size)
                    if y_ref[j - 1] not in topk_states:
                        viol_index.append(j)
                        if stop_off_beam:
                            T = j
                            break
        if K == 1:
            # decoding the best sequence by following the backpointers
            Y_decoded = []
            p_T_c = numpy.argmax(delta[T, :])
            p_T = P_codebook_rev[p_T_c]
            y_T = P_elems[p_T][-1]
            d, pt_c, yt = back_track[T, p_T_c]
            # a backpointer entry covers a segment of length d+1 with a single label
            for _ in range(d + 1):
                Y_decoded.append(y_T)
            t = T - d - 1
            while t > 0:
                new_d, new_pt_c, new_yt = back_track[t, pt_c]
                for _ in range(new_d + 1):
                    Y_decoded.append(yt)
                t = t - new_d - 1
                pt_c = new_pt_c
                yt = new_yt
            Y_decoded.reverse()
            return (Y_decoded, viol_index)
        else:
            # top-K decoding via A* search over the viterbi lattice
            asearcher = HOSemi_AStarSearcher(P_codebook_rev, P_elems)
            topK = asearcher.search(delta, back_track, T, K)
            return (topK, viol_index)
# module is intended to be imported, not executed directly
if __name__ == "__main__":
    pass