'''
@author: ahmed allam <ahmed.allam@yale.edu>
'''
import os
import pickle
import shutil
from datetime import datetime
from copy import deepcopy
from itertools import combinations
import heapq
import numpy
class SequenceStruct(object):
    r"""class for representing each sequence/segment

       Args:
           Y: list containing the sequence of states/labels (i.e. ['P','O','O','L','L'])
           X: list containing dictionary elements of observation sequences and/or features of the input
           seg_other_symbol: string or None (default); if specified, the task is a segmentation problem
                             where it represents the non-entity symbol, else (None) the task is considered
                             a sequence labeling problem

       Attributes:
           Y: list containing the sequence of states/labels (i.e. ['P','O','O','L','L'])
           X: list containing dictionary elements of observation sequences and/or features of the input
           seg_other_symbol: string or None (default); if specified, the task is a segmentation problem
                             where it represents the non-entity symbol, else (None) the task is considered
                             a sequence labeling problem
           T: int, length of the sequence (i.e. len(X))
           seg_attr: dictionary comprising the extracted attributes per each boundary of the sequence
           L: int, length of the longest identified segment in the sequence
           flat_y: flat list of labels/tags, one per position
           y_sboundaries: sorted list of boundaries of the :attr:`Y` of the sequence
           y_range: set of positions of the sorted boundaries (i.e. set(range(len(y_sboundaries))))
    """
def __init__(self, X, Y, seg_other_symbol = None):
self.seg_attr = {}
self.X = X
self.Y = (Y, seg_other_symbol)
@property
def X(self):
return(self._X)
@X.setter
def X(self, l):
"""setup the observation sequence
Args:
l: a list of elements (i.e. ``X = [{'w':'Michael'}, {'w':'is'}, {'w':'in'}, {'w':'New'}, {'w':'Haven'}]``)
Example::
the output X becomes:
{1:{'w':'Michael'},
2:{'w':'is'},
3:{'w':'in'},
4:{'w':'New'},
5:{'w':'Haven'}
}
"""
self._X = {}
T = len(l)
for i in range(T):
self._X[i+1] = l[i]
        # a new assignment clears seg_attr
if(self.seg_attr):
self.seg_attr.clear()
self.T = T
@property
def Y(self):
return(self._Y)
@Y.setter
def Y(self, elmtup):
"""setup the label sequence
Args:
elmtup: tuple consisting of:
- **Y** a list of elements (i.e. ``Y = ['P','O','O','L','L']``)
representing the labels of the elements in X
- **non_entity_symbol** which represents the Other category (i.e. non entity element which is 'O' in above example)
Example:
Y after the transformation becomes ``{(1, 1): 'P', (2,2): 'O', (3, 3): 'O', (4, 5): 'L'}``
"""
try:
Y_ref, non_entity_symb = elmtup
except ValueError:
raise ValueError("tuple containing Y and non-entity symbol must be passed")
else:
self._Y = {}
            # length of the longest identified segment
L = 1
if(non_entity_symb):
label_indices = {}
for i in range(len(Y_ref)):
label = Y_ref[i]
if(label in label_indices):
label_indices[label].append(i+1)
else:
label_indices[label] = [i+1]
for label, indices_list in label_indices.items():
if(label == non_entity_symb or len(indices_list) == 1):
for indx in indices_list:
boundary = (indx, indx)
self._Y[boundary] = label
else:
indx_stack = []
for indx in indices_list:
if(not indx_stack):
indx_stack.append(indx)
else:
diff = indx - indx_stack[-1]
if(diff > 1):
boundary = (indx_stack[0], indx_stack[-1])
self._Y[boundary] = label
l = indx_stack[-1] - indx_stack[0] + 1
if(l > L):
L = l
indx_stack = [indx]
else:
indx_stack.append(indx)
if(indx_stack):
boundary = (indx_stack[0], indx_stack[-1])
self._Y[boundary] = label
l = indx_stack[-1] - indx_stack[0] + 1
if(l > L):
L = l
else:
for i in range(len(Y_ref)):
label = Y_ref[i]
boundary = (i+1, i+1)
self._Y[boundary] = label
            # store the length of the longest segment
            self.L = L
            # keep a copy of Y as a flat list (i.e. ['P','O','O','L','L'])
            self.flat_y = Y_ref
            # construct a map from the y boundaries to their positions in the sorted list
y_sboundaries = self.get_y_boundaries()
self.y_sboundaries = y_sboundaries
self.y_boundpos_map = {}
pos = 0
for boundary in y_sboundaries:
self.y_boundpos_map[boundary] = pos
pos += 1
self.y_range = set(range(0, pos))
# def update_boundaries(self):
# self.y_boundaries = self.get_y_boundaries()
# self.x_boundaries = self.get_x_boundaries()
    def flatten_y(self, Y):
r"""flatten the :attr:`Y` attribute
Args:
Y: dictionary of this form ``{(1, 1): 'P', (2,2): 'O', (3, 3): 'O', (4, 5): 'L'}``
Example:
flattened y becomes ``['P','O','O','L','L']``
"""
s_boundaries = sorted(Y)
flat_y = []
for u, v in s_boundaries:
for _ in range(u, v+1):
flat_y.append(Y[(u,v)])
return(flat_y)
    def get_y_boundaries(self):
"""return the sorted boundaries of the labels of the sequence"""
return(sorted(self.Y.keys()))
    def get_x_boundaries(self):
"""return the boundaries of the observation sequence"""
boundaries = []
for u in self.X:
boundaries.append((u, u))
return(boundaries)
def __str__(self):
"""return string representation of the parsed sequence"""
out_str = "Y sequence:\n {}\nX sequence:\n {}\n{}".format(self.flat_y, self.X, "-"*40)
return(out_str)
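# A minimal usage sketch (illustrative, not part of the library API): it shows how the
# Y setter merges consecutive identical labels into segment boundaries when a
# non-entity symbol is supplied, versus unit boundaries in sequence labeling mode.
def _demo_sequence_struct():
    X = [{'w':'Michael'}, {'w':'is'}, {'w':'in'}, {'w':'New'}, {'w':'Haven'}]
    Y = ['P', 'O', 'O', 'L', 'L']
    # segmentation mode: 'O' is the non-entity symbol, so consecutive 'L's form one segment
    seq = SequenceStruct(X, Y, seg_other_symbol='O')
    assert seq.Y == {(1, 1): 'P', (2, 2): 'O', (3, 3): 'O', (4, 5): 'L'}
    assert seq.L == 2                   # longest identified segment spans two positions
    assert seq.flatten_y(seq.Y) == Y    # flattening recovers the original label list
    # sequence labeling mode: every position gets its own unit-length boundary
    seq = SequenceStruct(X, Y)
    assert seq.Y == {(1, 1): 'P', (2, 2): 'O', (3, 3): 'O', (4, 4): 'L', (5, 5): 'L'}
    assert seq.L == 1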
class DataFileParser(object):
    """class to parse a data file comprising the training/testing data

       Attributes:
           seqs: list of sequences that are instances of :class:`SequenceStruct` class
           header: list of attribute names read from the file
    """
def __init__(self):
self.header = []
    def read_file(self, file_path, header, y_ref = True, seg_other_symbol = None, column_sep = " "):
        r"""read and parse a file that contains the sequences following a predefined format

           the file should contain label and observation tracks each separated in a column

           .. note::

              label column is the **LAST** column in the file (i.e. X_a X_b Y)

           Args:
               file_path: string representing the file path to the data file
               header: specifies how the header is reported in the file containing the sequences
                       options include:

                       - 'main' -> one header in the beginning of the file
                       - 'per_sequence' -> a header for every sequence
                       - list of keywords as header (i.e. ['w', 'part_of_speech'])

           Keyword Arguments:
               y_ref: boolean specifying whether the data file includes the reference label column
               seg_other_symbol: string or None (default); if specified, the task is a segmentation problem
                                 where `seg_other_symbol` represents the non-entity symbol and semi-CRF
                                 models are used, else (i.e. `seg_other_symbol` is None) the task is
                                 considered a sequence labeling problem
               column_sep: string, separator used between the columns in the file

           Yields:
               sequences as instances of :class:`SequenceStruct`
        """
if(y_ref):
update_seq = self.update_XY
else:
update_seq = self.update_X
with open(file_path) as file_obj:
counter = 0
X = []
Y = []
for line in file_obj:
counter += 1
line = line.rstrip()
# print(line)
if line:
# print(line)
if(y_ref):
*x_arg, y = line.split(column_sep)
self._xarg = x_arg
self._y = y
else:
x_arg = line.split(column_sep)
self._xarg = x_arg
# print(x_arg)
# first line of a sequence
if(counter == 1):
if(header == "main"):
if(self.header):
update_seq(X, Y)
else:
self.parse_header(x_arg)
elif(header == "per_sequence"):
if(not self.header):
self.parse_header(x_arg)
else:
if(self.header):
update_seq(X, Y)
else:
self.parse_header(header)
update_seq(X, Y)
else:
update_seq(X, Y)
else:
seq = SequenceStruct(X, Y, seg_other_symbol)
# reset counter for filling new sequence
counter = 0
X = []
Y = []
self._xarg = None
self._y = None
yield seq
            # yield the last sequence even when the file does not end with a blank line;
            # checking X alone also covers files that lack the reference label column
            if(X):
seq = SequenceStruct(X, Y, seg_other_symbol)
# reset counter for filling new sequence
counter = 0
X = []
Y = []
self._xarg = None
self._y = None
yield seq
    def update_XY(self, X, Y):
"""update sequence observations and corresponding labels"""
X.append(self.parse_line(self._xarg))
Y.append(self._y)
    def update_X(self, X, Y):
"""update sequence observations"""
X.append(self.parse_line(self._xarg))
    def parse_line(self, x_arg):
        """parse the read line

           Args:
               x_arg: list of observation column values
        """
        # map each observation column value to its attribute name
header = self.header
x = {}
for i in range(len(x_arg)):
x[header[i]] = x_arg[i]
return(x)
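    # read_file relies on a parse_header method that is not shown in this excerpt;
    # the following is a minimal sketch consistent with how it is used above, assuming
    # the header line's columns map one-to-one to the attribute names used by parse_line
    def parse_header(self, x_arg):
        """store the columns of a header line as attribute names"""
        self.header = list(x_arg)

# A small usage sketch (with hypothetical file content written to a temporary file):
# the label is the LAST column, sequences are separated by blank lines, and the
# 'main' option reads a single header line at the top of the file.
def _demo_datafile_parser():
    import tempfile
    content = ("w pos label\n"
               "Michael NNP P\n"
               "is VBZ O\n"
               "in IN O\n"
               "\n"
               "New NNP L\n"
               "Haven NNP L\n")
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as f:
        f.write(content)
        fpath = f.name
    parser = DataFileParser()
    seqs = list(parser.read_file(fpath, header='main'))
    assert parser.header == ['w', 'pos']    # the label column is consumed as y
    assert len(seqs) == 2
    assert seqs[0].flat_y == ['P', 'O', 'O']
    assert seqs[1].X[1] == {'w': 'New', 'pos': 'NNP'}
    os.remove(fpath)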
class ReaderWriter(object):
"""class for dumping, reading and logging data"""
def __init__(self):
pass
@staticmethod
    def dump_data(data, file_name, mode = "wb"):
"""dump data by pickling
Args:
data: data to be pickled
file_name: file path where data will be dumped
mode: specify writing options i.e. binary or unicode
"""
with open(file_name, mode) as f:
pickle.dump(data, f, protocol = 4)
@staticmethod
    def read_data(file_name, mode = "rb"):
        """read dumped/pickled data

           Args:
               file_name: file path from where data will be read
               mode: specify reading options i.e. binary or unicode
        """
with open(file_name, mode) as f:
data = pickle.load(f)
return(data)
@staticmethod
    def log_progress(line, outfile, mode="a"):
"""write data to a file
Args:
line: string representing data to be written out
outfile: file path where data will be written/logged
mode: specify writing options i.e. append, write
"""
with open(outfile, mode) as f:
f.write(line)
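# A short sketch of the ReaderWriter round trip using a temporary directory; the
# file names here are hypothetical.
def _demo_reader_writer():
    import tempfile
    tmpdir = tempfile.mkdtemp()
    fpath = os.path.join(tmpdir, "model_part")
    data = {'weights': [0.5, 1.2], 'labels': {'P', 'O', 'L'}}
    ReaderWriter.dump_data(data, fpath)
    assert ReaderWriter.read_data(fpath) == data
    ReaderWriter.log_progress("iteration 1 done\n", os.path.join(tmpdir, "log.txt"))
    shutil.rmtree(tmpdir)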
class AStarNode(object):
    """class representing an A* node, used with the A* searcher and Viterbi algorithm for generating the top-k decoded list
Args:
cost: float representing the score/unnormalized probability of a sequence up to given position
position: integer representing the current position in the sequence
pi_c: prefix or state code of the label
label: label of the current position in a sequence
frwdlink: a link to :class:`AStarNode` node
Attributes:
cost: float representing the score/unnormalized probability of a sequence up to given position
position: integer representing the current position in the sequence
pi_c: prefix or state code of the label
label: label of the current position in a sequence
frwdlink: a link to :class:`AStarNode` node
"""
def __init__(self, cost, position, pi_c, label, frwdlink):
self.cost = cost
self.position = position
self.pi_c = pi_c
self.label = label
self.frwdlink = frwdlink
    def print_node(self):
"""print the info about a node"""
statement = "cost: {}, position: {}, pi_code: {}, label: {}, ".format(self.cost, self.position, self.pi_c, self.label)
if(self.frwdlink):
statement += "forward_link: {}".format(self.frwdlink)
else:
statement += "forward_link: None"
print(statement)
class AStarAgenda(object):
    """class containing a heap where instances of :class:`AStarNode` class will be pushed

       the push operation will use the score matrix (built using the Viterbi algorithm)
       representing the unnormalized probability of the sequences ending at every position
       with the different available prefixes/states

       Attributes:
           qagenda: queue where instances of :class:`AStarNode` are pushed
           entry_count: counter that keeps track of the entries and associates each entry (node)
                        with a unique number; it is useful for breaking ties between nodes with equal costs
"""
def __init__(self):
self.qagenda = []
self.entry_count = 0
    def push(self, astar_node, cost):
"""push instance of :class:`AStarNode` with its associated cost to the heap
Args:
astar_node: instance of :class:`AStarNode` class
cost: float representing the score/unnormalized probability of a sequence up to given position
"""
heapq.heappush(self.qagenda, (-cost, self.entry_count, astar_node))
self.entry_count += 1
    def pop(self):
        """pop the node with the highest score from the heap
        """
astar_node = heapq.heappop(self.qagenda)[-1]
return(astar_node)
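# A brief sketch of the agenda's max-heap behavior: costs are negated on push so that
# Python's heapq (a min-heap) pops the highest-cost node first, while entry_count
# breaks ties between equal costs by insertion order.
def _demo_astar_agenda():
    agenda = AStarAgenda()
    for cost, label in [(0.2, 'O'), (0.9, 'P'), (0.5, 'L')]:
        agenda.push(AStarNode(cost, position=1, pi_c=0, label=label, frwdlink=None), cost)
    assert agenda.pop().label == 'P'    # highest cost is popped first
    assert agenda.pop().label == 'L'
    assert agenda.pop().label == 'O'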
class FO_AStarSearcher(object):
    """A* searcher associated with first-order CRF model such as :class:`FirstOrderCRF`
Args:
Y_codebook_rev: a reversed version of dictionary comprising the set of states each assigned a unique code
Attributes:
Y_codebook_rev: a reversed version of dictionary comprising the set of states each assigned a unique code
"""
def __init__(self, Y_codebook_rev):
self.Y_codebook_rev = Y_codebook_rev
    def infer_labels(self, top_node, back_track):
"""decode sequence by inferring labels
Args:
top_node: instance of :class:`AStarNode` class
back_track: dictionary containing back pointers built using dynamic programming algorithm
"""
Y_codebook_rev = self.Y_codebook_rev
# decoding the sequence
#print("we are decoding")
#top_node.print_node()
y_c = top_node.pi_c
pos = top_node.position
Y_decoded = []
Y_decoded.append(y_c)
t = pos - 1
while t>0:
y_c_tplus1 = Y_decoded[-1]
y_c_t = back_track[t+1, y_c_tplus1]
Y_decoded.append(y_c_t)
t -= 1
Y_decoded.reverse()
Y_decoded = [Y_codebook_rev[y_code] for y_code in Y_decoded]
while(top_node.frwdlink):
y = top_node.frwdlink.label
Y_decoded.append(y)
top_node = top_node.frwdlink
# print(Y_decoded)
return(Y_decoded)
    def search(self, alpha, back_track, T, K):
        """A* searcher that uses the score matrix (built using the Viterbi algorithm) to decode the top-K list of sequences

           Args:
               alpha: score matrix built using the Viterbi algorithm
               back_track: back-pointers dictionary tracking the best paths to every state
               T: last decoded position of a sequence (in this context, it is alpha.shape[0])
K: number of top decoded sequences to be returned
Returns:
topk_list: top-K list of decoded sequences
"""
# push the best astar nodes to the queue (i.e. the states at time T)
q = AStarAgenda()
r = set()
c = 0
Y_codebook_rev = self.Y_codebook_rev
# create nodes from the states at time T
for y_c in Y_codebook_rev:
cost = alpha[T, y_c]
pos = T
frwdlink = None
label = Y_codebook_rev[y_c]
node = AStarNode(cost, pos, y_c, label, frwdlink)
# node.print_node()
q.push(node, cost)
track = []
topk_list = []
try:
while c < K:
#print("heap size ", len(q.qagenda))
top_node = q.pop()
track.append(top_node)
for i in reversed(range(2, top_node.position+1)):
# best previous state at pos = i-1
curr_y_c = top_node.pi_c
bestprev_y_c = back_track[i, curr_y_c]
pos = i - 1
for prev_y_c in Y_codebook_rev:
# create a new astar node
if(prev_y_c != bestprev_y_c):
label = Y_codebook_rev[prev_y_c]
cost = alpha[pos, prev_y_c]
s = AStarNode(cost, pos, prev_y_c, label, top_node)
q.push(s, cost)
                    # create the backlink of the previous top_node (i.e. create a node from bestprev_y_c)
                    cost = alpha[pos, bestprev_y_c]
                    label = Y_codebook_rev[bestprev_y_c]
                    top_node = AStarNode(cost, pos, bestprev_y_c, label, top_node)
# decode and check if it is not saved already in topk list
y_labels = self.infer_labels(track[-1], back_track)
# print(y_labels)
signature = "".join(y_labels)
if(signature not in r):
r.add(signature)
topk_list.append(y_labels)
c += 1
track.pop()
except (KeyError, IndexError) as e:
# consider logging the error
print(e)
finally:
#print('r ', r)
#print('topk ', topk_list)
return(topk_list)
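# A toy top-K decoding example: a two-position sequence with states {0:'A', 1:'B'},
# a hand-made score table `alpha` indexed by (position, state code) and back pointers
# that always prefer state 0 at the previous position (all values are illustrative).
def _demo_fo_astar_search():
    searcher = FO_AStarSearcher({0: 'A', 1: 'B'})
    alpha = {(1, 0): 0.5, (1, 1): 0.2, (2, 0): 1.0, (2, 1): 0.8}
    back_track = {(2, 0): 0, (2, 1): 0}
    topk = searcher.search(alpha, back_track, T=2, K=3)
    # sequences come out ordered by the score of the node that spawned them
    assert topk == [['A', 'A'], ['A', 'B'], ['B', 'A']]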
class HO_AStarSearcher(object):
    """A* searcher associated with higher-order CRF model such as :class:`HOCRFAD`
Args:
P_codebook_rev: reversed codebook of set of proper prefixes in the `P` set
e.g. ``{0:'', 1:'P', 2:'L', 3:'O', 4:'L|O', ...}``
P_elems: dictionary comprising the composing elements of every prefix in the `P` set
e.g. ``{'':('',), 'P':('P',), 'L':('L',), 'O':('O',), 'L|O':('L','O'), ...}``
Attributes:
P_codebook_rev: reversed codebook of set of proper prefixes in the `P` set
e.g. ``{0:'', 1:'P', 2:'L', 3:'O', 4:'L|O', ...}``
P_elems: dictionary comprising the composing elements of every prefix in the `P` set
e.g. ``{'':('',), 'P':('P',), 'L':('L',), 'O':('O',), 'L|O':('L','O'), ...}``
"""
def __init__(self, P_codebook_rev, P_elems):
self.P_codebook_rev = P_codebook_rev
self.P_elems = P_elems
    def get_node_label(self, pi_code):
        """get the label/state given a prefix code
Args:
pi_code: prefix code which is an element of :attr:`P_codebook_rev`
"""
pi = self.P_codebook_rev[pi_code]
y = self.P_elems[pi][-1]
return(y)
    def infer_labels(self, top_node, back_track):
"""decode sequence by inferring labels
Args:
top_node: instance of :class:`AStarNode` class
back_track: dictionary containing back pointers tracking the best paths to every state
"""
# decoding the sequence
#print("we are decoding")
#top_node.print_node()
y = top_node.label
pi_c = top_node.pi_c
pos = top_node.position
Y_decoded = []
Y_decoded.append((pi_c, y))
#print("t={}, p_T_code={}, p_T={}, y_T ={}".format(T, p_T_code, p_T, y_T))
t = pos - 1
while t>0:
p_tplus1_c = Y_decoded[-1][0]
p_t_c, y_t = back_track[t+1, p_tplus1_c]
#print("t={}, (t+1, p_t_code)=({}, {})->({},{})".format(t, t+1, P_codebook[p_tplus1], p_t, y_t))
Y_decoded.append((p_t_c, y_t))
t -= 1
Y_decoded.reverse()
Y_decoded = [y for (__, y) in Y_decoded]
while(top_node.frwdlink):
y = top_node.frwdlink.label
Y_decoded.append(y)
top_node = top_node.frwdlink
# print(Y_decoded)
return(Y_decoded)
    def search(self, alpha, back_track, T, K):
        """A* searcher that uses the score matrix (built using the Viterbi algorithm) to decode the top-K list of sequences

           Args:
               alpha: score matrix built using the Viterbi algorithm
               back_track: back-pointers dictionary tracking the best paths to every state
               T: last decoded position of a sequence (in this context, it is alpha.shape[0])
K: number of top decoded sequences to be returned
Returns:
topk_list: top-K list of decoded sequences
"""
# push the best astar nodes to the queue (i.e. the pi's at time T)
q = AStarAgenda()
r = set()
c = 0
P_codebook_rev = self.P_codebook_rev
# create nodes from the pi's at time T
for pi_c in P_codebook_rev:
cost = alpha[T, pi_c]
pos = T
frwdlink = None
label = self.get_node_label(pi_c)
node = AStarNode(cost, pos, pi_c, label, frwdlink)
# node.print_node()
q.push(node, cost)
track = []
topk_list = []
try:
while c < K:
#print("heap size ", len(q.qagenda))
top_node = q.pop()
track.append(top_node)
for i in reversed(range(2, top_node.position+1)):
best_prev_pi_c, best_y = back_track[i, top_node.pi_c]
pos = i - 1
for prev_pi_c in P_codebook_rev:
# create a new astar node
if(prev_pi_c != best_prev_pi_c):
label = self.get_node_label(prev_pi_c)
cost = alpha[pos, prev_pi_c]
s = AStarNode(cost, pos, prev_pi_c, label, top_node)
q.push(s, cost)
# create the backlink of the top_node
cost = alpha[pos, best_prev_pi_c]
top_node = AStarNode(cost, pos, best_prev_pi_c, best_y, top_node)
# decode and check if it is not saved already in topk list
y_labels = self.infer_labels(track[-1], back_track)
# print(y_labels)
sig = "".join(y_labels)
if(sig not in r):
r.add(sig)
topk_list.append(y_labels)
c += 1
track.pop()
except (KeyError, IndexError) as e:
# consider logging the error
print(e)
finally:
#print('r ', r)
#print('topk ', topk_list)
return(topk_list)
class HOSemi_AStarSearcher(object):
    """A* searcher associated with higher-order semi-CRF model such as :class:`HOSemiCRFAD`

       Args:
           P_codebook_rev: reversed codebook of set of proper prefixes in the `P` set
                           e.g. ``{0:'', 1:'P', 2:'L', 3:'O', 4:'L|O', ...}``
           pi_elems: dictionary comprising the composing elements of every prefix in the `P` set
                     e.g. ``{'':('',), 'P':('P',), 'L':('L',), 'O':('O',), 'L|O':('L','O'), ...}``

       Attributes:
           P_codebook_rev: reversed codebook of set of proper prefixes in the `P` set
                           e.g. ``{0:'', 1:'P', 2:'L', 3:'O', 4:'L|O', ...}``
           pi_elems: dictionary comprising the composing elements of every prefix in the `P` set
                     e.g. ``{'':('',), 'P':('P',), 'L':('L',), 'O':('O',), 'L|O':('L','O'), ...}``
    """
def __init__(self, P_codebook_rev, pi_elems):
self.P_codebook_rev = P_codebook_rev
self.pi_elems = pi_elems
    def get_node_label(self, pi_code):
        """get the label/state given a prefix code
Args:
pi_code: prefix code which is an element of :attr:`P_codebook_rev`
"""
pi = self.P_codebook_rev[pi_code]
y = self.pi_elems[pi][-1]
return(y)
    def infer_labels(self, top_node, back_track):
"""decode sequence by inferring labels
Args:
top_node: instance of :class:`AStarNode` class
back_track: dictionary containing back pointers tracking the best paths to every state
"""
# decoding the sequence
#print("we are decoding")
#top_node.print_node()
y = top_node.label
pi_c = top_node.pi_c
pos = top_node.position
Y_decoded = []
d, pt_c, yt = back_track[pos, pi_c]
for _ in range(d+1):
Y_decoded.append(y)
t = pos - d - 1
while t>0:
new_d, new_pt_c, new_yt = back_track[t, pt_c]
for _ in range(new_d+1):
Y_decoded.append(yt)
t = t - new_d -1
pt_c = new_pt_c
yt = new_yt
Y_decoded.reverse()
while(top_node.frwdlink):
y = top_node.frwdlink.label
Y_decoded.append(y)
top_node = top_node.frwdlink
# print(Y_decoded)
return(Y_decoded)
    def search(self, alpha, back_track, T, K):
        """A* searcher that uses the score matrix (built using the Viterbi algorithm) to decode the top-K list of sequences

           Args:
               alpha: score matrix built using the Viterbi algorithm
               back_track: back-pointers dictionary tracking the best paths to every state
               T: last decoded position of a sequence (in this context, it is alpha.shape[0])
K: number of top decoded sequences to be returned
Returns:
topk_list: top-K list of decoded sequences
"""
# push the best astar nodes to the queue (i.e. the pi's at time T)
q = AStarAgenda()
r = set()
c = 0
P_codebook_rev = self.P_codebook_rev
# create nodes from the pi's at time T
for pi_c in P_codebook_rev:
cost = alpha[T, pi_c]
pos = T
frwdlink = None
label = self.get_node_label(pi_c)
node = AStarNode(cost, pos, pi_c, label, frwdlink)
# node.print_node()
q.push(node, cost)
track = []
topk_list = []
try:
while c < K:
#print("heap size ", len(q.qagenda))
top_node = q.pop()
track.append(top_node)
while(True):
curr_pos = top_node.position
if(curr_pos == 1):
break
d, best_prev_pi_c, best_prev_y = back_track[curr_pos, top_node.pi_c]
prev_pos = curr_pos - d - 1
for prev_pi_c in P_codebook_rev:
# create a new astar node
if(prev_pi_c != best_prev_pi_c):
label = self.get_node_label(prev_pi_c)
cost = alpha[prev_pos, prev_pi_c]
s = AStarNode(cost, prev_pos, prev_pi_c, label, top_node)
q.push(s, cost)
# create the backlink of the top_node
cost = alpha[prev_pos, best_prev_pi_c]
top_node = AStarNode(cost, prev_pos, best_prev_pi_c, best_prev_y, top_node)
# decode and check if it is not saved already in topk list
y_labels = self.infer_labels(track[-1], back_track)
# print(y_labels)
sig = "".join(y_labels)
if(sig not in r):
r.add(sig)
topk_list.append(y_labels)
c += 1
track.pop()
except (KeyError, IndexError) as e:
# consider logging the error
print(e)
finally:
#print('r ', r)
#print('topk ', topk_list)
return(topk_list)
class TemplateGenerator(object):
"""template generator class for feature/function template generation
"""
def __init__(self):
pass
    def generate_template_XY(self, attr_name, x_spec, y_spec, template):
r"""generate template XY for the feature extraction
Args:
attr_name: string representing the attribute name of the atomic observations/tokens
x_spec: tuple of the form (n-gram, range)
that is we can specify the n-gram features required in a specific range/window
for an observation token ``attr_name``
y_spec: string specifying how to join/combine the features on the X observation level
with labels on the Y level.
Example of passed options would be:
- one state (i.e. current state) by passing ``1-state`` or
- two states (i.e. current and previous state) by passing ``2-states`` or
                       - one and two states (i.e. mix/combine observation features with one-state and two-states models)
                         by passing ``1-state:2-states``; higher-order models support specifications with more than two states (e.g. ``3-states`` and above)
template: dictionary that accumulates the generated feature template for all attributes
Example:
suppose we have `word` attribute referenced by 'w' and we need to use the current word
with the current label (i.e. unigram of words with the current label) in a range of (0,1)
::
templateXY = {}
generate_template_XY('w', ('1-gram', range(0, 1)), '1-state', templateXY)
we can also specify a two states/labels features at the Y level
::
generate_template_XY('w', ('1-gram', range(0, 1)), '1-state:2-states', templateXY)
           .. note::

              this can be applied for every attribute name and accumulated in the `template` dictionary
"""
ngram_options, wsize = x_spec
templateX = self._traverse_x(attr_name, ngram_options, wsize)
templateY = self.generate_template_Y(y_spec)
templateXY = self._mix_template_XY(templateX, templateY)
#update the template we are building
self._update_template(template, templateXY)
def _update_template(self, template, templateXY):
"""update the accumulated template with the current generated templateXY
Args:
template: dictionary of the accumulated template for the different offsets
and attribute names
templateXY: dictionary of the form ``{attr_name:{x_offset:(y_offsets)}}``
"""
for attr_name in templateXY:
if(attr_name in template):
for x_offset in templateXY[attr_name]:
template[attr_name][x_offset] = templateXY[attr_name][x_offset]
else:
template[attr_name] = templateXY[attr_name]
def _traverse_x(self, attr_name, ngram_options, wsize):
"""generate template on the X observation level only
Args:
attr_name: string representing the attribute name of the atomic observations/tokens
               ngram_options: string specifying the n-grams (i.e. ``1-gram``); it also supports multiple
                              specifications such as ``1-gram:2-gram`` where each is separated by a colon
wsize: a range specifying the window size where the template operates
"""
options = ngram_options.split(":")
l = list(wsize)
template = {attr_name:{}}
for option in options:
n = int(option.split("-")[0])
ngram_list = self.generate_ngram(l, n)
for offset in ngram_list:
template[attr_name][offset] = None
return(template)
    def generate_template_Y(self, ngram_options):
        """generate template on the Y labels level

           Args:
               ngram_options: string specifying the number of states to be used (i.e. ``1-state``);
                              it also supports multiple specifications such as ``1-state:2-states``
                              where each is separated by a colon
"""
template = {'Y':[]}
options = ngram_options.split(":")
for option in options:
max_order = int(option.split("-")[0])
template['Y'] += self._traverse_y(max_order, accumulative = False)['Y']
return(template)
@staticmethod
def _traverse_y(max_order, accumulative = True):
"""generate the y template"""
attr_name = 'Y'
template = {attr_name:[]}
if(accumulative):
for j in range(max_order):
offsets_y = [-i for i in range(j+1)]
offsets_y = tuple(reversed(offsets_y))
template[attr_name].append(offsets_y)
else:
offsets_y = [-i for i in range(max_order)]
offsets_y = tuple(reversed(offsets_y))
template[attr_name].append(offsets_y)
return(template)
@staticmethod
def _mix_template_XY(templateX, templateY):
"""mix and join the template on the X observation level with the Y level
Args:
templateX: dictionary of the form ``{attr_name:{x_offset:None}}``
e.g. ``{'w': {(0,): None}}``
templateY: dictionary of the form ``{'Y':[y_offset]}``
e.g. ``{'Y': [(0,), (-1, 0)]}``
.. note::
- x_offset is a tuple of offsets representing the ngram options needed
such as (0,) for unigram and (-1,0) for bigram
- y_offset is a tuple of offsets representing the number of states options needed
such as (0,) for 1-state and (-1,0) for 2-states and (-2,-1,0) for 3-states
"""
template_XY = deepcopy(templateX)
for attr_name in template_XY:
for offset_x in template_XY[attr_name]:
template_XY[attr_name][offset_x] = tuple(templateY['Y'])
return(template_XY)
@staticmethod
    def generate_ngram(l, n):
"""n-gram generator based on the length of the window and the ngram option
Args:
l: list of positions of the range representing the window size (i.e. list(wsize))
n: integer representing the n-gram option (i.e. 1 for unigram, 2 for bigram, etc..)
"""
ngram_list = []
for i in range(0, len(l)):
elem = tuple(l[i:i+n])
if(len(elem) != n):
break
ngram_list.append(elem)
return(ngram_list)
@staticmethod
    def generate_combinations(n):
"""generates all possible combinations based on the maximum number of ngrams n
Args:
n: integer specifying the maximum/greatest ngram option
"""
option_names = []
start = 1
for i in range(start, n+1):
option_names.append("{}-gram".format(i))
config = {}
for i in range(start, n+1):
config[i] = list(combinations(option_names, i))
config_combinations = {}
for c_list in config.values():
for c_tup in c_list:
key_name = ":".join(c_tup)
config_combinations[key_name] = set()
elemkeys = config_combinations.keys()
for option_i in config_combinations:
s = config_combinations[option_i]
for option_j in elemkeys:
s.add(option_j)
config_combinations[option_i] = s
return(config_combinations)
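# A short sketch of template generation: unigram and bigram features of a word
# attribute 'w' over a three-offset window, combined with one-state and two-states
# label patterns.
def _demo_template_generator():
    gen = TemplateGenerator()
    template = {}
    gen.generate_template_XY('w', ('1-gram:2-gram', range(-1, 2)), '1-state:2-states', template)
    y_offsets = ((0,), (-1, 0))     # 1-state and 2-states label patterns
    assert template == {'w': {(-1,): y_offsets, (0,): y_offsets, (1,): y_offsets,
                              (-1, 0): y_offsets, (0, 1): y_offsets}}
    # generate_ngram produces the x offsets used above
    assert TemplateGenerator.generate_ngram([-1, 0, 1], 2) == [(-1, 0), (0, 1)]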
class BoundNode(object):
    """boundary entity class used when generating all possible partitions within a specified constraint
Args:
parent: instance of :class:`BoundNode`
boundary: tuple (u,v) representing the current boundary
"""
def __init__(self, parent, boundary):
self.parent = parent
self.boundary = boundary
self.children = []
    def add_child(self, child):
"""add link to the child nodes"""
self.children.append(child)
    def get_child(self):
        """pop and return the last added child node"""
        return(self.children.pop())
    def get_signature(self):
"""retrieve the id of the node"""
return(id(self))
def generate_partitions(boundary, L, patt_len, bound_node_map, depth_node_map, parent_node, depth=1):
    """generate all possible partitions within the range of segment length and model order

       it transforms the partitions into a tree of nodes starting from the root node
       that uses the `boundary` argument in its construction

       Args:
           boundary: tuple (u,v) representing the current boundary in a sequence
           L: integer representing the maximum length a segment can take
           patt_len: integer representing the maximum model order
           bound_node_map: dictionary that keeps track of all possible partitions represented as
                           instances of :class:`BoundNode`
           depth_node_map: dictionary that arranges the generated nodes by their depth in the tree
           parent_node: instance of :class:`BoundNode` or None in case of the root node
           depth: integer representing the current depth of the node in the tree
                  (recursion stops once it reaches `patt_len`)
    """
if(depth >= patt_len):
return
if(parent_node):
if(boundary in bound_node_map):
curr_node = bound_node_map[boundary]
else:
curr_node = BoundNode(parent_node, boundary)
bound_node_map[boundary] = curr_node
if(depth in depth_node_map):
depth_node_map[depth].append(curr_node)
else:
depth_node_map[depth] = [curr_node]
else:
# setup root node
curr_node = BoundNode(None, boundary)
bound_node_map[boundary] = curr_node
depth_node_map[depth] = [curr_node]
    u = boundary[0] - 1
    v = u
depth += 1
for d in range(L):
if(u-d < 1):
break
upd_boundary = (u-d, v)
if(upd_boundary in bound_node_map):
child = bound_node_map[upd_boundary]
else:
child = BoundNode(curr_node, upd_boundary)
bound_node_map[upd_boundary] = child
if(depth in depth_node_map):
depth_node_map[depth].append(child)
else:
depth_node_map[depth] = [child]
curr_node.add_child(child)
generate_partitions(upd_boundary, L, patt_len, bound_node_map, depth_node_map, child, depth)
def generate_partition_boundaries(depth_node_map):
"""generate partitions of the boundaries generated in :func:`generate_partitions` function
Args:
           depth_node_map: dictionary that arranges the generated nodes by their depth in the tree;
                           it is constructed using :func:`generate_partitions` function
"""
g = {}
depths = sorted(depth_node_map, reverse=True)
for depth in depths:
g[depth] = []
nodes = depth_node_map[depth]
for curr_node in nodes:
l = []
l.append(curr_node.boundary)
while(True):
curr_node = curr_node.parent
if(curr_node):
l.append(curr_node.boundary)
else:
g[depth].append(l)
break
return(g)
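# A worked example of the partition helpers: all partitions ending at position 3
# with segments of length at most L=2 and a maximum model order (patt_len) of 3.
def _demo_generate_partitions():
    bound_node_map, depth_node_map = {}, {}
    generate_partitions((3, 3), 2, 3, bound_node_map, depth_node_map, None)
    g = generate_partition_boundaries(depth_node_map)
    assert g[1] == [[(3, 3)]]                              # the segment (3,3) alone
    assert g[2] == [[(2, 2), (3, 3)], [(1, 2), (3, 3)]]    # two-segment partitions
    assert g[3] == [[(1, 1), (2, 2), (3, 3)]]              # three unit segments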
def delete_directory(directory):
    """delete a directory if it exists"""
if(os.path.isdir(directory)):
shutil.rmtree(directory)
def delete_file(filepath):
    """delete a file if it exists"""
check = os.path.isfile(filepath)
if(check):
os.remove(filepath)
def create_directory(folder_name, directory = "current"):
    """create directory/folder (if it does not exist) and return the path of the directory
Args:
folder_name: string representing the name of the folder to be created
Keyword Arguments:
directory: string representing the directory where to create the folder
if `current` then the folder will be created in the current directory
"""
if directory == "current":
path_current_dir = os.path.dirname(__file__)
else:
path_current_dir = directory
path_new_dir = os.path.join(path_current_dir, folder_name)
if not os.path.exists(path_new_dir):
os.makedirs(path_new_dir)
return(path_new_dir)
def generate_datetime_str():
"""generate string composed of the date and time"""
datetime_now = datetime.now()
datetime_str = "{}_{}_{}-{}_{}_{}_{}".format(datetime_now.year,
datetime_now.month,
datetime_now.day,
datetime_now.hour,
datetime_now.minute,
datetime_now.second,
datetime_now.microsecond)
return(datetime_str)
def vectorized_logsumexp(vec):
    """vectorized version of the log-sum-exp operation

       Args:
           vec: numpy vector where entries are in the log domain
    """
max_a = numpy.max(vec)
if(max_a != -numpy.inf):
return(max_a + numpy.log(numpy.sum(numpy.exp(vec - max_a))))
# case where max_a == -numpy.inf
return(max_a)
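# A quick numeric check of vectorized_logsumexp: it reproduces log(sum(exp(vec)))
# in a numerically stable way and returns -inf (instead of nan) for all -inf input.
def _demo_vectorized_logsumexp():
    vec = numpy.log(numpy.array([0.5, 0.25, 0.25]))
    assert numpy.isclose(vectorized_logsumexp(vec), 0.0)    # log(0.5+0.25+0.25) = log(1) = 0
    all_ninf = numpy.full(3, -numpy.inf)
    assert vectorized_logsumexp(all_ninf) == -numpy.inf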
def generate_updated_model(modelparts_dir, modelrepr_class,
                           model_class, aextractor_obj,
                           fextractor_class, seqrepresenter_class, ascaler_class=None):
    """update/regenerate CRF models using the saved parts/components

       Args:
           modelparts_dir: string representing the directory where model parts are saved
           modelrepr_class: name of the model representation class to be used which has
                            suffix `ModelRepresentation` such as :class:`HOCRFADModelRepresentation`
           model_class: name of the CRF model class such as :class:`HOCRFAD`
           aextractor_obj: attribute extractor class or instance such as :class:`NERSegmentAttributeExtractor`
           fextractor_class: name of the feature extractor class used such as :class:`HOFeatureExtractor`
           seqrepresenter_class: name of the sequence representer class such as :class:`SeqsRepresenter`
           ascaler_class: name of the attribute scaler class such as :class:`AttributeScaler`

       .. note::

          This function is equivalent to :func:`generate_trained_model` function. However, this function
          uses explicit specification of the arguments (i.e. specifying explicitly the classes to be used)
    """
from pyseqlab.attributes_extraction import GenericAttributeExtractor
ycodebook = ReaderWriter.read_data(os.path.join(modelparts_dir, "MR_Ycodebook"))
mfeatures = ReaderWriter.read_data(os.path.join(modelparts_dir, "MR_modelfeatures"))
mfeatures_codebook = ReaderWriter.read_data(os.path.join(modelparts_dir, "MR_modelfeaturescodebook"))
L = ReaderWriter.read_data(os.path.join(modelparts_dir, "MR_L"))
# generate model representation
new_mrepr = modelrepr_class()
new_mrepr.modelfeatures = mfeatures
new_mrepr.modelfeatures_codebook = mfeatures_codebook
new_mrepr.Y_codebook = ycodebook
new_mrepr.L = L
new_mrepr.generate_instance_properties()
# generate attribute extractor
    if(isinstance(aextractor_obj, type)): # case it is a class
new_attrextractor = aextractor_obj()
else: # case it is an instance of a class
new_attrextractor = aextractor_obj
# generate feature extractor
templateX = ReaderWriter.read_data(os.path.join(modelparts_dir, "FE_templateX"))
templateY = ReaderWriter.read_data(os.path.join(modelparts_dir, "FE_templateY"))
new_fextractor = fextractor_class(templateX, templateY, new_attrextractor.attr_desc)
# generate sequence representer
new_seqrepr = seqrepresenter_class(new_attrextractor, new_fextractor)
# generate attribute scaler if applicable
if(ascaler_class):
scaling_info = ReaderWriter.read_data(os.path.join(modelparts_dir, "AS_scalinginfo"))
method = ReaderWriter.read_data(os.path.join(modelparts_dir, "AS_method"))
new_attrscaler = ascaler_class(scaling_info, method)
new_seqrepr.attr_scaler = new_attrscaler
# generate crf instance
new_crfmodel = model_class(new_mrepr, new_seqrepr, {})
new_crfmodel.weights = ReaderWriter.read_data(os.path.join(modelparts_dir, "weights"))
return(new_crfmodel)
def generate_trained_model(modelparts_dir, aextractor_obj):
    """regenerate trained CRF models using the saved trained model parts/components

       Args:
           modelparts_dir: string representing the directory where model parts are saved
           aextractor_obj: attribute extractor class or instance such as :class:`NERSegmentAttributeExtractor`
    """
# parse the class description file
class_desc = []
with open(os.path.join(modelparts_dir, 'class_desc.txt'), 'r') as f:
for line in f:
class_desc.append(line.strip())
from pyseqlab.features_extraction import HOFeatureExtractor, FOFeatureExtractor, SeqsRepresenter
seqrepresenter_class = SeqsRepresenter
if(class_desc[1] == 'HOCRFAD'):
from pyseqlab.ho_crf_ad import HOCRFAD, HOCRFADModelRepresentation
modelrepr_class = HOCRFADModelRepresentation
model_class = HOCRFAD
fextractor_class = HOFeatureExtractor
elif(class_desc[1] == 'HOCRF'):
from pyseqlab.ho_crf import HOCRF, HOCRFModelRepresentation
modelrepr_class = HOCRFModelRepresentation
model_class = HOCRF
fextractor_class = HOFeatureExtractor
elif(class_desc[1] == 'HOSemiCRFAD'):
from pyseqlab.hosemi_crf_ad import HOSemiCRFAD, HOSemiCRFADModelRepresentation
modelrepr_class = HOSemiCRFADModelRepresentation
model_class = HOSemiCRFAD
fextractor_class = HOFeatureExtractor
elif(class_desc[1] == 'HOSemiCRF'):
from pyseqlab.hosemi_crf import HOSemiCRF, HOSemiCRFModelRepresentation
modelrepr_class = HOSemiCRFModelRepresentation
model_class = HOSemiCRF
fextractor_class = HOFeatureExtractor
    elif(class_desc[1] == 'FirstOrderCRF'):
        from pyseqlab.fo_crf import FirstOrderCRF, FirstOrderCRFModelRepresentation
        modelrepr_class = FirstOrderCRFModelRepresentation
        model_class = FirstOrderCRF
        fextractor_class = FOFeatureExtractor
    else:
        raise ValueError("unsupported model class found in class_desc.txt: {}".format(class_desc[1]))
# generate attribute scaler if applicable
if(class_desc[-1] != 'None'):
from pyseqlab.attributes_extraction import AttributeScaler
ascaler_class = AttributeScaler
else:
ascaler_class = None
trained_model = generate_updated_model(modelparts_dir, modelrepr_class, model_class,
aextractor_obj, fextractor_class, seqrepresenter_class, ascaler_class)
return(trained_model)
def split_data(seqs_id, options):
r"""utility function for splitting dataset (i.e. training/testing and cross validation)
Args:
seqs_id: list of processed sequence ids
options: dictionary comprising of the options on how to split data
Example:
To perform cross validation, we need to specify
- cross-validation for the `method`
- the number of folds for the `k_fold`
::
options = {'method':'cross_validation',
'k_fold':number
}
To perform random splitting, we need to specify
- random for the `method`
- number of splits for the `num_splits`
- size of the training set in percentage for the `trainset_size`
::
options = {'method':'random',
'num_splits':number,
'trainset_size':percentage
}
"""
N = len(seqs_id)
data_split = {}
method = options.get('method')
    if(method is None):
method = 'cross_validation'
if(method == "cross_validation"):
k_fold = options.get("k_fold")
        if(not isinstance(k_fold, int)):
            # use 10-fold cross validation
k_fold = 10
elif(k_fold <= 0):
k_fold = 10
batch_size = int(numpy.ceil(N/k_fold))
test_seqs = seqs_id.copy()
seqs_len = len(test_seqs)
#numpy.random.shuffle(test_seqs)
indx = numpy.arange(0, seqs_len + 1, batch_size)
if(indx[-1] < seqs_len):
indx = numpy.append(indx, [seqs_len])
for i in range(len(indx)-1):
data_split[i] = {}
current_test_seqs = test_seqs[indx[i]:indx[i+1]]
data_split[i]["test"] = current_test_seqs
data_split[i]["train"] = list(set(seqs_id)-set(current_test_seqs))
elif(method == "random"):
num_splits = options.get("num_splits")
        if(not isinstance(num_splits, int)):
num_splits = 5
trainset_size = options.get("trainset_size")
        if(not isinstance(trainset_size, int)):
# 80% of the data set is training and 20% for testing
trainset_size = 80
elif(trainset_size <= 0 or trainset_size >=100):
trainset_size = 80
for i in range(num_splits):
data_split[i] = {}
current_train_seqs = numpy.random.choice(seqs_id, int(N*trainset_size/100), replace = False)
data_split[i]["train"] = list(current_train_seqs)
data_split[i]["test"] = list(set(seqs_id)-set(current_train_seqs))
return(data_split)
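# A small sketch of split_data: ten sequence ids split into five cross-validation
# folds, each fold holding out two sequences for testing.
def _demo_split_data():
    seqs_id = list(range(1, 11))
    folds = split_data(seqs_id, {'method': 'cross_validation', 'k_fold': 5})
    assert len(folds) == 5
    for fold in folds.values():
        assert len(fold['test']) == 2
        assert sorted(fold['train'] + fold['test']) == seqs_id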
"""split data based on sequences length
we need to execute the three functions in order:
(1) :func:`group_seqs_by_length`
(2) :func:`weighted_sample`
(3) :func:`aggregate_weightedsample`
"""
def group_seqs_by_length(seqs_info):
    """group sequences by their length

       Args:
           seqs_info: dictionary comprising info about the sequences;
                      it has this form {seq_id:{T:length of sequence}}

       .. note::

          sequences that have a unique sequence length are grouped together as singletons
    """
grouped_seqs = {}
for seq_id, seq_info in seqs_info.items():
T = seq_info["T"]
if(T in grouped_seqs):
grouped_seqs[T].append(seq_id)
else:
grouped_seqs[T] = [seq_id]
    # loop to regroup single sequences
    singleton = [T for T, seqs_id in grouped_seqs.items() if len(seqs_id) == 1]
    singleton_seqs = []
    for T in singleton:
        singleton_seqs += grouped_seqs[T]
        del grouped_seqs[T]
    grouped_seqs["singleton"] = singleton_seqs
return(grouped_seqs)
def weighted_sample(grouped_seqs, trainset_size):
"""get a random split of the grouped sequences
Args:
grouped_seqs: dictionary of the grouped sequences based on their length
it is obtained using :func:`group_seqs_by_length` function
trainset_size: integer representing the size of the training set in percentage
"""
options = {'method':'random', 'num_splits':1, 'trainset_size':trainset_size}
wsample = {}
for group_var, seqs_id in grouped_seqs.items():
# quota = trainset_size*count_seqs[group_var]/total
data_split = split_data(seqs_id, options)
wsample[group_var] = data_split[0]
return(wsample)
def aggregate_weightedsample(w_sample):
    """aggregate the randomly picked samples into a representative training/testing split

       Args:
           w_sample: dictionary representing a random split of the grouped sequences
                     by their length; it is obtained using :func:`weighted_sample` function
    """
wdata_split= {"train":[],
"test": []}
for grouping_var in w_sample:
for data_cat in w_sample[grouping_var]:
wdata_split[data_cat] += w_sample[grouping_var][data_cat]
return({0:wdata_split})
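# An end-to-end sketch of the length-based splitting pipeline described above:
# group sequences by length, sample within each group, then aggregate the splits.
def _demo_length_based_split():
    seqs_info = {1: {'T': 4}, 2: {'T': 4}, 3: {'T': 4}, 4: {'T': 4}, 5: {'T': 7}}
    grouped = group_seqs_by_length(seqs_info)
    assert grouped == {4: [1, 2, 3, 4], 'singleton': [5]}   # length 7 occurs once
    wsample = weighted_sample(grouped, trainset_size=80)
    wdata_split = aggregate_weightedsample(wsample)[0]
    assert sorted(int(i) for i in wdata_split['train'] + wdata_split['test']) == [1, 2, 3, 4, 5]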
##################################
def nested_cv(seqs_id, outer_kfold, inner_kfold):
    """generate nested cross-validation division of sequence ids

       Args:
           seqs_id: list of processed sequence ids
           outer_kfold: integer, number of folds in the outer cross-validation loop
           inner_kfold: integer, number of folds in the inner cross-validation loop
    """
outer_split = split_data(seqs_id, {'method':'cross_validation', 'k_fold':outer_kfold})
cv_hierarchy = {}
for outerfold, outer_datasplit in outer_split.items():
cv_hierarchy["{}_{}".format("outer", outerfold)] = outer_datasplit
curr_train_seqs = outer_datasplit['train']
inner_split = split_data(curr_train_seqs, {'method':'cross_validation', 'k_fold':inner_kfold})
for innerfold, inner_datasplit in inner_split.items():
cv_hierarchy["{}_{}_{}_{}".format("outer", outerfold, "inner", innerfold)] = inner_datasplit
return(cv_hierarchy)
def get_conll00():
    """parse the sample CoNLL 2000 files under tests/dataset/conll00, demonstrating the supported header options"""
    current_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(current_dir)
files_info = {'train_short_main.txt':('main', True, " "),
'train_short_none.txt':(('w','pos'), True, " "),
'train_short_per_sequence.txt':('per_sequence', True, " ")
}
for file_name in files_info:
parser = DataFileParser()
print(file_name)
file_path = os.path.join(root_dir, "tests", "dataset","conll00",file_name)
for seq in parser.read_file(file_path, header=files_info[file_name][0], y_ref = files_info[file_name][1], column_sep=files_info[file_name][2]):
print(seq)
if __name__ == "__main__":
pass
#get_conll00()