'''
Base Module for Pipelines
'''
__author__ = 'Elisha Yadgaran'
from simpleml import TRAIN_SPLIT
from simpleml.persistables.base_persistable import Persistable
from simpleml.persistables.saving import AllSaveMixin
from simpleml.persistables.meta_registry import PipelineRegistry
from simpleml.persistables.guid import GUID
from simpleml.pipelines.external_pipelines import DefaultPipeline, SklearnPipeline
from simpleml.utils.errors import PipelineError
from sqlalchemy import Column, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import JSONB
from future.utils import with_metaclass
import logging
import numpy as np
import pandas as pd
LOGGER = logging.getLogger(__name__)
class AbstractPipeline(with_metaclass(PipelineRegistry, Persistable, AllSaveMixin)):
'''
    Abstract Base class for all Pipeline objects.
    Relies on mixin classes to define the split_dataset method; will raise
    an error if used without one.
-------
Schema
-------
params: pipeline parameter metadata for easy insight into hyperparameters across trainings
'''
__abstract__ = True
# Additional pipeline specific metadata
params = Column(JSONB, default={})
object_type = 'PIPELINE'
def __init__(self, has_external_files=True, transformers=[],
external_pipeline_class='default', fitted=False,
**kwargs):
super(AbstractPipeline, self).__init__(
has_external_files=has_external_files, **kwargs)
# Instantiate pipeline
self.config['external_pipeline_class'] = external_pipeline_class
self._external_file = self._create_external_pipeline(
external_pipeline_class, transformers, **kwargs)
# Initialize fit state -- pass as true to skip fitting transformers
self.fitted = fitted
@property
def fitted(self):
return self.state.get('fitted')
@fitted.setter
def fitted(self, value):
self.state['fitted'] = value
@property
def external_pipeline(self):
'''
        All pipeline objects are going to require some file-based persisted object
Wrapper around whatever underlying class is desired
(eg sklearn or native)
'''
if self.unloaded_externals:
self._load_external_files()
return self._external_file
def _create_external_pipeline(self, external_pipeline_class, transformers,
**kwargs):
'''
        Should return the desired pipeline object
        :param external_pipeline_class: str of the class to use; can be 'default' or 'sklearn'
'''
if external_pipeline_class == 'default':
return DefaultPipeline(transformers)
elif external_pipeline_class == 'sklearn':
return SklearnPipeline(transformers, **kwargs)
else:
raise NotImplementedError('Only default or sklearn pipelines supported')
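    # Illustrative sketch of the two supported wrappers (assumes sklearn-style
    # (name, transformer) tuples for the transformer list; the transformer names
    # below are hypothetical):
    #
    #   self._create_external_pipeline('default', [('fill', my_imputer)])
    #   self._create_external_pipeline('sklearn', [('scale', my_scaler)])
    #
    # Any other string raises NotImplementedError.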
    def add_dataset(self, dataset):
'''
Setter method for dataset used
'''
self.dataset = dataset
    def assert_dataset(self, msg=''):
'''
Helper method to raise an error if dataset isn't present
'''
if self.dataset is None:
raise PipelineError(msg)
    def assert_fitted(self, msg=''):
'''
Helper method to raise an error if pipeline isn't fit
'''
if not self.fitted:
raise PipelineError(msg)
def _hash(self):
'''
Hash is the combination of the:
1) Dataset
2) Transformers
3) Transformer Params
4) Pipeline Config
'''
dataset_hash = self.dataset.hash_ or self.dataset._hash()
transformers = self.get_transformers()
transformer_params = self.get_params()
pipeline_config = self.config
return self.custom_hasher((dataset_hash, transformers, transformer_params, pipeline_config))
    def save(self, **kwargs):
'''
Extend parent function with a few additional save routines
1) save params
2) save transformer metadata
        3) save feature names
'''
self.assert_dataset('Must set dataset before saving')
self.assert_fitted('Must fit pipeline before saving')
self.params = self.get_params(**kwargs)
self.metadata_['transformers'] = self.get_transformers()
self.metadata_['feature_names'] = self.get_feature_names()
# Skip file-based persistence if there are no transformers
if not self.get_transformers():
self.has_external_files = False
super(AbstractPipeline, self).save(**kwargs)
        # SQLAlchemy updates relationship references after save, so reload the dataset class
self.dataset.load(load_externals=False)
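    # After a successful save(), the persisted metadata mirrors the calls above
    # (illustrative; actual values depend on the configured transformers):
    #
    #   self.params                        <- self.get_params(**kwargs)
    #   self.metadata_['transformers']     <- self.get_transformers()
    #   self.metadata_['feature_names']    <- self.get_feature_names()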
    def load(self, **kwargs):
'''
Extend main load routine to load relationship class
'''
super(AbstractPipeline, self).load(**kwargs)
        # Create dummy pipeline if one wasn't saved
if not self.has_external_files:
self._external_file = self._create_external_pipeline(
self.config['external_pipeline_class'], [], **self.params)
        # By default don't load data unless it actually gets used
self.dataset.load(load_externals=False)
    def get_dataset_split(self, split=None):
'''
Get specific dataset split
By default no constraint imposed, but convention is that return should
be a tuple of (X, y)
'''
if split is None:
split = TRAIN_SPLIT
if not hasattr(self, '_dataset_splits') or self._dataset_splits is None:
self.split_dataset()
return self._dataset_splits.get(split)
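    # Illustrative usage (assumes a split-defining mixin has populated
    # self._dataset_splits and that each split is an (X, y) tuple):
    #
    #   X_train, y_train = pipeline.get_dataset_split(TRAIN_SPLIT)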
    def fit(self, **kwargs):
'''
Pass through method to external pipeline
'''
self.assert_dataset('Must set dataset before fitting')
if self.fitted:
LOGGER.warning('Cannot refit pipeline, skipping operation')
return self
# Only use train fold to fit
X, y = self.get_dataset_split(TRAIN_SPLIT)
self.external_pipeline.fit(X, y, **kwargs)
self.fitted = True
return self
    def get_params(self, **kwargs):
'''
Pass through method to external pipeline
'''
return self.external_pipeline.get_params(**kwargs)
    def set_params(self, **params):
'''
Pass through method to external pipeline
'''
return self.external_pipeline.set_params(**params)
    def get_feature_names(self):
'''
Pass through method to external pipeline
Should return a list of the final features generated by this pipeline
'''
initial_features = self.dataset.get_feature_names()
return self.external_pipeline.get_feature_names(feature_names=initial_features)
class Pipeline(AbstractPipeline):
'''
Base class for all Pipeline objects.
-------
Schema
-------
dataset_id: foreign key relation to the dataset used as input
'''
__tablename__ = 'pipelines'
dataset_id = Column(GUID, ForeignKey("datasets.id", name="pipelines_dataset_id_fkey"))
dataset = relationship("Dataset", enable_typechecks=False, foreign_keys=[dataset_id])
__table_args__ = (
# Unique constraint for versioning
UniqueConstraint('name', 'version', name='pipeline_name_version_unique'),
# Index for searching through friendly names
Index('pipeline_name_index', 'name'),
)
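# Illustrative end-to-end sketch (assumes an already persisted Dataset instance
# named `dataset`, a configured session per the Persistable base class, and
# sklearn-style transformers; names are hypothetical):
#
#   pipeline = Pipeline(name='example_pipeline',
#                       external_pipeline_class='sklearn',
#                       transformers=[('scale', my_scaler)])
#   pipeline.add_dataset(dataset)
#   pipeline.fit()     # fits transformers on the TRAIN_SPLIT fold only
#   pipeline.save()    # persists params, transformer metadata, and feature names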
class GeneratorPipeline(Pipeline):
'''
    Generator form of pipeline. Overrides standard methods with ones that
    return generator objects
'''
    def get_dataset_split(self, split=None, infinite_loop=False, batch_size=32, shuffle=True, **kwargs):
'''
Get specific dataset split
'''
if split is None:
split = TRAIN_SPLIT
if not hasattr(self, '_dataset_splits') or self._dataset_splits is None:
self.split_dataset()
# Data generators are formatted for keras models
X, y = self._dataset_splits.get(split)
dataset_size = X.shape[0]
if isinstance(X, pd.DataFrame):
indices = X.index.tolist()
elif isinstance(X, np.ndarray):
indices = np.arange(X.shape[0])
else:
raise NotImplementedError
        if dataset_size == 0:  # Nothing to yield, end the generator
            return
first_run = True
current_index = 0
while True:
if current_index == 0 and shuffle and not first_run:
np.random.shuffle(indices)
batch = indices[current_index:min(current_index + batch_size, dataset_size)]
            # Yield labels whenever y exists and is non-empty; otherwise yield
            # None in the label slot. Pandas objects are sliced with .loc,
            # numpy arrays positionally.
            if isinstance(y, (pd.DataFrame, pd.Series)):
                y_batch = np.stack(y.loc[batch].squeeze().values) if not y.empty else None
            elif y is not None and len(y) > 0:
                y_batch = y[batch]
            else:
                y_batch = None
            if isinstance(X, (pd.DataFrame, pd.Series)):
                yield X.loc[batch], y_batch
            else:
                yield X[batch], y_batch
current_index += batch_size
# Loop so that infinite batches can be generated
if current_index >= dataset_size:
if infinite_loop:
current_index = 0
first_run = False
else:
break
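    # Illustrative consumption sketch (assumes pandas or numpy splits; batch
    # contents depend on the underlying dataset):
    #
    #   batches = generator_pipeline.get_dataset_split(
    #       TRAIN_SPLIT, infinite_loop=True, batch_size=64, shuffle=True)
    #   X_batch, y_batch = next(batches)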
    def fit(self, **kwargs):
'''
Pass through method to external pipeline
Assumes underlying pipeline can make use of a generator to fit
'''
        self.assert_dataset('Must set dataset before fitting')
        if self.fitted:
LOGGER.warning('Cannot refit pipeline, skipping operation')
return self
# Only use train fold to fit
generator = self.get_dataset_split(TRAIN_SPLIT)
self.external_pipeline.fit(generator, **kwargs)
        self.fitted = True
return self
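    # Illustrative sketch: the wrapped external pipeline is assumed to accept a
    # generator (e.g. a keras-style fit); this class only builds and forwards it:
    #
    #   generator_pipeline = GeneratorPipeline(name='example_generator_pipeline')
    #   generator_pipeline.add_dataset(dataset)
    #   generator_pipeline.fit()   # passes a TRAIN_SPLIT batch generator through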