Source code for pyseqlab.attributes_extraction
'''
@author: ahmed allam <ahmed.allam@yale.edu>
'''
import os
from collections import defaultdict
from pyseqlab.utilities import SequenceStruct, ReaderWriter
class AttributeScaler(object):
    """attribute scaler class to scale/standardize continuous attributes/features

    Args:
        scaling_info: dictionary comprising the relevant info for performing the scaling,
                      keyed by attribute name
        method: string defining the method of scaling {rescaling, standardization}

    Attributes:
        scaling_info: dictionary comprising the relevant info for performing the scaling,
                      keyed by attribute name
        method: string defining the method of scaling {rescaling, standardization}

    Example::

        in case of *standardization*:
            - scaling_info has the form: scaling_info[attr_name] = {'mean':value,'sd':value}
        in case of *rescaling*
            - scaling_info has the form: scaling_info[attr_name] = {'max':value, 'min':value}
    """

    def __init__(self, scaling_info, method):
        self.scaling_info = scaling_info
        self.method = method

    def scale_continuous_attributes(self, seq, boundaries):
        """scale continuous attributes of a sequence for a list of boundaries

        The values in ``seq.seg_attr[boundary][attr_name]`` are overwritten in place for
        every attribute name found in :attr:`scaling_info`. Nothing is returned.

        - *standardization*: ``(value - mean)/sd``
        - *rescaling*: value mapped to ``[-1, 1]`` using :func:`transform_scale`

        A constant attribute (``sd == 0`` or ``max == min``) is set to 0 under both methods.
        Any other value of :attr:`method` leaves the sequence untouched.

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundaries: list of boundaries ``[(1,1), (2,2),...,]``

        .. note::

           scaling is best effort: if an error occurs (e.g. an attribute is missing from a
           segment) it is printed and scaling stops, so attributes processed before the
           error remain scaled
        """
        scaling_info = self.scaling_info
        method = self.method
        seg_attr = seq.seg_attr
        try:
            if method == "standardization":
                for attr_name, info in scaling_info.items():
                    attr_mean = info['mean']
                    attr_sd = info['sd']
                    if attr_sd == 0:
                        # constant attribute -- mirror the rescaling branch instead of dividing by zero
                        for boundary in boundaries:
                            seg_attr[boundary][attr_name] = 0
                    else:
                        for boundary in boundaries:
                            seg_attr[boundary][attr_name] = (seg_attr[boundary][attr_name] - attr_mean) / attr_sd
            elif method == "rescaling":
                for attr_name, info in scaling_info.items():
                    attr_max = info['max']
                    attr_min = info['min']
                    if attr_max - attr_min == 0:
                        # constant attribute -- transform_scale would divide by zero
                        for boundary in boundaries:
                            seg_attr[boundary][attr_name] = 0
                    else:
                        for boundary in boundaries:
                            seg_attr[boundary][attr_name] = self.transform_scale(seg_attr[boundary][attr_name],
                                                                                 attr_min,
                                                                                 attr_max)
        except Exception as e:
            # deliberately best effort (see note in the docstring): report and carry on
            print("unable to scale the continuous attributes using method '{}'...".format(method))
            print(e)

    def transform_scale(self, x, xref_min, xref_max):
        """transform a feature value to the scale [-1,1]

        Args:
            x: value to transform
            xref_min: reference minimum (mapped to -1)
            xref_max: reference maximum (mapped to 1)

        .. note::

           ``xref_max`` must differ from ``xref_min``; the caller is responsible for
           handling the constant-attribute case
        """
        x_new = 2 * (x - xref_min) / (xref_max - xref_min) - 1
        return x_new

    def save(self, folder_dir):
        """save relevant info about the scaler on disk

        Args:
            folder_dir: string representing directory where files are pickled/dumped
        """
        save_info = {'AS_scalinginfo': self.scaling_info,
                     'AS_method': self.method
                     }
        # every entry is dumped to its own file named after the key
        for name, data in save_info.items():
            ReaderWriter.dump_data(data, os.path.join(folder_dir, name))
