Source code for skfolio.model_selection._combinatorial

"""Combinatorial module"""

# Copyright (c) 2023
# Author: Hugo Delatte <delatte.hugo@gmail.com>
# License: BSD 3 clause
# Implementation derived from:
# scikit-portfolio, Copyright (c) 2022, Carlo Nicolini, Licensed under MIT Licence.
# scikit-learn, Copyright (c) 2007-2010 David Cournapeau, Fabian Pedregosa, Olivier
# Grisel Licensed under BSD 3 clause.

import itertools
import math
import numbers
from abc import ABC, abstractmethod
from collections.abc import Iterator

import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.graph_objects as go
import sklearn.model_selection as sks
import sklearn.utils as sku

import skfolio.typing as skt


class BaseCombinatorialCV(ABC):
    """Base class for all combinatorial cross-validators.

    Implementations must define both `split` and `get_path_ids`.
    """

    @abstractmethod
    def split(
        self, X: npt.ArrayLike, y=None
    ) -> Iterator[tuple[np.ndarray, list[np.ndarray]]]:
        """Generate, for each split, the train indices and the list of test
        indices (one array per test fold)."""
        pass

    @abstractmethod
    def get_path_ids(self) -> np.ndarray:
        """Return the path id of each test sets in each split"""
        pass

    # Reuse the scikit-learn cross-validator representation.
    __repr__ = sks.BaseCrossValidator.__repr__
