Source code for pyseqlab.features_extraction

'''
@author: ahmed allam <ahmed.allam@yale.edu>
'''

import os
from copy import deepcopy
from datetime import datetime
from collections import Counter
import numpy
from .utilities import ReaderWriter, create_directory, generate_datetime_str
from .attributes_extraction import AttributeScaler

class FeatureExtractor(object):
    """Generic feature extractor class that contains feature functions/templates

    Args:
        templateX: dictionary specifying the template to follow for observation features
                   extraction. It has the form: ``{attr_name: {x_offset: tuple(y_offsets)}}``
                   e.g. ``{'w': {(0,): ((0,), (-1,0), (-2,-1,0))}}``
        templateY: dictionary specifying the template to follow for y pattern features
                   extraction. It has the form: ``{Y: tuple(y_offsets)}``
                   e.g. ``{'Y': ((0,), (-1,0), (-2,-1,0))}``
        attr_desc: dictionary containing the description and the encoding of the
                   attributes/observations, e.g.
                   ``attr_desc['w'] = {'description':'the word/token', 'encoding':'categorical'}``.
                   For more details check the :attr:`attr_desc` of the
                   :class:`NERSegmentAttributeExtractor`

    Attributes:
        template_X: dictionary specifying the template for observation features extraction
                    (see the ``templateX`` argument above)
        template_Y: dictionary specifying the template for y pattern features extraction
                    (see the ``templateY`` argument above)
        attr_desc: dictionary containing the description and the encoding of the
                   attributes/observations (see the ``attr_desc`` argument above)
    """
    def __init__(self, templateX, templateY, attr_desc):
        self.template_X = templateX
        self.template_Y = templateY
        self.attr_desc = attr_desc
        self.attr_represent_func = self.attr_represent_funcmapper()

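    # Illustrative usage sketch (not part of the original module): building a feature extractor
    # from the templates described in the docstring above. The attr_desc entry mirrors the
    # 'w' (word/token) example; any other names here are assumptions for illustration only.
    #
    #   template_X = {'w': {(0,): ((0,), (-1, 0), (-2, -1, 0))}}
    #   template_Y = {'Y': ((0,), (-1, 0), (-2, -1, 0))}
    #   attr_desc = {'w': {'description': 'the word/token', 'encoding': 'categorical'}}
    #   fextractor = FeatureExtractor(template_X, template_Y, attr_desc)
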
    def attr_represent_funcmapper(self):
        """assign a representation function based on the encoding
        (i.e. categorical or continuous) of each attribute name
        """
        attr_represent_func = {}
        attr_desc = self.attr_desc
        for attr_name in attr_desc:
            if(attr_desc[attr_name]["encoding"] == "categorical"):
                attr_represent_func[attr_name] = self._represent_categorical_attr
            else:
                attr_represent_func[attr_name] = self._represent_continuous_attr
        return(attr_represent_func)

    @property
    def template_X(self):
        return self._template_X

    @template_X.setter
    def template_X(self, template):
        r"""setup/verify template_X

        Args:
            template: dictionary specifying the template to follow for observation features extraction

        Example::

            template_X = {'w': {(0,):((0,), (-1,0), (-2,-1,0))}}
                       = {attr_name: {x_offset:tuple(y_offsets)}}
        """
        if(type(template) == dict):
            self._template_X = {}
            self.y_offsets = set()
            self.x_featurenames = {}
            for attr_name, templateX in template.items():
                self._template_X[attr_name] = {}
                self.x_featurenames[attr_name] = {}
                for offset_x, offsets_y in templateX.items():
                    s_offset_x = tuple(sorted(offset_x))
                    feature_name = '|'.join([attr_name + "[" + str(ofst_x) + "]" for ofst_x in s_offset_x])
                    self.x_featurenames[attr_name][offset_x] = feature_name
                    unique_dict = {}
                    for offset_y in offsets_y:
                        s_offset_y = tuple(sorted(offset_y))
                        check = self._validate_template(s_offset_y)
                        if(check):
                            unique_dict[s_offset_y] = 1
                            self.y_offsets.add(s_offset_y)
                    if(unique_dict):
                        self._template_X[attr_name][s_offset_x] = tuple(unique_dict.keys())

    @property
    def template_Y(self):
        return self._template_Y

    @template_Y.setter
    def template_Y(self, template):
        r"""setup/verify template_Y

        Args:
            template: dictionary specifying the template to follow for y pattern features extraction

        Example::

            template_Y = {'Y': ((0,), (-1,0), (-2,-1,0))}
                       = {Y: tuple(y_offsets)}
        """
        if(type(template) == dict):
            self._template_Y = {}
            unique_dict = {}
            offsets_y = template['Y']
            for offset_y in offsets_y:
                s_offset_y = tuple(sorted(offset_y))
                check = self._validate_template(s_offset_y)
                if(check):
                    unique_dict[s_offset_y] = 1
            if(unique_dict):
                self._template_Y['Y'] = tuple(unique_dict.keys())
            else:
                self._template_Y['Y'] = ()

    def _validate_template(self, template):
        """validate a passed template

        Args:
            template: a tuple comprising the order of the y pattern (e.g. (-2,-1,0))
        """
        check = True
        if(len(template) > 1):
            for i in range(len(template)-1):
                curr_elem = template[i]
                next_elem = template[i+1]
                diff = curr_elem - next_elem
                if(diff != -1):
                    check = False
                    break
        else:
            if(template[0] != 0):
                check = False
        return(check)

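    # Illustrative sketch (not part of the original module): the setters above keep only y offset
    # tuples that are contiguous and end at 0. Assuming the fextractor instance from the earlier
    # sketch:
    #
    #   fextractor._validate_template((-2, -1, 0))   # True  -- contiguous, ends at 0
    #   fextractor._validate_template((0,))          # True
    #   fextractor._validate_template((-2, 0))       # False -- gap between -2 and 0
    #   fextractor._validate_template((-1,))         # False -- does not end at offset 0
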
    def extract_seq_features_perboundary(self, seq, seg_features=None):
        """extract features (observation and y pattern features) per boundary

        Args:
            seq: a sequence instance of :class:`SequenceStruct`

        Keyword Arguments:
            seg_features: optional dictionary of observation features
        """
        # this method is used to extract features from sequences with known labels
        # (i.e. we know the Y labels and boundaries)
        Y = seq.Y
        features = {}
        for boundary in Y:
            xy_feat = self.extract_features_XY(seq, boundary, seg_features)
            y_feat = self.extract_features_Y(seq, boundary, self.template_Y)
            y_feat = y_feat['Y']
            for offset_tup_y in y_feat:
                for y_patt in y_feat[offset_tup_y]:
                    if(y_patt in xy_feat):
                        xy_feat[y_patt].update(y_feat[offset_tup_y])
                    else:
                        xy_feat[y_patt] = y_feat[offset_tup_y]
            features[boundary] = xy_feat
        return(features)

    def aggregate_seq_features(self, features, boundaries):
        """aggregate features across all boundaries

        It is usually used to aggregate features in the dictionary obtained from
        the :func:`extract_seq_features_perboundary` function.

        Args:
            features: dictionary of sequence features per boundary
            boundaries: list of boundaries where detected features are aggregated
        """
        # summing up all local features across all boundaries
        seq_features = {}
        for boundary in boundaries:
            xy_features = features[boundary]
            for y_patt in xy_features:
                if(y_patt in seq_features):
                    seq_features[y_patt].update(xy_features[y_patt])
                else:
                    seq_features[y_patt] = Counter(xy_features[y_patt])
        return(seq_features)

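    # Illustrative sketch (not part of the original module): aggregation relies on
    # collections.Counter, so feature values for the same y pattern are summed across boundaries.
    # With toy per-boundary features (label/feature names are hypothetical) and the fextractor
    # instance from the earlier sketch:
    #
    #   features = {(1, 1): {'B-PER': {'w[0]=John': 1}},
    #               (2, 2): {'B-PER|I-PER': {'w[0]=Smith': 1}, 'I-PER': {'w[0]=Smith': 1}}}
    #   fextractor.aggregate_seq_features(features, [(1, 1), (2, 2)])
    #   # -> {'B-PER': Counter({'w[0]=John': 1}),
    #   #     'B-PER|I-PER': Counter({'w[0]=Smith': 1}),
    #   #     'I-PER': Counter({'w[0]=Smith': 1})}
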
    # def extract_seq_features(self, seq):
    #     features_per_boundary = self.extract_seq_features_perboundary(seq)
    #     seq_features = self.aggregate_seq_features(features_per_boundary, boundaries=seq.Y)
    #     return(seq_features)

    def extract_features_Y(self, seq, boundary, templateY):
        """extract y pattern features for a given sequence and template Y

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundary: tuple (u,v) representing the current boundary
            templateY: dictionary specifying the template to follow for extraction.
                       It has the form: ``{Y: tuple(y_offsets)}``
                       e.g. ``{'Y': ((0,), (-1,0), (-2,-1,0))}``
        """
        # TODO: remove y_range and substitute it by checking if pos is within 0 and seq.T
        template_Y = templateY['Y']
        if(template_Y):
            Y = seq.Y
            y_sboundaries = seq.y_sboundaries
            y_boundpos_map = seq.y_boundpos_map
            curr_pos = y_boundpos_map[boundary]
            range_y = seq.y_range
            y_patt_features = {}
            feat_template = {}
            for offset_tup_y in template_Y:
                y_pattern = []
                for offset_y in offset_tup_y:
                    # offset_y should always be <= 0
                    pos = curr_pos + offset_y
                    if(pos in range_y):
                        b = y_sboundaries[pos]
                        y_pattern.append(Y[b])
                    else:
                        y_pattern = []
                        break
                if(y_pattern):
                    feat_template[offset_tup_y] = {"|".join(y_pattern):1}
            y_patt_features['Y'] = feat_template
        else:
            y_patt_features = {'Y':{}}
        return(y_patt_features)

    def extract_features_X(self, seq, boundary):
        """extract observation features for a given sequence at a specified boundary

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundary: tuple (u,v) representing the current boundary
        """
        # get template X
        template_X = self.template_X
        x_featurenames = self.x_featurenames
        # current boundary begin and end
        u, v = boundary
        seg_features = {}
        for attr_name in template_X:
            # pick the representation function based on the attribute encoding (see __init__)
            attr_represent_func = self.attr_represent_func[attr_name]
            feat_template = {}
            for offset_tup_x in template_X[attr_name]:
                attributes = []
                for offset_x in offset_tup_x:
                    if(offset_x > 0):
                        pos = (v + offset_x, v + offset_x)
                    elif(offset_x < 0):
                        pos = (u + offset_x, u + offset_x)
                    else:
                        pos = (u, v)
                    if(pos in seq.seg_attr):
                        attributes.append(seq.seg_attr[pos][attr_name])
                    else:
                        attributes = []
                        break
                if(attributes):
                    feat_template[offset_tup_x] = attr_represent_func(attributes, x_featurenames[attr_name][offset_tup_x])
            seg_features[attr_name] = feat_template
        return(seg_features)

    def extract_features_XY(self, seq, boundary, seg_features=None):
        """extract/join observation features with y pattern features as specified in :attr:`template_X`

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundary: tuple (u,v) representing the current boundary

        Keyword Arguments:
            seg_features: optional dictionary of observation features

        Example::

            template_X = {'w': {(0,):((0,), (-1,0), (-2,-1,0))}}
            template_Y = {'Y': ((0,), (-1,0), (-2,-1,0))}

            Using template_X, the function extracts all unigram features of the
            observation 'w' (offset (0,)) and joins them with:
                - zero-order y pattern features (0,)
                - first-order y pattern features (-1, 0)
                - second-order y pattern features (-2, -1, 0)
        """
        if(not seg_features):
            seg_feat_templates = self.extract_features_X(seq, boundary)
        else:
            seg_feat_templates = seg_features[boundary]
        y_feat_template = self.extract_features_Y(seq, boundary, {'Y':self.y_offsets})
        y_feat_template = y_feat_template['Y']
        templateX = self.template_X
        xy_features = {}
        for attr_name, seg_feat_template in seg_feat_templates.items():
            for offset_tup_x in seg_feat_template:
                for offset_tup_y in templateX[attr_name][offset_tup_x]:
                    if(offset_tup_y in y_feat_template):
                        for y_patt in y_feat_template[offset_tup_y]:
                            if(y_patt in xy_features):
                                xy_features[y_patt].update(seg_feat_template[offset_tup_x])
                            else:
                                xy_features[y_patt] = dict(seg_feat_template[offset_tup_x])
        return(xy_features)

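    # Illustrative sketch (not part of the original module): for a unigram observation template
    # joined with zero- and first-order y patterns, the returned dictionary maps each detected
    # y pattern to its observation features, e.g. (label and feature names are hypothetical):
    #
    #   xy_features = {'B-PER': {'w[0]=John': 1},
    #                  'O|B-PER': {'w[0]=John': 1}}
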
    def lookup_features_X(self, seq, boundary):
        """lookup observation features for a given sequence using varying boundaries (i.e. g(X, u, v))

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundary: tuple (u,v) representing the current boundary
        """
        # get template X
        template_X = self.template_X
        x_featurenames = self.x_featurenames
        # current boundary begin and end
        u = boundary[0]
        v = boundary[-1]
        seg_features = {}
        for attr_name in template_X:
            attr_represent_func = self.attr_represent_func[attr_name]
            # offset_tup_x is a sorted tuple -- helpful in case of out-of-boundary tuples
            for offset_tup_x in template_X[attr_name]:
                attributes = []
                for offset_x in offset_tup_x:
                    if(offset_x > 0):
                        pos = (v + offset_x, v + offset_x)
                    elif(offset_x < 0):
                        pos = (u + offset_x, u + offset_x)
                    else:
                        pos = (u, v)
                    if(pos in seq.seg_attr):
                        attributes.append(seq.seg_attr[pos][attr_name])
                    else:
                        attributes = []
                        break
                if(attributes):
                    seg_features.update(attr_represent_func(attributes, x_featurenames[attr_name][offset_tup_x]))
        return(seg_features)

    def flatten_segfeatures(self, seg_features):
        """flatten an observation features dictionary

        Args:
            seg_features: dictionary of observation features
        """
        flat_segfeatures = {}
        for attr_name in seg_features:
            for offset in seg_features[attr_name]:
                flat_segfeatures.update(seg_features[attr_name][offset])
        return(flat_segfeatures)

    def lookup_seq_modelactivefeatures(self, seq, model, learning=False):
        """lookup/search model active features for a given sequence using varying boundaries (i.e. g(X, u, v))

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            model: a model representation instance of the CRF class
                   (i.e. a class having the `ModelRepresentation` suffix)

        Keyword Arguments:
            learning: optional boolean indicating if this function is used while learning model parameters
        """
        # longest segment length defined in the model
        L = model.L
        T = seq.T
        seg_features = {}
        l_segfeatures = {}
        for j in range(1, T+1):
            for d in range(L):
                if(j-d <= 0):
                    break
                # start and end boundary
                u = j-d
                v = j
                boundary = (u, v)
                if(learning):
                    # used in the case of model training
                    l_segfeatures[boundary] = self.extract_features_X(seq, boundary)
                    seg_features[boundary] = self.flatten_segfeatures(l_segfeatures[boundary])
                else:
                    seg_features[boundary] = self.lookup_features_X(seq, boundary)
        return(seg_features, l_segfeatures)

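    # Illustrative sketch (not part of the original module): the nested loops above enumerate
    # every segment of length 1..L ending at each position j. For example, with T=4 and L=2:
    #
    #   T, L = 4, 2
    #   boundaries = [(j - d, j) for j in range(1, T + 1) for d in range(L) if j - d > 0]
    #   # -> [(1, 1), (2, 2), (1, 2), (3, 3), (2, 3), (4, 4), (3, 4)]
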
    ########################################################
    # functions used to represent continuous and categorical attributes
    ########################################################
    def _represent_categorical_attr(self, attributes, feature_name):
        """represent categorical attributes"""
        feature_val = '|'.join(attributes)
        feature = feature_name + "=" + feature_val
        return({feature:1})

    def _represent_continuous_attr(self, attributes, feature_name):
        """represent continuous attributes"""
        feature_val = sum(attributes)
        return({feature_name:feature_val})

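    # Illustrative sketch (not part of the original module): categorical attributes are joined
    # with '|' and appended to the feature name, while continuous attributes are summed.
    # Assuming the fextractor instance from the earlier sketch:
    #
    #   fextractor._represent_categorical_attr(['New', 'York'], 'w[-1]|w[0]')
    #   # -> {'w[-1]|w[0]=New|York': 1}
    #   fextractor._represent_continuous_attr([0.25, 0.5], 'score[0]')
    #   # -> {'score[0]': 0.75}
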
    def save(self, folder_dir):
        """store the templates used -- template_X and template_Y"""
        save_info = {'FE_templateX': self.template_X,
                     'FE_templateY': self.template_Y}
        for name in save_info:
            ReaderWriter.dump_data(save_info[name], os.path.join(folder_dir, name))

class HOFeatureExtractor(FeatureExtractor):
    """Feature extractor class for higher-order CRF models"""
    def __init__(self, templateX, templateY, attr_desc):
        super().__init__(templateX, templateY, attr_desc)

class FOFeatureExtractor(FeatureExtractor):
    r"""Feature extractor class for first-order CRF models

    It supports the addition of a __START__ state and, potentially, a __STOP__ state in a future release.

    Args:
        templateX: dictionary specifying the template to follow for observation features
                   extraction. It has the form: ``{attr_name: {x_offset: tuple(y_offsets)}}``
                   e.g. ``{'w': {(0,): ((0,), (-1,0))}}``
        templateY: dictionary specifying the template to follow for y pattern features
                   extraction. It has the form: ``{Y: tuple(y_offsets)}``
                   e.g. ``{'Y': ((0,), (-1,0))}``
        attr_desc: dictionary containing the description and the encoding of the
                   attributes/observations, e.g.
                   ``attr_desc['w'] = {'description':'the word/token', 'encoding':'categorical'}``.
                   For more details check the :attr:`attr_desc` of the :class:`NERSegmentAttributeExtractor`
        start_state: boolean indicating if the __START__ state is required in the model

    Attributes:
        template_X, template_Y, attr_desc, start_state: see the corresponding arguments above

    .. note::

       This class adds support for the __START__ and, potentially, __STOP__ states.
    """
    def __init__(self, templateX, templateY, attr_desc, start_state=True):
        super().__init__(templateX, templateY, attr_desc)
        self.start_state = start_state

    def _validate_template(self, template):
        """validate a passed template

        Args:
            template: a tuple comprising the order of the y pattern (e.g. (-1,0))
        """
        valid_offsets = {(0,), (-1,0)}
        if(template in valid_offsets):
            check = True
        else:
            check = False
        return(check)

    def extract_features_Y(self, seq, boundary, templateY):
        """extract y pattern features for a given sequence and template Y

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            boundary: tuple (u,v) representing the current boundary
            templateY: dictionary specifying the template to follow for extraction.
                       It has the form: ``{Y: tuple(y_offsets)}``
                       e.g. ``{'Y': ((0,), (-1,0))}``
        """
        template_Y = templateY['Y']
        if(template_Y):
            Y = seq.Y
            y_sboundaries = seq.y_sboundaries
            y_boundpos_map = seq.y_boundpos_map
            curr_pos = y_boundpos_map[boundary]
            range_y = seq.y_range
            startstate_flag = self.start_state
            y_patt_features = {}
            feat_template = {}
            for offset_tup_y in template_Y:
                y_pattern = []
                for offset_y in offset_tup_y:
                    # offset_y should always be <= 0
                    pos = curr_pos + offset_y
                    if(pos in range_y):
                        b = y_sboundaries[pos]
                        y_pattern.append(Y[b])
                    else:
                        if(startstate_flag and pos == -1):
                            y_pattern.append("__START__")
                        else:
                            y_pattern = []
                            break
                if(y_pattern):
                    feat_template[offset_tup_y] = {"|".join(y_pattern):1}
            y_patt_features['Y'] = feat_template
        else:
            y_patt_features = {'Y':{}}
        return(y_patt_features)

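# Illustrative sketch (not part of the original module): the first-order extractor restricts
# y offsets to (0,) and (-1, 0) and, when start_state=True, emits a "__START__|<first-label>"
# pattern at the first position. Reusing the templates and attr_desc from the earlier sketch
# (label names are hypothetical):
#
#   fo_fextractor = FOFeatureExtractor(template_X, template_Y, attr_desc, start_state=True)
#   # at the first boundary (1, 1), the (-1, 0) offset yields a y pattern such as '__START__|B-PER'
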
class SeqsRepresenter(object):
    """Sequence representer class that prepares, pre-processes and transforms sequences for learning/decoding tasks

    Args:
        attr_extractor: instance of an attribute extractor class such as
                        :class:`NERSegmentAttributeExtractor`. It is used to apply the defined
                        observation functions generating features for the observations.
        fextractor: instance of a feature extractor class such as :class:`FeatureExtractor`.
                    It is used to extract features from the observations and the generated
                    observation features using the observation functions.

    Attributes:
        attr_extractor: instance of an attribute extractor class such as :class:`NERSegmentAttributeExtractor`
        fextractor: instance of a feature extractor class such as :class:`FeatureExtractor`
        attr_scaler: instance of the scaler class :class:`AttributeScaler`. It is used for scaling
                     continuous (non-categorical) features using standardization or rescaling.
    """
    def __init__(self, attr_extractor, fextractor):
        self.attr_extractor = attr_extractor
        self.feature_extractor = fextractor
        self.attr_scaler = None

    @property
    def feature_extractor(self):
        return self._feature_extractor

    @feature_extractor.setter
    def feature_extractor(self, fextractor):
        # make a copy to preserve the template_X and template_Y used in the extractor
        self._feature_extractor = deepcopy(fextractor)

    def prepare_seqs(self, seqs_dict, corpus_name, working_dir, unique_id=True, log_progress=True):
        r"""prepare sequences to be used in the CRF models

        Main task:
            - generate attributes (i.e. apply observation functions) on the sequence
            - create a directory for every sequence where the relevant data is saved
            - create and return a seqs_info dictionary comprising info about the prepared sequences

        Args:
            seqs_dict: dictionary containing sequences and corresponding ids where each
                       sequence is an instance of the :class:`SequenceStruct` class
            corpus_name: string specifying the name of the corpus, used as the corpus folder name
            working_dir: string representing the directory where the parsing and saving to disk occur
            unique_id: boolean indicating if the generated corpus folder will include a generated id
            log_progress: boolean indicating whether to write progress info to a log file

        Returns:
            seqs_info (dictionary): dictionary comprising the info about the prepared sequences

            Example::

                seqs_info = {'seq_id': {'globalfeatures_dir': directory,
                                        'T': length of the sequence,
                                        'L': length of the longest segment},
                             ...
                            }
        """
        attr_extractor = self.attr_extractor
        if(unique_id):
            corpus_folder = "{}_{}".format(corpus_name, generate_datetime_str())
        else:
            corpus_folder = corpus_name
        target_dir = create_directory("global_features", create_directory(corpus_folder, working_dir))
        seqs_info = {}
        start_time = datetime.now()
        for seq_id, seq in seqs_dict.items():
            # boundaries of X generate segments of length equal to 1
            x_boundaries = seq.get_x_boundaries()
            # this will update the seg_attr of the sequence
            attr_extractor.generate_attributes(seq, x_boundaries)
            # create a folder for every sequence
            seq_dir = create_directory("seq_{}".format(seq_id), target_dir)
            ReaderWriter.dump_data(seq, os.path.join(seq_dir, "sequence"), mode="wb")
            seqs_info[seq_id] = {'globalfeatures_dir': seq_dir, 'T':seq.T, 'L':seq.L}
        end_time = datetime.now()
        # log progress
        if(log_progress):
            log_file = os.path.join(target_dir, "log.txt")
            line = "---Preparing/parsing sequences--- starting time: {} \n".format(start_time)
            line += "Number of sequences prepared/parsed: {}\n".format(len(seqs_dict))
            line += "Corpus directory of the parsed sequences is: {} \n".format(target_dir)
            line += "---Preparing/parsing sequences--- end time: {} \n".format(end_time)
            line += "\n \n"
            ReaderWriter.log_progress(line, log_file)
        return(seqs_info)

    def preprocess_attributes(self, seqs_id, seqs_info, method="rescaling"):
        r"""preprocess sequences by generating attributes for segments with L > 1

        Main task:
            - generate attributes (i.e. apply observation functions) on segments (i.e. L > 1)
            - scale continuous attributes and build the relevant scaling info needed
            - create a directory for every sequence where the relevant data is saved

        Args:
            seqs_id: list of sequence ids to be processed
            seqs_info: dictionary comprising the info about the prepared sequences

        Keyword Arguments:
            method: string determining the type of scaling (if applicable);
                    it supports {standardization, rescaling}
        """
        attr_extractor = self.attr_extractor
        grouped_attr = attr_extractor.group_attributes()
        if(grouped_attr.get("continuous")):
            active_attr = list(self.feature_extractor.template_X.keys())
            active_continuous_attr = [attr for attr in active_attr if attr in grouped_attr['continuous']]
        else:
            active_continuous_attr = {}

        attr_dict = {}
        seq_dir = None
        start_time = datetime.now()
        for seq_id in seqs_id:
            # length of the longest entity in a sequence
            seq_L = seqs_info[seq_id]['L']
            if(seq_L > 1 or active_continuous_attr):
                seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
                seq = ReaderWriter.read_data(os.path.join(seq_dir, "sequence"), mode="rb")
                y_boundaries = seq.y_sboundaries
                # generate attributes for segments
                if(seq_L > 1):
                    # this will update the value of the seg_attr of the sequence
                    new_boundaries = attr_extractor.generate_attributes(seq, y_boundaries)
                    # this condition might be redundant -- consider removing it and directly dumping the sequence
                    if(new_boundaries):
                        ReaderWriter.dump_data(seq, os.path.join(seq_dir, "sequence"), mode="wb")
                # gather stats for rescaling/standardizing continuous variables
                if(active_continuous_attr):
                    for attr_name in active_continuous_attr:
                        for y_boundary in y_boundaries:
                            attr_val = seq.seg_attr[y_boundary][attr_name]
                            if(attr_name in attr_dict):
                                attr_dict[attr_name].append(attr_val)
                            else:
                                attr_dict[attr_name] = [attr_val]

        # generate the attribute scaler object
        if(attr_dict):
            scaling_info = {}
            if(method == "rescaling"):
                for attr_name in attr_dict:
                    scaling_info[attr_name] = {}
                    scaling_info[attr_name]['max'] = numpy.max(attr_dict[attr_name])
                    scaling_info[attr_name]['min'] = numpy.min(attr_dict[attr_name])
            elif(method == "standardization"):
                for attr_name in attr_dict:
                    scaling_info[attr_name] = {}
                    scaling_info[attr_name]['mean'] = numpy.mean(attr_dict[attr_name])
                    scaling_info[attr_name]['sd'] = numpy.std(attr_dict[attr_name])
            attr_scaler = AttributeScaler(scaling_info, method)
            self.attr_scaler = attr_scaler
            # scale the attributes
            self.scale_attributes(seqs_id, seqs_info)
        end_time = datetime.now()

        # any sequence would lead to the parent directory of prepared/parsed sequences
        # using the last sequence id and corresponding sequence directory
        if(seq_dir):
            target_dir = os.path.dirname(seq_dir)
            log_file = os.path.join(target_dir, "log.txt")
            line = "---Rescaling continuous features--- starting time: {} \n".format(start_time)
            line += "Number of instances/training data processed: {}\n".format(len(seqs_id))
            line += "---Rescaling continuous features--- end time: {} \n".format(end_time)
            line += "\n \n"
            ReaderWriter.log_progress(line, log_file)

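    # Illustrative sketch (not part of the original module): preprocess_attributes only collects
    # the scaling statistics; assuming AttributeScaler applies the usual formulas, the two
    # supported methods correspond to:
    #
    #   # rescaling (min-max):       x_scaled = (x - min) / (max - min)
    #   # standardization (z-score): x_scaled = (x - mean) / sd
    #
    #   import numpy
    #   values = numpy.array([2.0, 4.0, 6.0])
    #   rescaled = (values - values.min()) / (values.max() - values.min())   # [0. , 0.5, 1. ]
    #   standardized = (values - values.mean()) / values.std()               # [-1.2247, 0., 1.2247]
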
    def scale_attributes(self, seqs_id, seqs_info):
        """scale continuous attributes

        Args:
            seqs_id: list of sequence ids to be processed
            seqs_info: dictionary comprising the info about the prepared sequences
        """
        attr_scaler = self.attr_scaler
        if(attr_scaler):
            for seq_id in seqs_id:
                seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
                seq = ReaderWriter.read_data(os.path.join(seq_dir, "sequence"), mode="rb")
                boundaries = list(seq.seg_attr.keys())
                attr_scaler.scale_continuous_attributes(seq, boundaries)
                ReaderWriter.dump_data(seq, os.path.join(seq_dir, "sequence"), mode="wb")

    def extract_seqs_globalfeatures(self, seqs_id, seqs_info, dump_gfeat_perboundary=False):
        r"""extract global features (i.e. F(X,Y)) from every sequence

        Main task:
            - parse each sequence and generate the global features
              :math:`F_j(X,Y) = \sum_{t=1}^{T}f_j(X,Y,t)`
            - for each sequence, obtain a set of generated global feature functions where each
              :math:`F_j(X,Y)` represents the sum of the values of its corresponding
              low-level/local feature function :math:`f_j(X,Y,t)`
            - save all the results on disk

        Args:
            seqs_id: list of sequence ids to be processed
            seqs_info: dictionary comprising the info about the prepared sequences

        .. note::

           It requires that the sequences have already been parsed and preprocessed (if applicable).
        """
        feature_extractor = self.feature_extractor
        start_time = datetime.now()
        counter = 0
        for seq_id in seqs_id:
            seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
            seq = ReaderWriter.read_data(os.path.join(seq_dir, "sequence"), mode="rb")
            # extract the sequence global features per boundary
            gfeatures_perboundary = feature_extractor.extract_seq_features_perboundary(seq)
            y_boundaries = seq.y_sboundaries
            # gfeatures has this format {'Y_patt': Counter(features)}
            gfeatures = feature_extractor.aggregate_seq_features(gfeatures_perboundary, y_boundaries)
            # store the features' sum (i.e. F_j(X,Y)) for every sequence on disk
            ReaderWriter.dump_data(gfeatures, os.path.join(seq_dir, "globalfeatures"))
            # case of perceptron/search-based training with a pruned beam
            if(dump_gfeat_perboundary):
                ReaderWriter.dump_data(gfeatures_perboundary, os.path.join(seq_dir, "globalfeatures_per_boundary"))
            counter += 1
            print("dumping globalfeatures -- processed seqs: ", counter)
        end_time = datetime.now()

        # any sequence would lead to the parent directory of prepared/parsed sequences
        # using the last sequence id and corresponding sequence directory
        target_dir = os.path.dirname(seq_dir)
        log_file = os.path.join(target_dir, "log.txt")
        line = "---Generating Global Features F_j(X,Y)--- starting time: {} \n".format(start_time)
        line += "Number of instances/training data processed: {}\n".format(len(seqs_id))
        line += "---Generating Global Features F_j(X,Y)--- end time: {} \n".format(end_time)
        line += "\n \n"
        ReaderWriter.log_progress(line, log_file)

    def create_model(self, seqs_id, seqs_info, model_repr_class, filter_obj=None):
        r"""aggregate all features identified in the training sequences to build one model

        Main task:
            - use the sequences assigned to the training set to build the model
            - take the union of the detected global feature functions :math:`F_j(X,Y)` of every
              chosen parsed sequence from the training set to form the set of model features
            - construct the tag set Y_states (i.e. the possible tags assumed by y_t) using the
              chosen parsed sequences from the training data set
            - determine the longest segment length (if applicable)
            - apply the feature filter (if applicable)

        Args:
            seqs_id: list of sequence ids to be processed
            seqs_info: dictionary comprising the info about the prepared sequences
            model_repr_class: class name of the model representation (i.e. a class with the
                              `ModelRepresentation` suffix such as :class:`HOCRFModelRepresentation`)

        Keyword Arguments:
            filter_obj: optional instance of the :class:`FeatureFilter` class to apply a filter

        .. note::

           It requires that the sequences have already been parsed and that the global features
           were generated using :func:`extract_seqs_globalfeatures`.
        """
        Y_states = {}
        modelfeatures = {}
        # length of the default entity in a segment
        L = 1
        counter = 0
        start_time = datetime.now()
        for seq_id in seqs_id:
            seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
            # get the largest length of an entity in the segment
            seq_L = seqs_info[seq_id]['L']
            if(seq_L > L):
                L = seq_L
            gfeatures = ReaderWriter.read_data(os.path.join(seq_dir, "globalfeatures"))
            # generate a global vector for the model
            for y_patt, featuresum in gfeatures.items():
                if(y_patt in modelfeatures):
                    modelfeatures[y_patt].update(featuresum)
                else:
                    modelfeatures[y_patt] = featuresum
                # record all encountered states/labels
                parts = y_patt.split("|")
                for state in parts:
                    Y_states[state] = 1
            counter += 1
            print("constructing model -- processed seqs: ", counter)

        # apply a filter
        if(filter_obj):
            # this will trim unwanted features from the modelfeatures dictionary
            modelfeatures = filter_obj.apply_filter(modelfeatures)

        # create the model representation
        model = model_repr_class()
        model.setup_model(modelfeatures, Y_states, L)
        end_time = datetime.now()

        # any sequence would lead to the parent directory
        target_dir = os.path.dirname(seq_dir)
        # log progress
        log_file = os.path.join(target_dir, "log.txt")
        line = "---Constructing model--- starting time: {} \n".format(start_time)
        line += "Number of instances/training data processed: {}\n".format(len(seqs_id))
        line += "Number of features: {} \n".format(model.num_features)
        line += "Number of labels: {} \n".format(model.num_states)
        line += "---Constructing model--- end time: {} \n".format(end_time)
        line += "\n \n"
        ReaderWriter.log_progress(line, log_file)
        return(model)

    def extract_seqs_modelactivefeatures(self, seqs_id, seqs_info, model, output_foldername, learning=False):
        """identify for every sequence the model active states and features

        Main task:
            - generate attributes for all segments of length 1 up to the maximum length defined
              in the model; this is an optional step, applied only for segmentation problems
            - generate segment features, potentially activated states, and a representation of
              segment features to be used potentially while learning
            - dump all info on disk

        Args:
            seqs_id: list of sequence ids to be processed
            seqs_info: dictionary comprising the info about the prepared sequences
            model: an instance of a model representation class (i.e. a class with the
                   `ModelRepresentation` suffix such as :class:`HOCRFModelRepresentation`)
            output_foldername: string representing the name of the root folder to be created
                               for containing all saved info

        Keyword Arguments:
            learning: boolean indicating if this function is used for learning (model weights optimization)

        .. note::

           The seqs_info dictionary will be updated with the directory of the saved generated info.
        """
        # get the root_dir
        seq_id = seqs_id[0]
        seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
        root_dir = os.path.dirname(os.path.dirname(seq_dir))
        output_dir = create_directory("model_activefeatures_{}".format(output_foldername), root_dir)
        L = model.L
        f_extractor = self.feature_extractor
        counter = 0
        start_time = datetime.now()
        for seq_id in seqs_id:
            counter += 1
            # lookup active features for the current sequence and store them on disk
            print("identifying model active features -- processed seqs: ", counter)
            seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
            seq = ReaderWriter.read_data(os.path.join(seq_dir, "sequence"))
            if(L > 1):
                self._lookup_seq_attributes(seq, L)
                ReaderWriter.dump_data(seq, os.path.join(seq_dir, "sequence"), mode="wb")
            seg_features, l_segfeatures = f_extractor.lookup_seq_modelactivefeatures(seq, model, learning=learning)
            # dump model active features data
            activefeatures_dir = create_directory("seq_{}".format(seq_id), output_dir)
            seqs_info[seq_id]["activefeatures_dir"] = activefeatures_dir
            ReaderWriter.dump_data(seg_features, os.path.join(activefeatures_dir, "seg_features"))
            # TODO: add a condition regarding learning
            ReaderWriter.dump_data(l_segfeatures, os.path.join(activefeatures_dir, "l_segfeatures"))
        end_time = datetime.now()

        log_file = os.path.join(output_dir, "log.txt")
        line = "---Finding sequences' model active-features--- starting time: {} \n".format(start_time)
        line += "Total number of parsed sequences: {} \n".format(len(seqs_id))
        line += "---Finding sequences' model active-features--- end time: {} \n".format(end_time)
        line += "\n \n"
        ReaderWriter.log_progress(line, log_file)

    def _lookup_seq_attributes(self, seq, L):
        """generate the missing attributes if the segment length is greater than 1

        Args:
            seq: a sequence instance of :class:`SequenceStruct`
            L: longest segment length defined in the model

        .. note::

           The sequence :attr:`seg_attr` attribute might be updated if L > 1.
        """
        attr_extractor = self.attr_extractor
        attr_scaler = self.attr_scaler
        T = seq.T
        for j in range(1, T+1):
            for d in range(L):
                if(j-d <= 0):
                    break
                boundary = (j-d, j)
                if(boundary not in seq.seg_attr):
                    # this will update the value of the seg_attr of the sequence
                    attr_extractor.generate_attributes(seq, [boundary])
                    if(attr_scaler):
                        attr_scaler.scale_continuous_attributes(seq, [boundary])

    def get_seq_activatedstates(self, seq_id, seqs_info):
        """retrieve the identified activated states that were saved on disk using `seqs_info`

        Args:
            seq_id: id of the sequence to be processed
            seqs_info: dictionary comprising the info about the prepared sequences

        .. note::

           This data was generated using :func:`extract_seqs_modelactivefeatures`.
        """
        seq_dir = seqs_info[seq_id]["activefeatures_dir"]
        activated_states = ReaderWriter.read_data(os.path.join(seq_dir, "activated_states"))
        return(activated_states)

    def get_seq_segfeatures(self, seq_id, seqs_info):
        """retrieve the segment features that were extracted and saved on disk using `seqs_info`

        Args:
            seq_id: id of the sequence to be processed
            seqs_info: dictionary comprising the info about the prepared sequences

        .. note::

           This data was generated using :func:`extract_seqs_modelactivefeatures`.
        """
        seq_dir = seqs_info[seq_id]["activefeatures_dir"]
        seg_features = ReaderWriter.read_data(os.path.join(seq_dir, "seg_features"))
        return(seg_features)

    def get_seq_lsegfeatures(self, seq_id, seqs_info):
        """retrieve the segment features extracted with a modified representation for the purpose of parameter learning

        Args:
            seq_id: id of the sequence to be processed
            seqs_info: dictionary comprising the info about the prepared sequences

        .. note::

           This data was generated using :func:`extract_seqs_modelactivefeatures`.
        """
        seq_dir = seqs_info[seq_id]["activefeatures_dir"]
        seg_features = ReaderWriter.read_data(os.path.join(seq_dir, "l_segfeatures"))
        return(seg_features)

    def get_seq_activefeatures(self, seq_id, seqs_info):
        """retrieve the identified model active features of a sequence

        Args:
            seq_id: id of the sequence to be processed
            seqs_info: dictionary comprising the info about the prepared sequences
        """
        seq_dir = seqs_info[seq_id]["activefeatures_dir"]
        try:
            activefeatures = ReaderWriter.read_data(os.path.join(seq_dir, "activefeatures"))
        except FileNotFoundError:
            # the activefeatures file does not exist yet -- consider logging the error
            activefeatures = None
        finally:
            return(activefeatures)

    def get_seq_globalfeatures(self, seq_id, seqs_info, per_boundary=True):
        r"""retrieve the global features available for a given sequence
        (i.e. :math:`F_j(X,Y)` for all :math:`j \in [1...J]`)

        Args:
            seq_id: id of the sequence to be processed
            seqs_info: dictionary comprising the info about the prepared sequences

        Keyword Arguments:
            per_boundary: boolean specifying if the global features representation should be
                          per boundary or aggregated across the whole sequence
        """
        seq_dir = seqs_info[seq_id]['globalfeatures_dir']
        if(per_boundary):
            fname = "globalfeatures_per_boundary"
        else:
            fname = "globalfeatures_repr"
        try:
            exception_fired = False
            gfeatures = ReaderWriter.read_data(os.path.join(seq_dir, fname))
        except FileNotFoundError:
            # fall back to the globalfeatures saved on disk
            gfeatures = ReaderWriter.read_data(os.path.join(seq_dir, "globalfeatures"))
            exception_fired = True
        finally:
            return(gfeatures, exception_fired)

    def aggregate_gfeatures(self, gfeatures, boundaries):
        """aggregate global features using a specified list of boundaries

        Args:
            gfeatures: dictionary representing the extracted sequence features (i.e. F(X, Y))
            boundaries: list of boundaries to use for aggregating global features
        """
        feature_extractor = self.feature_extractor
        # gfeatures is assumed to be represented by boundaries
        gfeatures = feature_extractor.aggregate_seq_features(gfeatures, boundaries)
        return(gfeatures)

    def represent_gfeatures(self, gfeatures, model, boundaries=None):
        """represent extracted sequence global features

        Two representations could be applied:
            1. features identified by boundary (i.e. f(X,Y))
            2. features identified and aggregated across all positions in the sequence (i.e. F(X,Y))

        Args:
            gfeatures: dictionary representing the extracted sequence features (i.e. F(X, Y))
            model: an instance of a model representation class (i.e. a class with the
                   `ModelRepresentation` suffix such as :class:`HOCRFModelRepresentation`)

        Keyword Arguments:
            boundaries: if specified (i.e. a list of boundaries), gfeatures is assumed to be
                        represented per boundary (option 1) and will be aggregated over those
                        boundaries before representation; otherwise (None or an empty list),
                        gfeatures is assumed to be the already aggregated global features (option 2)
        """
        feature_extractor = self.feature_extractor
        # if boundaries are specified, gfeatures is assumed to be represented by boundaries
        if(boundaries):
            gfeatures = feature_extractor.aggregate_seq_features(gfeatures, boundaries)
        windx_fval = model.represent_globalfeatures(gfeatures)
        return(windx_fval)

    @staticmethod
    def load_seq(seq_id, seqs_info):
        """load a dumped sequence from disk

        Args:
            seq_id: id of the sequence to be loaded
            seqs_info: dictionary comprising the info about the prepared sequences
        """
        seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
        seq = ReaderWriter.read_data(os.path.join(seq_dir, "sequence"), mode="rb")
        return(seq)

    def get_imposterseq_globalfeatures(self, seq_id, seqs_info, y_imposter, seg_other_symbol=None):
        """get the global features of an imposter (decoded) sequence

        Main task:
            - process a sequence, generate the global features and return them without
              storing/saving intermediary results on disk

        Args:
            seq_id: id of the sequence to be processed
            seqs_info: dictionary comprising the info about the prepared sequences
            y_imposter: list of labels (y tags) decoded using a decoder

        Keyword Arguments:
            seg_other_symbol: in case of segmentation, this represents the non-entity symbol
                              label; otherwise, it is None (default), which translates to a
                              sequence labeling problem
        """
        feature_extractor = self.feature_extractor
        attr_extractor = self.attr_extractor
        attr_scaler = self.attr_scaler
        seq_dir = seqs_info[seq_id]["globalfeatures_dir"]
        seq = ReaderWriter.read_data(os.path.join(seq_dir, "sequence"), mode="rb")
        y_ref = list(seq.flat_y)
        # update seq.Y with the imposter Y
        seq.Y = (y_imposter, seg_other_symbol)
        y_imposter_boundaries = seq.y_sboundaries
        # this will update the value of the seg_attr of the sequence
        new_boundaries = attr_extractor.generate_attributes(seq, y_imposter_boundaries)
        if(new_boundaries):
            attr_scaler.scale_continuous_attributes(seq, new_boundaries)
        activefeatures_dir = seqs_info[seq_id]["activefeatures_dir"]
        l_segfeatures = ReaderWriter.read_data(os.path.join(activefeatures_dir, "l_segfeatures"), mode="rb")
        imposter_gfeatures = feature_extractor.extract_seq_features_perboundary(seq, l_segfeatures)
        # put back the original Y
        seq.Y = (y_ref, seg_other_symbol)
        if(new_boundaries):
            # write the sequence back to disk given that the segment attributes have been updated
            ReaderWriter.dump_data(seq, os.path.join(seq_dir, "sequence"), mode="wb")
        return(imposter_gfeatures, y_imposter_boundaries)

    def save(self, folder_dir):
        """save essential info about the feature extractor

        Args:
            folder_dir: string representing the directory where files are pickled/dumped
        """
        self.feature_extractor.save(folder_dir)
        if(self.attr_scaler):
            self.attr_scaler.save(folder_dir)

class FeatureFilter(object):
    r"""class for applying filters by y pattern or feature counts

    Args:
        filter_info: dictionary that specifies the type of filter to be applied

    Attributes:
        filter_info: dictionary that specifies the type of filter to be applied
        rel_func: dictionary mapping relation symbols to the corresponding filter functions

    Example::

        filter_info dictionary has three keys:
            - `filter_type` to define the type of filter, either {count or pattern}
            - `filter_val` to define either the y pattern or the threshold count
            - `filter_relation` to define how the filter should be applied

        *count filter*:
            - ``filter_info = {'filter_type': 'count', 'filter_val':5, 'filter_relation':'<'}``
              this filter would delete all features that have a count less than five

        *pattern filter*:
            - ``filter_info = {'filter_type': 'pattern', 'filter_val': {"O|L", "L|L"}, 'filter_relation':'in'}``
              this filter would delete all features associated with the y patterns "O|L" and "L|L"
    """
    def __init__(self, filter_info):
        self.filter_info = filter_info
        self.rel_func = {"=": self._equal_rel,
                         "<=": self._lequal_rel,
                         "<": self._less_rel,
                         ">=": self._gequal_rel,
                         ">": self._greater_rel,
                         "in": self._in_rel,
                         "not in": self._notin_rel}

    def apply_filter(self, featuresum_dict):
        """apply the defined filter on the model features dictionary

        Args:
            featuresum_dict: dictionary that represents the model features, similar to the
                             `modelfeatures` attribute of the model representation instances
        """
        filtered_dict = deepcopy(featuresum_dict)
        filter_info = self.filter_info
        rel_func = self.rel_func
        if(filter_info['filter_type'] == "count"):
            threshold = filter_info['filter_val']
            relation = filter_info['filter_relation']
            # filter binary/categorical features that have counts below the specified threshold
            for z in featuresum_dict:
                for fname, fsum in featuresum_dict[z].items():
                    if(type(threshold) == int and type(fsum) == int):
                        # apply filtering only to binary/categorical features if the threshold is of type int
                        rel_func[relation](fsum, threshold, filtered_dict[z], fname)
                    elif(type(threshold) == float):
                        # threshold is of type float -- apply to both categorical and continuous features
                        rel_func[relation](fsum, threshold, filtered_dict[z], fname)
        elif(filter_info['filter_type'] == "pattern"):
            filter_pattern = filter_info['filter_val']
            relation = filter_info['filter_relation']
            # filter based on specific y patterns
            for z in featuresum_dict:
                rel_func[relation](z, filter_pattern, filtered_dict)
        return(filtered_dict)

    @staticmethod
    def _equal_rel(x, y, f, z):
        if(x == y): del f[z]

    @staticmethod
    def _lequal_rel(x, y, f, z):
        if(x <= y): del f[z]

    @staticmethod
    def _less_rel(x, y, f, z):
        if(x < y): del f[z]

    @staticmethod
    def _gequal_rel(x, y, f, z):
        if(x >= y): del f[z]

    @staticmethod
    def _greater_rel(x, y, f, z):
        if(x > y): del f[z]

    @staticmethod
    def _in_rel(x, y, z):
        if(x in y): del z[x]

    @staticmethod
    def _notin_rel(x, y, z):
        if(x not in y): del z[x]

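# Illustrative usage sketch (not part of the original module): pruning a toy model features
# dictionary with a count filter and a pattern filter (feature/label names are hypothetical):
#
#   modelfeatures = {'B-PER': {'w[0]=John': 1, 'w[0]=Smith': 3},
#                    'O|O': {'w[0]=the': 7}}
#   count_filter = FeatureFilter({'filter_type': 'count', 'filter_val': 2, 'filter_relation': '<'})
#   count_filter.apply_filter(modelfeatures)
#   # -> {'B-PER': {'w[0]=Smith': 3}, 'O|O': {'w[0]=the': 7}}
#   pattern_filter = FeatureFilter({'filter_type': 'pattern', 'filter_val': {'O|O'}, 'filter_relation': 'in'})
#   pattern_filter.apply_filter(modelfeatures)
#   # -> {'B-PER': {'w[0]=John': 1, 'w[0]=Smith': 3}}
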
def main():
    pass

if __name__ == "__main__": main()