class GenericAttributeExtractor(object):
    """Generic attribute extractor class implementing observation functions that generate attributes from tokens/observations

    Args:
        attr_desc: dictionary defining the atomic observation/attribute names including
                   the encoding of such attribute (i.e. {continuous, categorical})

    Attributes:
        attr_desc: dictionary defining the atomic observation/attribute names including
                   the encoding of such attribute (i.e. {continuous, categorical})
        seg_attr: dictionary comprising the extracted attributes per each boundary of a sequence
    """

    def __init__(self, attr_desc):
        self.attr_desc = attr_desc
        self.determine_attr_encoding(attr_desc)
        self.seg_attr = {}

    def determine_attr_encoding(self, attr_desc):
        """attach to every attribute description the function representing it within a segment

        Args:
            attr_desc: dictionary of attribute descriptions; it is modified in place by adding
                       a ``'repr_func'`` entry to each description. Attributes whose encoding
                       is not ``'categorical'`` are treated as continuous
        """
        for attr in attr_desc:
            if attr_desc[attr]['encoding'] == 'categorical':
                attr_desc[attr]['repr_func'] = self._represent_categorical_attr
            else:
                attr_desc[attr]['repr_func'] = self._represent_continuous_attr

    def group_attributes(self):
        """function to group attributes based on the encoding type (i.e. continuous vs. categorical)

        Returns:
            dictionary mapping each encoding type to the list of attribute names having it
        """
        attr_desc = self.attr_desc
        grouped_attr = {}
        for attr_name in attr_desc:
            encoding_type = attr_desc[attr_name]['encoding']
            grouped_attr.setdefault(encoding_type, []).append(attr_name)
        return grouped_attr

    def generate_attributes(self, seq, boundaries):
        """generate attributes of the sequence observations in a specified list of boundaries

        Only the boundaries that are not yet in ``seq.seg_attr`` are processed, and only the
        observed attributes that are described in :attr:`attr_desc` are represented.

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundaries: list of boundaries [(1,1), (2,2),...,]

        Returns:
            list of the boundaries for which attributes were generated by this call

        .. note::

           the generated attributes are saved first in :attr:`seg_attr` and then passed to
           the **`seq.seg_attr`**. In other words, at the end :attr:`seg_attr` is always cleared
        """
        # segment attributes dictionary
        self.seg_attr = {}
        new_boundaries = [boundary for boundary in boundaries if boundary not in seq.seg_attr]
        if not new_boundaries:
            return new_boundaries
        X = seq.X
        attr_desc = self.attr_desc
        # attributes without a description have no 'repr_func' and hence cannot be represented;
        # the observation order is kept
        observed_attrnames = [attr_name for attr_name in X[1] if attr_name in attr_desc]
        # create segments from observations using the provided boundaries
        for boundary in new_boundaries:
            self._create_segment(X, boundary, observed_attrnames)
        # save generated attributes in seq
        seq.seg_attr.update(self.seg_attr)
        # clear the instance variable seg_attr
        self.seg_attr = {}
        return new_boundaries

    def _create_segment(self, X, boundary, attr_names, sep=" "):
        """build the attributes of the segment marked by boundary and store them in :attr:`seg_attr`"""
        attr_desc = self.attr_desc
        segment_attr = {}
        self.seg_attr[boundary] = segment_attr
        for attr_name in attr_names:
            segment_value = self._get_segment_value(X, boundary, attr_name)
            segment_attr[attr_name] = attr_desc[attr_name]['repr_func'](segment_value, sep)

    def _get_segment_value(self, X, boundary, target_attr):
        """collect the values of target_attr at every position between boundary[0] and boundary[1] (inclusive)"""
        u = boundary[0]
        v = boundary[1]
        return [X[i][target_attr] for i in range(u, v + 1)]

    def _represent_categorical_attr(self, attributes, sep):
        """function to represent categorical attributes (values joined using sep)"""
        return sep.join(attributes)

    def _represent_continuous_attr(self, attributes, sep=None):
        """function to represent continuous attributes (sum of the values; sep is ignored)"""
        return sum(float(attr) for attr in attributes)
