"""
Base module for pipelines.

Defines the abstract pipeline persistable, the SQLAlchemy-mapped `Pipeline`
table, and the `DatasetSequence` batch wrapper used for keras-style iteration.
"""

__author__ = 'Elisha Yadgaran'
from simpleml.constants import TRAIN_SPLIT
from simpleml.imports import Sequence
from simpleml.persistables.base_persistable import Persistable
from simpleml.save_patterns.decorators import ExternalArtifactDecorators
from simpleml.registries import PipelineRegistry
from simpleml.persistables.sqlalchemy_types import GUID, MutableJSON
from simpleml.pipelines.external_pipelines import DefaultPipeline, SklearnPipeline
from simpleml.pipelines.validation_split_mixins import Split
from simpleml.utils.errors import PipelineError
from sqlalchemy import Column, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import relationship
from future.utils import with_metaclass
import logging
import numpy as np
import pandas as pd
# Module-level logger named after this module
LOGGER = logging.getLogger(name=__name__)
@ExternalArtifactDecorators.register_artifact(
    artifact_name='pipeline', save_attribute='external_pipeline', restore_attribute='_external_file')
class AbstractPipeline(with_metaclass(PipelineRegistry, Persistable)):
    '''
    Abstract Base class for all Pipelines objects.

    Relies on mixin classes to define the split_dataset method. Will throw
    an error on use otherwise

    -------
    Schema
    -------
    params: pipeline parameter metadata for easy insight into hyperparameters across trainings
    '''

    # Additional pipeline specific metadata
    params = Column(MutableJSON, default={})

    object_type = 'PIPELINE'

    def __init__(self, has_external_files=True, transformers=None,
                 external_pipeline_class='default', fitted=False,
                 **kwargs):
        '''
        :param has_external_files: forwarded to the Persistable base
        :param transformers: transformers for the external pipeline
            (defaults to an empty list)
        :param external_pipeline_class: 'default' or 'sklearn', see
            `_create_external_pipeline`
        :param fitted: initial fit state -- pass True to skip fitting transformers
        :param kwargs: forwarded to the Persistable base and to
            `_create_external_pipeline`
        '''
        # If no save patterns are set, specify a default for disk_pickled
        if 'save_patterns' not in kwargs:
            kwargs['save_patterns'] = {'pipeline': ['disk_pickled']}
        super(AbstractPipeline, self).__init__(
            has_external_files=has_external_files, **kwargs)

        # Instantiate pipeline
        if transformers is None:
            transformers = []
        self.config['external_pipeline_class'] = external_pipeline_class
        self._external_file = self._create_external_pipeline(
            external_pipeline_class, transformers, **kwargs)

        # Initialize fit state -- pass as true to skip fitting transformers
        self.fitted = fitted

    @property
    def fitted(self):
        '''Fit state, stored in `self.state['fitted']`'''
        return self.state.get('fitted')

    @fitted.setter
    def fitted(self, value):
        self.state['fitted'] = value

    @property
    def external_pipeline(self):
        '''
        All pipeline objects are going to require some filebase persisted object
        Wrapper around whatever underlying class is desired
        (eg sklearn or native)

        Lazily loads the 'pipeline' artifact on first access.
        '''
        self.load_if_unloaded('pipeline')
        return self._external_file

    def _create_external_pipeline(self, external_pipeline_class, transformers,
                                  **kwargs):
        '''
        should return the desired pipeline object

        :param external_pipeline_class: str of class to use, can be 'default' or 'sklearn'
        :param transformers: transformers passed to the external pipeline
        :param kwargs: only 'memory' and 'verbose' are forwarded, and only to
            the sklearn pipeline
        :raises NotImplementedError: for any other class name
        '''
        if external_pipeline_class == 'default':
            return DefaultPipeline(transformers)
        elif external_pipeline_class == 'sklearn':
            return SklearnPipeline(
                transformers,
                # Only supported sklearn params
                **{k: v for k, v in kwargs.items() if k in ('memory', 'verbose')}
            )
        else:
            raise NotImplementedError('Only default or sklearn pipelines supported')

    def add_dataset(self, dataset):
        '''
        Setter method for dataset used
        '''
        self.dataset = dataset

    def assert_dataset(self, msg=''):
        '''
        Helper method to raise an error if dataset isn't present

        :raises PipelineError: if no dataset has been set (including when the
            attribute was never assigned on an unmapped subclass)
        '''
        if getattr(self, 'dataset', None) is None:
            raise PipelineError(msg)

    def assert_fitted(self, msg=''):
        '''
        Helper method to raise an error if pipeline isn't fit

        :raises PipelineError: if `fitted` is falsy
        '''
        if not self.fitted:
            raise PipelineError(msg)

    def _hash(self):
        '''
        Hash is the combination of the:
            1) Dataset
            2) Transformers
            3) Transformer Params
            4) Pipeline Config
        '''
        # Prefer the dataset's stored hash; compute it if not yet populated
        dataset_hash = self.dataset.hash_ or self.dataset._hash()
        transformers = self.get_transformers()
        transformer_params = self.get_params(params_only=True)
        pipeline_config = self.config

        return self.custom_hasher((dataset_hash, transformers, transformer_params, pipeline_config))

    def save(self, **kwargs):
        '''
        Extend parent function with a few additional save routines

        1) save params
        2) save transformer metadata
        3) features

        :raises PipelineError: if no dataset is set or the pipeline is unfit
        '''
        self.assert_dataset('Must set dataset before saving')
        self.assert_fitted('Must fit pipeline before saving')

        self.params = self.get_params(params_only=True, **kwargs)
        transformers = self.get_transformers()
        self.metadata_['transformers'] = transformers
        self.metadata_['feature_names'] = self.get_feature_names()

        # Skip file-based persistence if there are no transformers
        if not transformers:
            self.has_external_files = False

        super(AbstractPipeline, self).save(**kwargs)

        # Sqlalchemy updates relationship references after save so reload class
        self.dataset.load(load_externals=False)

    def load(self, **kwargs):
        '''
        Extend main load routine to load relationship class
        '''
        super(AbstractPipeline, self).load(**kwargs)

        # Create dummy pipeline if one wasn't saved
        if not self.has_external_files:
            self._external_file = self._create_external_pipeline(
                self.config['external_pipeline_class'], [], **self.params)

        # By default dont load data unless it actually gets used
        self.dataset.load(load_externals=False)

    def get_dataset_split(self, split=None, return_generator=False, return_sequence=False, **kwargs):
        '''
        Get specific dataset split

        Assumes a Split object (`simpleml.pipelines.validation_split_mixins.Split`)
        is returned. Inherit or implement similar expected attributes to replace

        Uses internal `self._dataset_splits` as the split container - assumes
        dictionary like itemgetter

        :param split: key of the split to return
        :param return_generator: return a batch generator (`_iterate_split`)
        :param return_sequence: return a `DatasetSequence` (takes precedence
            over `return_generator`)
        :param kwargs: forwarded to the chosen iterator
        '''
        # Splits are computed lazily by the mixin-provided `split_dataset`
        if getattr(self, '_dataset_splits', None) is None:
            self.split_dataset()

        if return_sequence:  # Use keras thread safe sequence
            return self._iterate_split_using_sequence(self._dataset_splits[split], **kwargs)

        if return_generator:  # Vanilla generator form
            return self._iterate_split(self._dataset_splits[split], **kwargs)

        return self._dataset_splits[split]

    @staticmethod
    def _stack_pandas_labels(y_batch):
        '''
        Convert a pandas label batch into a stacked numpy array

        Only the column axis is squeezed: a bare `.squeeze()` collapses a
        single-row batch into a scalar (which has no `.values`), and turns a
        single-row multi-column frame into a 1-D row
        '''
        if isinstance(y_batch, pd.DataFrame):
            y_batch = y_batch.squeeze(axis=1)
        return np.stack(y_batch.values)

    def _iterate_split(self, split, infinite_loop=False, batch_size=32, shuffle=True, **kwargs):
        '''
        Turn a dataset split into a generator of `Split` batches

        :param split: `Split`-like object exposing `X` and `y`
        :param infinite_loop: restart from the beginning once the data is
            exhausted instead of stopping
        :param batch_size: max number of rows per batch
        :param shuffle: reshuffle row order at the start of every pass after
            the first (the first pass keeps the original order)
        :param kwargs: ignored; accepted so callers can forward a shared set
            of iteration options
        :raises NotImplementedError: if X is not a DataFrame or ndarray
        '''
        X = split.X
        y = split.y
        dataset_size = X.shape[0]

        if dataset_size == 0:  # Return None
            return

        # Extract indices to subsample from
        if isinstance(X, pd.DataFrame):
            indices = X.index.tolist()
        elif isinstance(X, np.ndarray):
            indices = np.arange(dataset_size)
        else:
            raise NotImplementedError

        # Labels count unless missing or an empty pandas object. This matches
        # DatasetSequence; numpy labels used to be silently dropped here.
        supervised = y is not None and not (isinstance(y, (pd.DataFrame, pd.Series)) and y.empty)
        label_based = isinstance(X, (pd.DataFrame, pd.Series))

        # Loop through and sample (indefinitely if requested)
        first_run = True
        current_index = 0
        while True:
            if current_index == 0 and shuffle and not first_run:
                np.random.shuffle(indices)

            batch = indices[current_index:current_index + batch_size]

            if label_based:  # pandas: index labels
                if supervised:
                    yield Split(X=X.loc[batch], y=self._stack_pandas_labels(y.loc[batch]))
                else:
                    yield Split(X=X.loc[batch])
            else:  # numpy: positional indices
                if supervised:
                    yield Split(X=X[batch], y=y[batch])
                else:
                    yield Split(X=X[batch])

            current_index += batch_size

            # Loop so that infinite batches can be generated
            if current_index >= dataset_size:
                if infinite_loop:
                    current_index = 0
                    first_run = False
                else:
                    break

    def _iterate_split_using_sequence(self, split, batch_size=32, shuffle=True, **kwargs):
        '''
        Different version of iterate split that uses a keras.utils.sequence object
        to play nice with keras and enable thread safe generation.

        :param kwargs: ignored -- `DatasetSequence` accepts no extra options, and
            forwarding them (e.g. `infinite_loop`) would raise a TypeError
        '''
        return DatasetSequence(split, batch_size, shuffle)

    def X(self, split=None):
        '''
        Get X for specific dataset split
        '''
        return self.get_dataset_split(split=split).X

    def y(self, split=None):
        '''
        Get labels for specific dataset split
        '''
        return self.get_dataset_split(split=split).y

    def fit(self):
        '''
        Pass through method to external pipeline

        Fits on the TRAIN_SPLIT only; a no-op (with a warning) if already fitted.

        :raises PipelineError: if no dataset is set
        '''
        self.assert_dataset('Must set dataset before fitting')

        if self.fitted:
            LOGGER.warning('Cannot refit pipeline, skipping operation')
            return self

        # Only use default (train) fold to fit
        # No constraint on split -- can be a dataframe, ndarray, or generator
        # but must be encased in a Split object
        # Explicitly prevent generator fit for pipelines
        split = self.get_dataset_split(split=TRAIN_SPLIT, return_generator=False)
        self.external_pipeline.fit(**split)
        self.fitted = True

        return self

    # ------------------------------------------------------------------
    # Pass-through methods to external pipeline
    # ------------------------------------------------------------------
    def get_params(self, **kwargs):
        '''
        Pass through method to external pipeline
        '''
        return self.external_pipeline.get_params(**kwargs)

    def set_params(self, **params):
        '''
        Pass through method to external pipeline
        '''
        return self.external_pipeline.set_params(**params)

    def get_feature_names(self):
        '''
        Pass through method to external pipeline
        Should return a list of the final features generated by this pipeline

        Starts from the dataset's feature names and lets the external pipeline
        map them through its transformers.
        '''
        initial_features = self.dataset.get_feature_names()
        return self.external_pipeline.get_feature_names(feature_names=initial_features)
