Source code for skfolio.model_selection._walk_forward

"""Walk Forward cross-validator"""

# Copyright (c) 2023
# Author: Hugo Delatte <delatte.hugo@gmail.com>
# License: BSD 3 clause
# Implementation derived from:
# scikit-portfolio, Copyright (c) 2022, Carlo Nicolini, Licensed under MIT Licence.
# scikit-learn, Copyright (c) 2007-2010 David Cournapeau, Fabian Pedregosa,
# Olivier Grisel, Licensed under BSD 3 clause.

import datetime as dt
from collections.abc import Iterator

import numpy as np
import numpy.typing as npt
import pandas as pd
import sklearn.model_selection as sks
import sklearn.utils as sku


class WalkForward(sks.BaseCrossValidator):
    """Walk Forward Cross-Validator.

    Provides train/test indices to split time series data samples using a
    walk-forward logic. In each split, test indices must be higher than the
    previous ones; therefore, shuffling in the cross-validator is
    inappropriate.

    Compared to `sklearn.model_selection.TimeSeriesSplit`, you control the
    train/test folds by specifying the number of training and test samples
    instead of the number of splits, making it more suitable for portfolio
    cross-validation.

    If your data is a DataFrame indexed with a DatetimeIndex, you can split
    the data using specific datetime frequencies and offsets.

    Parameters
    ----------
    test_size : int
        Length of each test set. If `freq` is `None` (default), it represents
        the number of observations. Otherwise, it represents the number of
        periods defined by `freq`.

    train_size : int | pandas.offsets.DateOffset | datetime.timedelta
        Length of each training set. If `freq` is `None` (default), it
        represents the number of observations. Otherwise, for integers, it
        represents the number of periods defined by `freq`; for pandas
        DateOffset or datetime timedelta, it represents the date offset
        applied to the start of each period.

    freq : str | pandas.offsets.DateOffset, optional
        If provided, it must be a frequency string or a pandas DateOffset, and
        the returns `X` must be a DataFrame with an index of type
        `DatetimeIndex`. For a list of pandas frequencies and offsets, see
        `here <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases>`_.
        The default (`None`) means `test_size` and `train_size` represent the
        number of observations.

        Below are some common examples:

        * Rebalancing: Monthly on the first day
        * Test Duration: 1 month
        * Train Duration: 6 months

        >>> cv = WalkForward(test_size=1, train_size=6, freq="MS")

        * Rebalancing: Quarterly on the first day
        * Test Duration: 1 quarter
        * Train Duration: 2 months

        >>> cv = WalkForward(test_size=1, train_size=pd.DateOffset(months=2), freq="QS")

        * Rebalancing: Monthly on the third Friday
        * Test Duration: 1 month
        * Train Duration: 6 weeks

        >>> cv = WalkForward(test_size=1, train_size=pd.offsets.Week(6), freq="WOM-3FRI")

        * Rebalancing: Semi-annually on the last day
        * Test Duration: 6 months
        * Train Duration: 1 year

        >>> cv = WalkForward(test_size=1, train_size=2, freq=pd.offsets.MonthEnd(6))

        * Rebalancing: Every 2 months on the second day
        * Test Duration: 2 months
        * Train Duration: 6 months

        >>> cv = WalkForward(test_size=2, train_size=6, freq="MS", freq_offset=dt.timedelta(days=2))

    freq_offset : pandas DateOffset | datetime timedelta, optional
        Only used if `freq` is provided. Offsets the `freq` by a pandas
        DateOffset or a datetime timedelta offset.

    previous : bool, default=False
        Only used if `freq` is provided. If set to `True`, and if the period
        start or period end is not in the `DatetimeIndex`, the previous
        observation is used; otherwise, the next observation is used
        (default).

    expend_train : bool, default=False
        If set to `True`, each subsequent training set after the first one
        will use all past observations. The default is `False`.

    reduce_test : bool, default=False
        If set to `True`, the last train/test split will be returned even if
        the test set is partial (i.e., it contains fewer observations than
        `test_size`); otherwise, it will be ignored. The default is `False`.

    purged_size : int, default=0
        The number of observations to exclude from the end of each training
        set before the test set. The default value is `0`.

    Examples
    --------
    >>> import numpy as np
    >>> from skfolio.model_selection import WalkForward
    >>> X = np.random.randn(6, 2)
    >>> cv = WalkForward(test_size=1, train_size=2)
    >>> for i, (train_index, test_index) in enumerate(cv.split(X)):
    ...     print(f"Fold {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     print(f"  Test: index={test_index}")
    Fold 0:
      Train: index=[0 1]
      Test: index=[2]
    Fold 1:
      Train: index=[1 2]
      Test: index=[3]
    Fold 2:
      Train: index=[2 3]
      Test: index=[4]
    Fold 3:
      Train: index=[3 4]
      Test: index=[5]
    >>> cv = WalkForward(test_size=1, train_size=2, purged_size=1)
    >>> for i, (train_index, test_index) in enumerate(cv.split(X)):
    ...     print(f"Fold {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     print(f"  Test: index={test_index}")
    Fold 0:
      Train: index=[0 1]
      Test: index=[3]
    Fold 1:
      Train: index=[1 2]
      Test: index=[4]
    Fold 2:
      Train: index=[2 3]
      Test: index=[5]
    >>> cv = WalkForward(test_size=2, train_size=3)
    >>> for i, (train_index, test_index) in enumerate(cv.split(X)):
    ...     print(f"Fold {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     print(f"  Test: index={test_index}")
    Fold 0:
      Train: index=[0 1 2]
      Test: index=[3 4]
    >>> cv = WalkForward(test_size=2, train_size=3, reduce_test=True)
    >>> for i, (train_index, test_index) in enumerate(cv.split(X)):
    ...     print(f"Fold {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     print(f"  Test: index={test_index}")
    Fold 0:
      Train: index=[0 1 2]
      Test: index=[3 4]
    Fold 1:
      Train: index=[2 3 4]
      Test: index=[5]
    >>> cv = WalkForward(test_size=2, train_size=3, expend_train=True, reduce_test=True)
    >>> for i, (train_index, test_index) in enumerate(cv.split(X)):
    ...     print(f"Fold {i}:")
    ...     print(f"  Train: index={train_index}")
    ...     print(f"  Test: index={test_index}")
    Fold 0:
      Train: index=[0 1 2]
      Test: index=[3 4]
    Fold 1:
      Train: index=[0 1 2 3 4]
      Test: index=[5]
    """

    def __init__(
        self,
        test_size: int,
        train_size: int | pd.offsets.BaseOffset | dt.timedelta,
        freq: str | pd.offsets.BaseOffset | None = None,
        freq_offset: pd.offsets.BaseOffset | dt.timedelta | None = None,
        previous: bool = False,
        expend_train: bool = False,
        reduce_test: bool = False,
        purged_size: int = 0,
    ):
        self.test_size = test_size
        self.train_size = train_size
        self.freq = freq
        self.freq_offset = freq_offset
        self.previous = previous
        self.expend_train = expend_train
        self.reduce_test = reduce_test
        self.purged_size = purged_size
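
    # A minimal sketch of the datetime-frequency path described in the class
    # docstring, assuming a business-day DatetimeIndex (the dates and asset
    # count below are illustrative, not part of the original examples):
    #
    # >>> index = pd.date_range("2023-01-01", "2023-12-31", freq="B")
    # >>> X = pd.DataFrame(np.random.randn(len(index), 2), index=index)
    # >>> cv = WalkForward(test_size=1, train_size=3, freq="MS")
    # >>> train, test = next(cv.split(X))
    #
    # Each fold trains on 3 calendar months and tests on the following month,
    # with boundaries snapped to the next observation on or after each month
    # start (or the previous one when `previous=True`).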

    def split(
        self, X: npt.ArrayLike, y=None, groups=None
    ) -> Iterator[tuple[np.ndarray, np.ndarray]]:
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_observations, n_assets)
            Price returns of the assets.

        y : array-like of shape (n_observations, n_targets)
            Always ignored, exists for compatibility.

        groups : array-like of shape (n_observations,)
            Always ignored, exists for compatibility.

        Yields
        ------
        train : ndarray
            The training set indices for that split.

        test : ndarray
            The testing set indices for that split.
        """
        X, y = sku.indexable(X, y)
        n_samples = X.shape[0]

        if not isinstance(self.test_size, int):
            raise ValueError("`test_size` must be an integer")

        if self.freq is None:
            if not isinstance(self.train_size, int):
                raise ValueError(
                    "When `freq` is None, `train_size` must be an integer"
                )
            return _split_without_period(
                n_samples=n_samples,
                train_size=self.train_size,
                test_size=self.test_size,
                purged_size=self.purged_size,
                expend_train=self.expend_train,
                reduce_test=self.reduce_test,
            )

        if not hasattr(X, "index") or not isinstance(X.index, pd.DatetimeIndex):
            raise ValueError(
                "X must be a DataFrame with an index of type DatetimeIndex"
            )

        if isinstance(self.train_size, int):
            return _split_from_period_without_train_offset(
                n_samples=n_samples,
                train_size=self.train_size,
                test_size=self.test_size,
                freq=self.freq,
                freq_offset=self.freq_offset,
                previous=self.previous,
                purged_size=self.purged_size,
                expend_train=self.expend_train,
                reduce_test=self.reduce_test,
                ts_index=X.index,
            )
        return _split_from_period_with_train_offset(
            n_samples=n_samples,
            train_size=self.train_size,
            test_size=self.test_size,
            freq=self.freq,
            freq_offset=self.freq_offset,
            previous=self.previous,
            purged_size=self.purged_size,
            expend_train=self.expend_train,
            reduce_test=self.reduce_test,
            ts_index=X.index,
        )
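
    # A sketch of how this cross-validator is typically consumed for
    # backtesting, assuming skfolio's `cross_val_predict` and `MeanRisk`
    # estimator (hypothetical wiring, not part of this module):
    #
    # >>> from skfolio.model_selection import cross_val_predict
    # >>> from skfolio.optimization import MeanRisk
    # >>> cv = WalkForward(test_size=21, train_size=252)
    # >>> pred = cross_val_predict(MeanRisk(), X, cv=cv)
    #
    # Each ~1-month (21 trading days) test block is predicted by a model
    # fitted on the preceding ~1 year (252 trading days) of returns.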

    def get_n_splits(self, X=None, y=None, groups=None) -> int:
        """Return the number of splitting iterations in the cross-validator.

        Parameters
        ----------
        X : array-like of shape (n_observations, n_assets)
            Price returns of the assets.

        y : array-like of shape (n_observations, n_targets)
            Always ignored, exists for compatibility.

        groups : array-like of shape (n_observations,)
            Always ignored, exists for compatibility.

        Returns
        -------
        n_folds : int
            The number of splitting iterations in the cross-validator.
        """
        if X is None:
            raise ValueError("The 'X' parameter should not be None.")
        X, y = sku.indexable(X, y)
        n_samples = X.shape[0]
        n = n_samples - self.train_size - self.purged_size
        if self.reduce_test and n % self.test_size != 0:
            return n // self.test_size + 1
        return n // self.test_size
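
    # A worked check of the fold-count arithmetic above, assuming the integer
    # (`freq=None`) path: with n_samples=6, train_size=2, test_size=1 and
    # purged_size=0, n = 6 - 2 - 0 = 4 folds, matching the four folds of the
    # class docstring example. When `freq` is not None, `train_size` counts
    # periods rather than observations, so this arithmetic no longer applies.
    #
    # >>> cv = WalkForward(test_size=1, train_size=2)
    # >>> cv.get_n_splits(np.random.randn(6, 2))
    # 4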


def _split_without_period(
    n_samples: int,
    train_size: int,
    test_size: int,
    purged_size: int,
    expend_train: bool,
    reduce_test: bool,
) -> Iterator[tuple[np.ndarray, np.ndarray]]:
    if train_size + purged_size >= n_samples:
        raise ValueError(
            "The sum of `train_size` with `purged_size` "
            f"({train_size + purged_size}) cannot be greater than the"
            f" number of samples ({n_samples})."
        )
    indices = np.arange(n_samples)
    test_start = train_size + purged_size
    while True:
        if test_start >= n_samples:
            return
        test_end = test_start + test_size
        # Purge the last `purged_size` observations from the training window.
        train_end = test_start - purged_size
        if expend_train:
            train_start = 0
        else:
            train_start = train_end - train_size
        if test_end > n_samples:
            # Partial last test set: keep it only when `reduce_test` is True.
            if not reduce_test:
                return
            test_indices = indices[test_start:]
        else:
            test_indices = indices[test_start:test_end]
        train_indices = indices[train_start:train_end]
        yield train_indices, test_indices
        test_start = test_end


def _split_from_period_without_train_offset(
    n_samples: int,
    train_size: int,
    test_size: int,
    freq: str,
    freq_offset: pd.offsets.BaseOffset | dt.timedelta | None,
    previous: bool,
    purged_size: int,
    expend_train: bool,
    reduce_test: bool,
    ts_index,
) -> Iterator[tuple[np.ndarray, np.ndarray]]:
    start = ts_index[0]
    end = ts_index[-1]
    if freq_offset is not None:
        start = min(start, start - freq_offset)
    # Period boundaries generated from the frequency, then shifted by the
    # optional offset.
    date_range = pd.date_range(start=start, end=end, freq=freq)
    if freq_offset is not None:
        date_range += freq_offset
    # Map each period boundary to the nearest observation: the previous one
    # ("ffill") when `previous=True`, otherwise the next one ("bfill").
    idx = ts_index.get_indexer(date_range, method="ffill" if previous else "bfill")
    n = len(idx)
    i = 0
    while True:
        if i + train_size >= n:
            return
        if i + train_size + test_size >= n:
            if not reduce_test:
                return
            test_indices = np.arange(idx[i + train_size], n_samples)
        else:
            test_indices = np.arange(
                idx[i + train_size], idx[i + train_size + test_size]
            )
        if expend_train:
            train_start = 0
        else:
            train_start = idx[i]
        train_indices = np.arange(train_start, idx[i + train_size] - purged_size)
        yield train_indices, test_indices
        i += test_size


def _split_from_period_with_train_offset(
    n_samples: int,
    train_size: pd.offsets.BaseOffset | dt.timedelta,
    test_size: int,
    freq: str,
    freq_offset: pd.offsets.BaseOffset | dt.timedelta | None,
    previous: bool,
    purged_size: int,
    expend_train: bool,
    reduce_test: bool,
    ts_index,
) -> Iterator[tuple[np.ndarray, np.ndarray]]:
    start = ts_index[0]
    end = ts_index[-1]
    if freq_offset is not None:
        start = min(start, start - freq_offset)
    date_range = pd.date_range(start=start, end=end, freq=freq)
    if freq_offset is not None:
        date_range += freq_offset
    idx = ts_index.get_indexer(date_range, method="ffill" if previous else "bfill")
    # Each training window starts `train_size` (a date offset) before its
    # period; -1 marks windows starting before the first observation.
    train_idx = ts_index.get_indexer(date_range - train_size, method="ffill")
    n = len(idx)
    if np.all(train_idx == -1):
        return
    # Skip the leading periods whose training window is not fully available.
    i = np.argmax(train_idx > -1)
    while True:
        if i >= n:
            return
        if i + test_size >= n:
            if not reduce_test:
                return
            test_indices = np.arange(idx[i], n_samples)
        else:
            test_indices = np.arange(idx[i], idx[i + test_size])
        if expend_train:
            train_start = 0
        else:
            train_start = train_idx[i]
        # Purge the last `purged_size` observations from the training window.
        train_indices = np.arange(train_start, idx[i] - purged_size)
        yield train_indices, test_indices
        i += test_size
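

# A self-contained smoke test of the purged split arithmetic, assuming only
# numpy (a sketch, not part of the original module):
if __name__ == "__main__":
    X = np.arange(20).reshape(10, 2)  # 10 observations, 2 assets
    cv = WalkForward(test_size=3, train_size=4, purged_size=1, reduce_test=True)
    for train, test in cv.split(X):
        # The purge removes the last `purged_size` observation(s) of each
        # training window, leaving a gap before the test window starts.
        assert train[-1] + 1 + cv.purged_size == test[0]
        print(f"train={train} test={test}")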