# TODO: review params and function naming
class CombinatorialPurgedCV(BaseCombinatorialCV):
    """Combinatorial Purged Cross-Validation.

    Provides train/test indices to split time series data samples based on
    Combinatorial Purged Cross-Validation [1]_.

    Compared to `KFold`, which splits the data into `k` folds with `1` fold for the
    test set and `k - 1` folds for the training set, `CombinatorialPurgedCV` uses
    `k - p` folds for the training set with `p > 1` being the number of test folds.

    `KFold` can recombine one single testing path while `CombinatorialPurgedCV` can
    recombine multiple testing paths from the combinations of the train/test sets.

    To avoid data leakage, purging and embargoing can be performed.

    Purging consists of removing from the training set all observations whose labels
    overlapped in time with those labels included in the testing set.

    Embargoing consists of removing from the training set all observations that
    immediately follow an observation in the testing set, since financial features
    often incorporate series that exhibit serial correlation (like ARMA processes).

    Parameters
    ----------
    n_folds : int, default=10
        Number of folds. Must be at least 3.

    n_test_folds : int, default=8
        Number of test folds. Must be at least 2 and smaller than `n_folds`.
        For only one test fold, use `sklearn.model_selection.KFold`.

    purged_size : int, default=0
        Number of observations to exclude from the start of each train set that are
        after a test set **and** the number of observations to exclude from the end
        of each training set that are before a test set.

    embargo_size : int, default=0
        Number of observations to exclude from the start of each training set that
        are after a test set.

    Attributes
    ----------
    index_train_test_ : ndarray of shape (n_observations, n_splits)
        Set when iterating `split`. For each split (column), `1` marks the test
        observations, `0` the training observations and `-1` the purged or
        embargoed observations.

    Examples
    --------
    >>> import numpy as np
    >>> from skfolio.model_selection import CombinatorialPurgedCV
    >>> X = np.random.randn(12, 2)
    >>> cv = CombinatorialPurgedCV(n_folds=3, n_test_folds=2)
    >>> for i, (train_index, tests) in enumerate(cv.split(X)):
    ...     print(f"Split {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     for j, test_index in enumerate(tests):
    ...         print(f"  Test {j}: index={test_index}")
    Split 0:
      Train: index=[ 8  9 10 11]
      Test 0: index=[0 1 2 3]
      Test 1: index=[4 5 6 7]
    Split 1:
      Train: index=[4 5 6 7]
      Test 0: index=[0 1 2 3]
      Test 1: index=[ 8  9 10 11]
    Split 2:
      Train: index=[0 1 2 3]
      Test 0: index=[4 5 6 7]
      Test 1: index=[ 8  9 10 11]
    >>> cv = CombinatorialPurgedCV(n_folds=3, n_test_folds=2, purged_size=1)
    >>> for i, (train_index, tests) in enumerate(cv.split(X)):
    ...     print(f"Split {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     for j, test_index in enumerate(tests):
    ...         print(f"  Test {j}: index={test_index}")
    Split 0:
      Train: index=[ 9 10 11]
      Test 0: index=[0 1 2 3]
      Test 1: index=[4 5 6 7]
    Split 1:
      Train: index=[5 6]
      Test 0: index=[0 1 2 3]
      Test 1: index=[ 8  9 10 11]
    Split 2:
      Train: index=[0 1 2]
      Test 0: index=[4 5 6 7]
      Test 1: index=[ 8  9 10 11]
    >>> cv = CombinatorialPurgedCV(n_folds=3, n_test_folds=2, embargo_size=1)
    >>> for i, (train_index, tests) in enumerate(cv.split(X)):
    ...     print(f"Split {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     for j, test_index in enumerate(tests):
    ...         print(f"  Test {j}: index={test_index}")
    Split 0:
      Train: index=[ 9 10 11]
      Test 0: index=[0 1 2 3]
      Test 1: index=[4 5 6 7]
    Split 1:
      Train: index=[5 6 7]
      Test 0: index=[0 1 2 3]
      Test 1: index=[ 8  9 10 11]
    Split 2:
      Train: index=[0 1 2 3]
      Test 0: index=[4 5 6 7]
      Test 1: index=[ 8  9 10 11]

    References
    ----------
    .. [1]  "Advances in Financial Machine Learning",
        Marcos López de Prado (2018)
    """

    index_train_test_: np.ndarray

    def __init__(
        self,
        n_folds: int = 10,
        n_test_folds: int = 8,
        purged_size: int = 0,
        embargo_size: int = 0,
    ):
        if not isinstance(n_folds, numbers.Integral):
            raise ValueError(
                "The number of folds must be of Integral type. "
                f"{n_folds} of type {type(n_folds)} was passed."
            )
        n_folds = int(n_folds)
        if n_folds <= 2:
            raise ValueError(f"`n_folds` must be at least 3, got `n_folds={n_folds}`.")
        if n_test_folds <= 1:
            raise ValueError(
                "`n_test_folds` must be at least 2, "
                f"got `n_test_folds={n_test_folds}`."
            )
        if n_test_folds >= n_folds:
            raise ValueError(
                "Combinatorial purged cross-validation requires `n_folds` "
                "to be greater than `n_test_folds`."
            )
        if purged_size < 0:
            raise ValueError("`purged_size` cannot be negative")
        if embargo_size < 0:
            raise ValueError("`embargo_size` cannot be negative")

        self.n_folds = n_folds
        self.n_test_folds = n_test_folds
        self.purged_size = purged_size
        self.embargo_size = embargo_size

    @property
    def n_splits(self) -> int:
        """Number of splits"""
        return _n_splits(n_folds=self.n_folds, n_test_folds=self.n_test_folds)

    @property
    def n_test_paths(self) -> int:
        """Number of test paths that can be reconstructed from the train/test
        combinations"""
        return _n_test_paths(n_folds=self.n_folds, n_test_folds=self.n_test_folds)

    @property
    def test_set_index(self) -> np.ndarray:
        """Location of each test set.

        Array of shape (n_splits, n_test_folds): row `i` holds the fold ids used
        as test folds in split `i`.
        """
        return np.array(
            list(itertools.combinations(np.arange(self.n_folds), self.n_test_folds))
        ).reshape(-1, self.n_test_folds)

    @property
    def binary_train_test_sets(self) -> np.ndarray:
        """Identify training and test folds for each combinations by assigning `0` to
        training folds and `1` to test folds.

        Array of shape (n_folds, n_splits).
        """
        folds_train_test = np.zeros((self.n_folds, self.n_splits))
        # Broadcast the split ids against the test fold ids of each split.
        folds_train_test[
            self.test_set_index, np.arange(self.n_splits)[:, np.newaxis]
        ] = 1
        return folds_train_test

    @property
    def recombined_paths(self) -> np.ndarray:
        """Recombine each test path by returning the test set location in each
        split.

        Array with one row per fold: row `f` lists the split ids in which fold `f`
        is a test fold.
        """
        return np.argwhere(self.binary_train_test_sets == 1)[:, 1].reshape(
            self.n_folds, -1
        )

    def get_path_ids(self) -> np.ndarray:
        """Return the path id of each test sets in each split.

        Returns
        -------
        path_ids : ndarray of shape (n_splits, n_test_folds)
            `path_ids[i, j]` is the id of the test path to which the `j`-th test
            set of split `i` belongs.
        """
        recombine_paths = self.recombined_paths
        path_ids = np.zeros((self.n_splits, self.n_test_folds), dtype=int)
        for i in range(self.n_splits):
            # Each split id appears once per test fold; the column of each
            # occurrence is the path id. Computed once per split.
            path_ids[i] = np.argwhere(recombine_paths == i)[:, 1]
        return path_ids

    def split(
        self, X: npt.ArrayLike, y=None, groups=None
    ) -> Iterator[tuple[np.ndarray, list[np.ndarray]]]:
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples
            and `n_features` is the number of features.

        y : array-like of shape (n_samples,), optional
            The (multi-)target variable

        groups : array-like of shape (n_samples,), optional
            Group labels for the samples used while splitting the dataset into
            train/test set. Not used by this splitter.

        Yields
        ------
        train : ndarray
            The training set indices for that split.

        test : list of ndarray
            The testing set indices for that split, one array per test fold.

        Raises
        ------
        ValueError
            If `purged_size + embargo_size` is too large compared to the fold size.
            As this is a generator, the error is raised on first iteration.
        """
        test_set_index = self.test_set_index
        recombine_paths = self.recombined_paths

        X, y = sku.indexable(X, y)
        n_samples = X.shape[0]
        min_fold_size = n_samples // self.n_folds
        if self.purged_size + self.embargo_size >= min_fold_size - 1:
            raise ValueError(
                "The sum of `purged_size` and `embargo_size` must be smaller than the"
                f" size of a train fold which is {min_fold_size}"
            )

        # Assign a fold id to each observation. When `n_samples` is not a multiple
        # of `n_folds`, ALL the remaining observations go into the last fold: the
        # raw integer division can produce ids above `n_folds` (not only equal to
        # it), so the ids are clipped instead of only remapping `== n_folds`.
        fold_index_num = np.minimum(
            np.arange(n_samples) // min_fold_size, self.n_folds - 1
        )

        # 1 = test, 0 = train, -1 = purged/embargoed (set below).
        index_train_test = np.zeros((n_samples, self.n_splits))
        for i in range(self.n_splits):
            index_train_test[np.isin(fold_index_num, test_set_index[i]), i] = 1

        # Train/test boundaries: +1 is a train -> test transition,
        # -1 is a test -> train transition.
        diff = np.diff(index_train_test, axis=0)

        # Purge before: remove the last `purged_size` train observations
        # preceding each test block.
        before_index = np.argwhere(diff == 1)
        for k in range(self.purged_size):
            index_train_test[
                np.maximum(0, before_index[:, 0] - k), before_index[:, 1]
            ] = -1

        # Purge after and Embargo: remove the first `purged_size + embargo_size`
        # train observations following each test block.
        after_index = np.argwhere(diff == -1)
        for k in range(self.purged_size + self.embargo_size):
            index_train_test[
                np.minimum(n_samples - 1, after_index[:, 0] + k + 1), after_index[:, 1]
            ] = -1

        self.index_train_test_ = index_train_test

        fold_index = {
            fold_id: np.flatnonzero(fold_index_num == fold_id)
            for fold_id in range(self.n_folds)
        }

        for i in range(self.n_splits):
            train_index = np.flatnonzero(index_train_test[:, i] == 0)
            test_index_list = [
                fold_index[fold_id] for fold_id, _ in np.argwhere(recombine_paths == i)
            ]
            yield train_index, test_index_list

    def summary(self, X) -> pd.Series:
        """Summarize the cross-validation configuration for the data `X`.

        Parameters
        ----------
        X : array-like of shape (n_observations, n_features)
            Data to split. Only its number of rows (`X.shape[0]`) is used.

        Returns
        -------
        summary : pandas Series
            Number of observations, folds and test folds, purge and embargo
            sizes, average training size, number of test paths and number of
            training combinations.
        """
        n_observations = X.shape[0]
        avg_train_size = _avg_train_size(
            n_observations=n_observations,
            n_folds=self.n_folds,
            n_test_folds=self.n_test_folds,
        )
        return pd.Series(
            {
                "Number of Observations": n_observations,
                "Total Number of Folds": self.n_folds,
                "Number of Test Folds": self.n_test_folds,
                "Purge Size": self.purged_size,
                "Embargo Size": self.embargo_size,
                "Average Training Size": int(avg_train_size),
                "Number of Test Paths": self.n_test_paths,
                "Number of Training Combinations": self.n_splits,
            }
        )

    def plot_train_test_folds(self) -> skt.Figure:
        """Plot the train/test fold locations"""
        values = self.binary_train_test_sets
        fill_color = np.where(values == 0, "blue", "red")
        fill_color = fill_color.astype(object)
        # Prepend the first table column holding the combination ids.
        fill_color = np.insert(
            fill_color, 0, np.array(["darkblue" for _ in range(self.n_splits)]), axis=0
        )
        values = np.insert(values, 0, np.arange(self.n_splits), axis=0)
        fig = go.Figure(
            data=[
                go.Table(
                    header=dict(
                        values=["Train Combinations"]
                        + [f"Fold {i}" for i in range(self.n_folds)],
                        fill_color="darkblue",
                        font=dict(color="white"),
                        align="left",
                    ),
                    cells=dict(
                        values=values,
                        font=dict(color="white"),
                        fill_color=fill_color,
                        line_color="grey",
                        align="left",
                    ),
                )
            ]
        )
        fig.update_layout(title="Split Train (0) /Test (1) Folds per Combination")
        return fig

    def plot_train_test_index(self, X) -> skt.Figure:
        """Plot the training and test indices for each combinations by assigning `0`
        to training, `1` to test and `-1` to both purge and embargo indices."""
        # Advancing the generator once is enough to populate `index_train_test_`.
        next(self.split(X))
        n_samples = X.shape[0]
        cond = [
            self.index_train_test_ == -1,
            self.index_train_test_ == 0,
            self.index_train_test_ == 1,
        ]
        values = self.index_train_test_.T
        # Prepend the first table column holding the observation ids.
        values = np.insert(values, 0, np.arange(n_samples), axis=0)
        fill_color = np.select(cond, ["green", "blue", "red"], default="green").T
        fill_color = fill_color.astype(object)
        fill_color = np.insert(
            fill_color, 0, np.array(["darkblue" for _ in range(n_samples)]), axis=0
        )
        fig = go.Figure(
            data=[
                go.Table(
                    header=dict(
                        values=["observations"]
                        + [f"Split {i}" for i in range(self.n_splits)],
                        fill_color="darkblue",
                        font=dict(color="white"),
                        align="left",
                    ),
                    cells=dict(
                        values=values,
                        font=dict(color="white"),
                        fill_color=fill_color,
                        line_color="grey",
                        align="left",
                    ),
                )
            ]
        )
        fig.update_layout(
            title="Train (0), Test (1) and Purge/Embargo (-1) observations per splits"
        )
        return fig
def _n_splits(n_folds: int, n_test_folds: int) -> int: """Number of splits. Parameters ---------- n_folds : int Number of folds. n_test_folds : int Number of test folds. Returns ------- n_splits : int Number of splits """ return int(math.comb(n_folds, n_test_folds)) def _n_test_paths(n_folds: int, n_test_folds: int) -> int: """Number of test paths that can be reconstructed from the train/test combinations Parameters ---------- n_folds : int Number of folds. n_test_folds : int Number of test folds. Returns ------- n_splits : int Number of test paths. """ return ( _n_splits(n_folds=n_folds, n_test_folds=n_test_folds) * n_test_folds // n_folds ) def _avg_train_size(n_observations: int, n_folds: int, n_test_folds: int) -> float: """Average number of observations contained in each training set. Parameters ---------- n_observations : int Number of observations. n_folds : int Number of folds. n_test_folds : int Number of test folds. Returns ------- avg_train_size : float Average number of observations contained in each training set. """ return n_observations / n_folds * (n_folds - n_test_folds)
def optimal_folds_number(
    n_observations: int,
    target_train_size: int,
    target_n_test_paths: int,
    weight_train_size: float = 1,
    weight_n_test_paths: float = 1,
) -> tuple[int, int]:
    r"""Find the optimal number of folds (total folds and test folds) for a target
    training size and a target number of test paths.

    We find `x = n_folds` and `y = n_test_folds` that minimizes the below
    cost function of the relative distance from the two targets:

    .. math::
        cost(x,y) = w_{f} \times \lvert\frac{f(x,y)-f_{target}}{f_{target}}\rvert +
        w_{g} \times \lvert\frac{g(x,y)-g_{target}}{g_{target}}\rvert

    with :math:`w_{f}` and :math:`w_{g}` the weights assigned to the distance
    from each target and :math:`f(x,y)` and :math:`g(x,y)` the average training size
    and the number of test paths as a function of the number of total folds and test
    folds.

    This is a combinatorial problem with :math:`\frac{(T-1)\times(T-2)}{2}`
    combinations, with :math:`T` the number of observations.

    We reduce the search space by using the combinatorial symmetry
    :math:`{n \choose k}={n \choose n-k}` and skipping cost computation above 1e5.

    Parameters
    ----------
    n_observations : int
        Number of observations. Must be at least 3.

    target_train_size : int
        The target number of observation in the training set.
        Must be strictly positive.

    target_n_test_paths : int
        The target number of test paths (that can be reconstructed from the train/test
        combinations). Must be strictly positive.

    weight_train_size : float, default=1
        The weight assigned to the distance from the target train size.
        The default value is 1.

    weight_n_test_paths : float, default=1
        The weight assigned to the distance from the target number of test paths.
        The default value is 1.

    Returns
    -------
    n_folds : int
        Optimal number of total folds.

    n_test_folds : int
        Optimal number of test folds.

    Raises
    ------
    ValueError
        If `n_observations` is smaller than 3 (no valid fold configuration
        exists) or if one of the targets is not strictly positive (the relative
        distance is undefined).
    """
    # Fail early with explicit messages instead of an empty-argmin error or a
    # division by zero inside the cost function.
    if n_observations < 3:
        raise ValueError(
            "`n_observations` must be at least 3, "
            f"got `n_observations={n_observations}`."
        )
    if target_train_size <= 0:
        raise ValueError(
            "`target_train_size` must be strictly positive, "
            f"got `target_train_size={target_train_size}`."
        )
    if target_n_test_paths <= 0:
        raise ValueError(
            "`target_n_test_paths` must be strictly positive, "
            f"got `target_n_test_paths={target_n_test_paths}`."
        )

    def _cost(
        x: int,
        y: int,
    ) -> float:
        """Weighted sum of the relative distances from the two targets."""
        n_test_paths = _n_test_paths(n_folds=x, n_test_folds=y)
        avg_train_size = _avg_train_size(
            n_observations=n_observations, n_folds=x, n_test_folds=y
        )
        return (
            weight_n_test_paths
            * abs(n_test_paths - target_n_test_paths)
            / target_n_test_paths
            + weight_train_size
            * abs(avg_train_size - target_train_size)
            / target_train_size
        )

    costs = []
    res = []
    for n_folds in range(3, n_observations + 1):
        # First `n_test_folds` for which the cost exceeded 1e5 (None until then).
        cutoff = None
        for n_test_folds in range(2, n_folds):
            # Once the cost has exploded at `cutoff`, skip the middle of the
            # range and resume on the symmetric side, where
            # `n_folds - n_test_folds <= cutoff`.
            if cutoff is None or n_folds - n_test_folds <= cutoff:
                cost = _cost(x=n_folds, y=n_test_folds)
                costs.append(cost)
                res.append((n_folds, n_test_folds))
                if cutoff is None and cost > 1e5:
                    cutoff = n_test_folds

    best = int(np.argmin(costs))
    return res[best]