class Pipeline(AbstractPipeline):
    '''
    Base class for all Pipeline objects, mapped to the `pipelines` table.

    -------
    Schema
    -------
    dataset_id: foreign key relation to the dataset used as input
    '''

    __tablename__ = 'pipelines'

    __table_args__ = (
        # Each (name, version) pair may appear only once -- versioning key
        UniqueConstraint('name', 'version', name='pipeline_name_version_unique'),
        # Speeds up lookups by friendly name
        Index('pipeline_name_index', 'name'),
    )

    # Input dataset: the foreign key column and the ORM relationship over it
    dataset_id = Column(GUID, ForeignKey('datasets.id', name='pipelines_dataset_id_fkey'))
    dataset = relationship('Dataset', enable_typechecks=False, foreign_keys=[dataset_id])
class DatasetSequence(Sequence):
    '''
    Sequence wrapper for internal datasets. Only used for raw data mapping so
    return type is internal `Split` object. Transformed sequences are used to
    conform with external input types (keras tuples)
    '''

    def __init__(self, split, batch_size, shuffle):
        '''
        :param split: `Split`-like object exposing `X` and `y`
        :param batch_size: max number of rows per batch
        :param shuffle: reshuffle row order in `on_epoch_end`
        :raises ValueError: if X is missing or has no rows
        :raises NotImplementedError: if X is not a DataFrame or ndarray
        '''
        self.X = self.validated_split(split.X)
        self.y = self.validated_split(split.y)

        # validated_split maps an empty pandas X to None, so guard on None
        # explicitly rather than failing later with an AttributeError on `.shape`
        if self.X is None or self.X.shape[0] == 0:
            raise ValueError('Attempting to create sequence with no data')
        self.dataset_size = self.X.shape[0]

        # Extract indices to subsample from
        if isinstance(self.X, pd.DataFrame):
            self.indices = self.X.index.tolist()
        elif isinstance(self.X, np.ndarray):
            self.indices = np.arange(self.dataset_size)
        else:
            raise NotImplementedError

        self.batch_size = batch_size
        self.shuffle = shuffle

    @staticmethod
    def validated_split(split):
        '''
        Confirms data is valid, otherwise returns None (makes downstream checking
        simpler)
        '''
        if split is None:
            return None
        elif isinstance(split, (pd.DataFrame, pd.Series)) and split.empty:
            return None
        return split

    @staticmethod
    def _stack_pandas_labels(y_batch):
        '''
        Convert a pandas label batch into a stacked numpy array

        Only the column axis is squeezed: a bare `.squeeze()` collapses a
        single-row batch into a scalar (which has no `.values`), and turns a
        single-row multi-column frame into a 1-D row
        '''
        if isinstance(y_batch, pd.DataFrame):
            y_batch = y_batch.squeeze(axis=1)
        return np.stack(y_batch.values)

    def __getitem__(self, index):
        """Gets batch at position `index`.

        # Arguments
            index: position of the batch in the Sequence.

        # Returns
            A batch
        """
        current_index = index * self.batch_size  # list index of batch start
        batch = self.indices[current_index:current_index + self.batch_size]

        if self.y is not None:  # Supervised
            if isinstance(self.X, (pd.DataFrame, pd.Series)):
                return Split(X=self.X.loc[batch], y=self._stack_pandas_labels(self.y.loc[batch]))
            else:
                return Split(X=self.X[batch], y=self.y[batch])
        else:  # Unsupervised
            if isinstance(self.X, (pd.DataFrame, pd.Series)):
                return Split(X=self.X.loc[batch])
            else:
                return Split(X=self.X[batch])

    def __len__(self):
        """Number of batch in the Sequence.

        # Returns
            The number of batches in the Sequence.
        """
        return int(np.ceil(self.dataset_size / float(self.batch_size)))

    def on_epoch_end(self):
        """Method called at the end of every epoch.

        Reshuffles the row order in place when `shuffle` is set.
        """
        if self.shuffle:
            np.random.shuffle(self.indices)