Source code for simpleml.pipelines.validation_split_mixins

"""
Module for different pipeline split methods for cross validation

    1) No Split -- Just use all the data - hardcoded as the default for all pipelines
    2) Explicit Split -- dataset class defines the split
    3) Percentage -- random split support for train, validation, test
    4) Chronological -- time based split support for train, validation, test
    5) KFold
"""

__author__ = "Elisha Yadgaran"

from abc import ABCMeta, abstractmethod
from typing import Dict, List, Optional, Union

import pandas as pd
from future.utils import with_metaclass
from sklearn.model_selection import train_test_split

from simpleml.constants import TEST_SPLIT, TRAIN_SPLIT, VALIDATION_SPLIT
from simpleml.datasets.dataset_splits import Split, SplitContainer
from simpleml.imports import ddDataFrame, ddSeries

from .projected_splits import (
    IdentityProjectedDatasetSplit,
    IndexBasedProjectedDatasetSplit,
)
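
# Illustrative note (not part of the module): a concrete pipeline class is
# expected to compose one of the mixins below with a pipeline base class,
# e.g. a hypothetical
#
#   class MyPipeline(RandomSplitMixin, BasePipeline): ...
#
# whose split_dataset() call populates self._dataset_splits with a
# SplitContainer of projected dataset splits.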


class SplitMixin(with_metaclass(ABCMeta, object)):
    @abstractmethod
    def split_dataset(self):
        """
        Set the split criteria

        Must set self._dataset_splits
        """

    def containerize_split(self, split_dict: Dict[str, Split]) -> SplitContainer:
        return SplitContainer(**split_dict)
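
# Minimal sketch of what containerize_split wraps (illustrative; assumes the
# Split class accepts arbitrary section keyword arguments such as X and y,
# which should be checked against the real signature):
#
#   container = SplitContainer(
#       TRAIN=Split(X=train_X, y=train_y),
#       TEST=Split(X=test_X, y=test_y),
#   )
#
# containerize_split is exactly this SplitContainer(**split_dict) step.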


class ExplicitSplitMixin(SplitMixin):
    def split_dataset(self) -> None:
        """
        Method to split the dataframe into different sets. Assumes dataset
        explicitly delineates between different splits

        Passes forward dataset split names so uniquely named splits will
        propagate and can be referenced the same way
        """
        self._dataset_splits = self.containerize_split(
            {
                split_name: IdentityProjectedDatasetSplit(
                    dataset=self.dataset, split=split_name
                )
                for split_name in self.dataset.get_split_names()
            }
        )
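
# For example (illustrative): if dataset.get_split_names() returns
# ["TRAIN", "TEST"], the resulting container maps each name to an identity
# projection over that named split:
#
#   {
#       "TRAIN": IdentityProjectedDatasetSplit(dataset=dataset, split="TRAIN"),
#       "TEST": IdentityProjectedDatasetSplit(dataset=dataset, split="TEST"),
#   }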


class RandomSplitMixin(SplitMixin):
    """
    Class to randomly split dataset into different sets

    **Redefines splits so custom named splits in dataset cannot be referenced
    by the same names. Only TRAIN/TEST/VALIDATION**
    """

    def __init__(
        self,
        train_size: Union[float, int],
        test_size: Optional[Union[float, int]] = None,
        validation_size: Union[float, int] = 0.0,
        random_state: int = 123,
        shuffle: bool = True,
        **kwargs,
    ):
        """
        Set splitting params:
        By default validation is 0.0 because it is only used for
        hyperparameter tuning
        """
        super(RandomSplitMixin, self).__init__(**kwargs)

        # Pipeline Params
        self.config.update(
            {
                "train_size": train_size,
                "validation_size": validation_size,
                "test_size": test_size,
                "random_state": random_state,
                "shuffle": shuffle,
            }
        )

    @staticmethod
    def get_index(data) -> List[int]:
        """
        Helper to extract the index from a dataset. Generates a range index
        if none exists
        """
        if isinstance(data, (pd.DataFrame, pd.Series)):
            return data.index.tolist()
        elif isinstance(data, (ddDataFrame, ddSeries)):
            return data.index.compute().tolist()
        else:
            # no named index, use a linear range
            return list(range(len(data)))
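
    # Behavior sketch (illustrative): get_index is a staticmethod, so it can
    # be called on the class directly; the dask branch is analogous but
    # computes the lazy index first.
    #
    #   RandomSplitMixin.get_index(pd.DataFrame({"a": [1, 2]}, index=[10, 11]))
    #   -> [10, 11]
    #   RandomSplitMixin.get_index([["x"], ["y"], ["z"]])
    #   -> [0, 1, 2]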

    def split_dataset(self) -> None:
        """
        Overwrite method to split by percentage
        """
        train_size = self.config.get("train_size")
        validation_size = self.config.get("validation_size")
        test_size = self.config.get("test_size")
        random_state = self.config.get("random_state")
        shuffle = self.config.get("shuffle")

        # Sklearn's train test split can only accommodate one split per iteration
        # find the indices that match to each split
        # use the X split section
        index = self.get_index(self.dataset.X)

        # Convert all sizes to counts to make typing consistent
        def rate_to_count(i):
            if isinstance(i, float) and i <= 1.0 and i >= 0.0:
                return round(i * len(index))
            return i

        train_size: int = rate_to_count(train_size)
        validation_size: int = rate_to_count(validation_size)
        test_size: int = rate_to_count(test_size)

        if train_size is None:
            # everything not already allocated is attributed to train
            train_size = len(index) - (validation_size or 0) - (test_size or 0)

        if test_size is None:
            # remaining unallocated is attributed to test
            test_size = len(index) - (validation_size or 0) - train_size

        if validation_size is None:
            validation_size = len(index) - train_size - test_size

        if test_size == 0:
            # No split necessary
            test_indices = []
            remaining_indices = index
        elif test_size == len(index):
            test_indices = index
            remaining_indices = []
        else:
            remaining_indices, test_indices = train_test_split(
                index, test_size=test_size, random_state=random_state, shuffle=shuffle
            )

        if validation_size == 0:
            # No split necessary
            train_indices = remaining_indices
            validation_indices = []
        elif validation_size == len(remaining_indices):
            validation_indices = remaining_indices
            train_indices = []
        else:
            train_indices, validation_indices = train_test_split(
                remaining_indices,
                test_size=validation_size,
                random_state=random_state,
                shuffle=shuffle,
            )

        self._dataset_splits = self.containerize_split(
            {
                TRAIN_SPLIT: IndexBasedProjectedDatasetSplit(
                    dataset=self.dataset, split=None, indices=train_indices
                ),
                VALIDATION_SPLIT: IndexBasedProjectedDatasetSplit(
                    dataset=self.dataset, split=None, indices=validation_indices
                ),
                TEST_SPLIT: IndexBasedProjectedDatasetSplit(
                    dataset=self.dataset, split=None, indices=test_indices
                ),
            }
        )
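
    # Worked example of the size resolution above (illustrative): with a
    # 100-row dataset and train_size=0.8, validation_size=0.1, test_size=None:
    #   rate_to_count -> train_size=80, validation_size=10, test_size=None
    #   test_size then resolves to 100 - 10 - 80 = 10
    # train_test_split first carves out the 10 test indices, then splits the
    # remaining 90 into 80 train / 10 validation.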


class ChronologicalSplitMixin(SplitMixin):
    def __init__(self, **kwargs):
        super(ChronologicalSplitMixin, self).__init__(**kwargs)


class KFoldSplitMixin(SplitMixin):
    """
    TBD on how to implement this. KFold requires K models and unique datasets
    so may be easier to wrap a parallelized implementation that internally
    creates K new Pipeline and Model objects
    """

    pass
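
# One possible direction (purely illustrative, not the library's
# implementation): sklearn's KFold could generate the per-fold indices that K
# independent Pipeline/Model pairs consume, mirroring the
# IndexBasedProjectedDatasetSplit pattern used by RandomSplitMixin:
#
#   from sklearn.model_selection import KFold
#   for fold, (train_idx, test_idx) in enumerate(KFold(n_splits=5).split(index)):
#       ...  # build a new pipeline/model per fold from these index lists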