class NERSegmentAttributeExtractor(GenericAttributeExtractor):
    """class implementing observation functions that generate attributes from word tokens/observations

    The attribute description is built by :func:`generate_attributes_desc`; the constructor
    takes no arguments.

    Attributes:
        attr_desc: dictionary defining the atomic observation/attribute names including
                   the encoding of such attribute (i.e. {continuous, categorical})
        seg_attr: dictionary comprising the extracted attributes per each boundary of a sequence
    """

    def __init__(self):
        attr_desc = self.generate_attributes_desc()
        super().__init__(attr_desc)

    def generate_attributes_desc(self):
        """define attributes by including description and encoding of each observation or observation feature

        Returns:
            dictionary mapping every attribute name to its description and encoding
        """
        attr_desc = {}
        attr_desc['w'] = {'description': 'the word/token',
                          'encoding': 'categorical'
                          }
        attr_desc['shape'] = {'description': 'the shape of the word',
                              'encoding': 'categorical'
                              }
        attr_desc['shaped'] = {'description': 'the compressed/degenerated form/shape of the word',
                               'encoding': 'categorical'
                               }
        attr_desc['seg_numchars'] = {'description': 'number of characters in a segment',
                                     'encoding': 'continuous'
                                     }
        attr_desc['seg_len'] = {'description': 'the length of a segment',
                                'encoding': 'continuous'
                                }
        return attr_desc

    def generate_attributes(self, seq, boundaries):
        """generate attributes of the sequence observations in a specified list of boundaries

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundaries: list of boundaries [(1,1), (2,2),...,]

        Returns:
            list of the boundaries for which attributes were generated by this call
            (i.e. the ones that were not yet in ``seq.seg_attr``)

        .. note::

           the generated attributes are saved first in :attr:`seg_attr` and then passed to
           the **`seq.seg_attr`**. In other words, at the end :attr:`seg_attr` is always cleared
        """
        X = seq.X
        attr_desc = self.attr_desc
        # observed attributes that are described; the observation order is kept so that the
        # result does not depend on set ordering
        observed_attrnames = [attr_name for attr_name in X[1] if attr_name in attr_desc]
        # segment attributes dictionary
        self.seg_attr = {}
        new_boundaries = []
        # create segments from observations using the provided boundaries
        for boundary in boundaries:
            if boundary not in seq.seg_attr:
                self._create_segment(X, boundary, observed_attrnames)
                new_boundaries.append(boundary)
        if self.seg_attr:
            attr_names_boa = ('w', 'shaped')
            for boundary in new_boundaries:
                # order matters: the degenerate shape is derived from the shape
                self.get_shape(boundary)
                self.get_degenerateshape(boundary)
                self.get_seg_length(boundary)
                self.get_num_chars(boundary)
                # generate bag of attributes properties in every segment
                self.get_seg_bagofattributes(boundary, attr_names_boa)
            # save generated attributes in seq
            seq.seg_attr.update(self.seg_attr)
            # clear the instance variable seg_attr
            self.seg_attr = {}
        return new_boundaries

    @staticmethod
    def _char_shape(char):
        """map a character to its shape symbol: 'A' (upper), 'a' (lower), 'D' (digit), '_' (otherwise)"""
        if char.isupper():
            return 'A'
        if char.islower():
            return 'a'
        if char.isdigit():
            return 'D'
        return '_'

    def get_shape(self, boundary):
        """get shape of segment

        Every character of the segment's ``'w'`` attribute is mapped to ``A`` (uppercase),
        ``a`` (lowercase), ``D`` (digit) or ``_`` (anything else, including the separator);
        the result is stored under ``'shape'``.

        Args:
            boundary: tuple (u,v) that marks beginning and end of a segment
        """
        segment = self.seg_attr[boundary]['w']
        self.seg_attr[boundary]['shape'] = ''.join(self._char_shape(char) for char in segment)

    def get_degenerateshape(self, boundary):
        """get degenerate shape of segment

        Consecutive repetitions of the same symbol in the segment's ``'shape'`` attribute are
        collapsed into one symbol; the result is stored under ``'shaped'``.

        Args:
            boundary: tuple (u,v) that marks beginning and end of a segment
        """
        segment = self.seg_attr[boundary]['shape']
        track = []
        for char in segment:
            if not track or track[-1] != char:
                track.append(char)
        self.seg_attr[boundary]['shaped'] = ''.join(track)

    def get_seg_length(self, boundary):
        """get the length of a segment (stored under ``'seg_len'``)

        Args:
            boundary: tuple (u,v) that marks beginning and end of a segment
        """
        # begin and end of a boundary
        u = boundary[0]
        v = boundary[-1]
        self.seg_attr[boundary]['seg_len'] = v - u + 1

    def get_num_chars(self, boundary, filter_out=" "):
        """get the number of characters of a segment (stored under ``'seg_numchars'``)

        Args:
            boundary: tuple (u,v) that marks beginning and end of a segment
            filter_out: string the default separator between attributes (not counted)
        """
        segment = self.seg_attr[boundary]['w']
        num_chars = sum(len(entry) for entry in segment.split(sep=filter_out))
        self.seg_attr[boundary]['seg_numchars'] = num_chars

    def get_seg_bagofattributes(self, boundary, attr_names, sep=" "):
        """implements the bag-of-attributes concept within a segment

        For every attribute in ``attr_names`` the segment value is split using ``sep`` and the
        number of occurrences of each element is stored in the segment under the key
        ``bag_of_attr_<attr_name>_<element>``. Keys not yet in :attr:`attr_desc` are added to it
        with a continuous encoding.

        Args:
            boundary: tuple (u,v) representing current boundary
            attr_names: list of names of the atomic observations/attributes
            sep: separator (by default is the space)

        .. note::

           it can be used **only** with attributes whose segment representation is a string
           (it is split using ``sep``)
        """
        prefix = 'bag_of_attr'
        attr_desc = self.attr_desc
        segment_attr = self.seg_attr[boundary]
        # generate bag of attributes properties in every segment
        for attr_name in attr_names:
            count_dict = defaultdict(int)
            for elem in segment_attr[attr_name].split(sep):
                count_dict[elem] += 1
            for attr_value, count in count_dict.items():
                fkey = prefix + '_' + attr_name + '_' + attr_value
                segment_attr[fkey] = count
                # adding dynamically the description and the encoding of the new bag of attributes property
                if fkey not in attr_desc:
                    # the description carries the name of the generated property
                    attr_desc[fkey] = {'description': '{} -- bag of attributes property'.format(fkey),
                                       'encoding': 'continuous'
                                       }
if __name__ == "__main__":
    # demo: extract the segment attributes of a short labeled sentence
    # (the sequence example is taken from the paper of Cuong et al.)
    words = ['Peter', 'goes', 'to', 'Britain', 'and', 'France', 'annually', '.']
    observations = [{'w': word} for word in words]
    labels = ['P', 'O', 'O', 'L', 'O', 'L', 'O', 'O']
    seq = SequenceStruct(observations, labels)
    attr_extractor = NERSegmentAttributeExtractor()
    print("attr_desc {}".format(attr_extractor.attr_desc))
    # attributes are generated for the boundaries induced by the labels
    y_boundaries = seq.get_y_boundaries()
    attr_extractor.generate_attributes(seq, y_boundaries)
    # report the attributes segment by segment, then all of them at once
    for boundary, seg_attr in seq.seg_attr.items():
        print("boundary {}".format(boundary))
        print("attributes {}".format(seg_attr))
    print("seg_attr {}".format(seq.seg_attr))