Source code for skfolio.utils.stats

"""Tools module"""

import warnings

# Copyright (c) 2023
# Author: Hugo Delatte <delatte.hugo@gmail.com>
# License: BSD 3 clause
# Implementation derived from:
# Riskfolio-Lib, Copyright (c) 2020-2023, Dany Cajas, Licensed under BSD 3 clause.
# Statsmodels, Copyright (C) 2006, Jonathan E. Taylor, Licensed under BSD 3 clause.
from enum import auto

import cvxpy as cp
import numpy as np
import scipy.cluster.hierarchy as sch
import scipy.optimize as sco
import scipy.sparse.linalg as scl
import scipy.spatial.distance as scd
import scipy.special as scs
from scipy.sparse import csr_matrix

from skfolio.utils.tools import AutoEnum

__all__ = [
    "NBinsMethod",
    "n_bins_freedman",
    "n_bins_knuth",
    "is_cholesky_dec",
    "assert_is_square",
    "assert_is_symmetric",
    "assert_is_distance",
    "cov_nearest",
    "cov_to_corr",
    "corr_to_cov",
    "commutation_matrix",
    "compute_optimal_n_clusters",
    "rand_weights",
    "rand_weights_dirichlet",
    "minimize_relative_weight_deviation",
]


[docs] class NBinsMethod(AutoEnum): """Enumeration of the Number of Bins Methods Parameters ---------- FREEDMAN : str Freedman method KNUTH : str Knuth method """ FREEDMAN = auto() KNUTH = auto()
[docs] def n_bins_freedman(x: np.ndarray) -> int: """Compute the optimal histogram bin size using the Freedman-Diaconis rule [1]_. Parameters ---------- x : ndarray of shape (n_observations,) The input array. Returns ------- n_bins : int The optimal bin size. References ---------- .. [1] "On the histogram as a density estimator: L2 theory". Freedman & Diaconis (1981). """ if x.ndim != 1: raise ValueError("`x` must be a 1d-array") n = len(x) p_25, p_75 = np.percentile(x, [25, 75]) d = 2 * (p_75 - p_25) / (n ** (1 / 3)) if d == 0: return 5 n_bins = max(1, np.ceil((np.max(x) - np.min(x)) / d)) return int(round(n_bins))
[docs] def n_bins_knuth(x: np.ndarray) -> int: """Compute the optimal histogram bin size using Knuth's rule [1]_. Parameters ---------- x : ndarray of shape (n_observations,) The input array. Returns ------- n_bins : int The optimal bin size. References ---------- .. [1] "Optimal Data-Based Binning for Histograms". Knuth. """ x = np.sort(x) n = len(x) def func(y: np.ndarray) -> float: y = y[0] if y <= 0: return np.inf bin_edges = np.linspace(x[0], x[-1], int(y) + 1) hist, _ = np.histogram(x, bin_edges) return -( n * np.log(y) + scs.gammaln(0.5 * y) - y * scs.gammaln(0.5) - scs.gammaln(n + 0.5 * y) + np.sum(scs.gammaln(hist + 0.5)) ) n_bins_init = n_bins_freedman(x) n_bins = sco.fmin(func, n_bins_init, disp=0)[0] return int(round(n_bins))
[docs] def rand_weights_dirichlet(n: int) -> np.array: """Produces n random weights that sum to one from a dirichlet distribution (uniform distribution over a simplex) Parameters ---------- n : int Number of weights. Returns ------- weights : ndarray of shape (n, ) The vector of weights. """ return np.random.dirichlet(np.ones(n))
[docs] def rand_weights(n: int, zeros: int = 0) -> np.array: """Produces n random weights that sum to one from an uniform distribution (non-uniform distribution over a simplex) Parameters ---------- n : int Number of weights. zeros : int, default=0 The number of weights to randomly set to zeros. Returns ------- weights : ndarray of shape (n, ) The vector of weights. """ k = np.random.rand(n) if zeros > 0: zeros_idx = np.random.choice(n, zeros, replace=False) k[zeros_idx] = 0 return k / sum(k)
[docs] def is_cholesky_dec(x: np.ndarray) -> bool: """Returns True if Cholesky decomposition can be computed. The matrix must be Hermitian (symmetric if real-valued) and positive-definite. No checking is performed to verify whether the matrix is Hermitian or not. Parameters ---------- x : ndarray of shape (n, m) The matrix. Returns ------- value : bool True if Cholesky decomposition can be applied to the matrix, False otherwise. """ # Around 100 times faster than checking for positive eigenvalues with np.linalg.eigh try: np.linalg.cholesky(x) return True except np.linalg.LinAlgError: return False
def is_positive_definite(x: np.ndarray) -> bool: """Returns True if the matrix is positive definite. Parameters ---------- x : ndarray of shape (n, m) The matrix. Returns ------- value : bool True if the matrix is positive definite, False otherwise. """ return np.all(np.linalg.eigvals(x) > 0)
[docs] def assert_is_square(x: np.ndarray) -> None: """Raises an error if the matrix is not square. Parameters ---------- x : ndarray of shape (n, n) The matrix. Raises ------ ValueError: if the matrix is not square. """ if x.ndim != 2 or x.shape[0] != x.shape[1]: raise ValueError("The matrix must be square")
[docs] def assert_is_symmetric(x: np.ndarray) -> None: """Raises an error if the matrix is not symmetric. Parameters ---------- x : ndarray of shape (n, m) The matrix. Raises ------ ValueError: if the matrix is not symmetric. """ assert_is_square(x) if not np.allclose(x, x.T): raise ValueError("The matrix must be symmetric")
[docs] def assert_is_distance(x: np.ndarray) -> None: """Raises an error if the matrix is not a distance matrix. Parameters ---------- x : ndarray of shape (n, n) The matrix. Raises ------ ValueError: if the matrix is a distance matrix. """ assert_is_symmetric(x) if not np.allclose(np.diag(x), np.zeros(x.shape[0]), atol=1e-5): raise ValueError( "The distance matrix must have diagonal elements close to zeros" )
[docs] def cov_to_corr(cov: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """Convert a covariance matrix to a correlation matrix. Parameters ---------- cov : ndarray of shape (n, n) Covariance matrix. Returns ------- corr, std : tuple[ndarray of shape (n, n), ndarray of shape (n, )] Correlation matrix and standard-deviation vector """ if cov.ndim != 2: raise ValueError(f"`cov` must be a 2D array, got a {cov.ndim}D array") std = np.sqrt(np.diag(cov)) corr = cov / std / std[:, None] return corr, std
[docs] def corr_to_cov(corr: np.ndarray, std: np.ndarray): """Convert a correlation matrix to a covariance matrix given its standard-deviation vector. Parameters ---------- corr : ndarray of shape (n, n) Correlation matrix. std : ndarray of shape (n, ) Standard-deviation vector. Returns ------- cov : ndarray of shape (n, n) Covariance matrix """ if std.ndim != 1: raise ValueError(f"`std` must be a 1D array, got a {std.ndim}D array") if corr.ndim != 2: raise ValueError(f"`corr` must be a 2D array, got a {corr.ndim}D array") cov = corr * std * std[:, None] return cov
_CLIPPING_VALUE = 1e-13
[docs] def cov_nearest( cov: np.ndarray, higham: bool = False, higham_max_iteration: int = 100, warn: bool = False, ): """Compute the nearest covariance matrix that is positive definite and with a cholesky decomposition than can be computed. The variance is left unchanged. A covariance matrix that is not positive definite often occurs in high dimensional problems. It can be due to multicollinearity, floating-point inaccuracies, or when the number of observations is smaller than the number of assets. First, it converts the covariance matrix to a correlation matrix. Then, it finds the nearest correlation matrix and converts it back to a covariance matrix using the initial standard deviation. Cholesky decomposition can fail for symmetric positive definite (SPD) matrix due to floating point error and inversely, Cholesky decomposition can success for non-SPD matrix. Therefore, we need to test for both. We always start by testing for Cholesky decomposition which is significantly faster than checking for positive eigenvalues. Parameters ---------- cov : ndarray of shape (n, n) Covariance matrix. higham : bool, default=False If this is set to True, the Higham & Nick (2002) algorithm [1]_ is used, otherwise the eigenvalues are clipped to threshold above zeros (1e-13). The default (`False`) is to use the clipping method as the Higham & Nick algorithm can be slow for large datasets. higham_max_iteration : int, default=100 Maximum number of iteration of the Higham & Nick (2002) algorithm. The default value is `100`. warn : bool, default=False If this is set to True, a user warning is emitted when the covariance matrix is not positive definite and replaced by the nearest. The default is False. Returns ------- cov : ndarray The nearest covariance matrix. References ---------- .. [1] "Computing the nearest correlation matrix - a problem from finance" IMA Journal of Numerical Analysis Higham & Nick (2002) """ assert_is_square(cov) assert_is_symmetric(cov) # Around 100 times faster than checking eigenvalues with np.linalg.eigh if is_cholesky_dec(cov) and is_positive_definite(cov): return cov if warn: warnings.warn( "The covariance matrix is not positive definite. " f"The {'Higham' if higham else 'Clipping'} algorithm will be used to find " "the nearest positive definite covariance.", stacklevel=2, ) corr, std = cov_to_corr(cov) if higham: eps = np.finfo(np.float64).eps * 5 diff = np.zeros(corr.shape) x = corr.copy() for _ in range(higham_max_iteration): x_adj = x - diff eig_vals, eig_vecs = np.linalg.eigh(x_adj) x = eig_vecs * np.maximum(eig_vals, eps) @ eig_vecs.T diff = x - x_adj np.fill_diagonal(x, 1) cov = corr_to_cov(x, std) if is_cholesky_dec(cov) and is_positive_definite(cov): break else: raise ValueError("Unable to find the nearest positive definite matrix") else: eig_vals, eig_vecs = np.linalg.eigh(corr) # Clipping the eigenvalues with a value smaller than 1e-13 can cause scipy to # consider the matrix non-psd is some corner cases (see test/test_stats.py) x = eig_vecs * np.maximum(eig_vals, _CLIPPING_VALUE) @ eig_vecs.T x, _ = cov_to_corr(x) cov = corr_to_cov(x, std) return cov
[docs] def commutation_matrix(x): """Compute the commutation matrix. Parameters ---------- x : ndarray of shape (n, m) The matrix. Returns ------- K : ndarray of shape (m * n, m * n) The commutation matrix. """ (m, n) = x.shape row = np.arange(m * n) col = row.reshape((m, n), order="F").ravel() data = np.ones(m * n, dtype=np.int8) k = csr_matrix((data, (row, col)), shape=(m * n, m * n)) return k
[docs] def compute_optimal_n_clusters(distance: np.ndarray, linkage_matrix: np.ndarray) -> int: r"""Compute the optimal number of clusters based on Two-Order Difference to Gap Statistic [1]_. The Two-Order Difference to Gap Statistic has been developed to improve the performance and stability of the Tibshiranis Gap statistic. It applies the two-order difference of the within-cluster dispersion to replace the reference null distribution in the Gap statistic. The number of cluster :math:`k` is determined by: .. math:: \begin{cases} \begin{aligned} &\max_{k} & & W_{k+2} + W_{k} - 2 W_{k+1} \\ &\text{s.t.} & & 1 \ge c \ge max\bigl(8, \sqrt{n}\bigr) \\ \end{aligned} \end{cases} with :math:`n` the sample size and :math:`W_{k}` the within-cluster dispersions defined as: .. math:: W_{k} = \sum_{i=1}^{k} \frac{D_{i}}{2|C_{i}|} where :math:`|C_{i}|` is the cardinality of cluster :math:`i` and :math:`D_{i}` its density defined as: .. math:: D_{i} = \sum_{u \in C_{i}} \sum_{v \in C_{i}} d(u,v) with :math:`d(u,v)` the distance between u and v. Parameters ---------- distance : ndarray of shape (n, n) Distance matrix. linkage_matrix : ndarray of shape (n - 1, 4) Linkage matrix. Returns ------- value : int Optimal number of clusters. References ---------- .. [1] "Application of two-order difference to gap statistic". Yue, Wang & Wei (2009) """ cut_tree = sch.cut_tree(linkage_matrix) n = cut_tree.shape[1] max_clusters = min(n, max(8, round(np.sqrt(n)))) dispersion = [] for k in range(max_clusters): level = cut_tree[:, n - k - 1] cluster_density = [] for i in range(np.max(level) + 1): cluster_idx = np.argwhere(level == i).flatten() cluster_dists = scd.squareform( distance[cluster_idx, :][:, cluster_idx], checks=False ) if cluster_dists.shape[0] != 0: cluster_density.append(np.nan_to_num(cluster_dists.mean())) dispersion.append(np.sum(cluster_density)) dispersion = np.array(dispersion) gaps = np.roll(dispersion, -2) + dispersion - 2 * np.roll(dispersion, -1) gaps = gaps[:-2] # k=0 represents one cluster k = np.argmax(gaps) + 2 return k
[docs] def minimize_relative_weight_deviation( weights: np.ndarray, min_weights: np.ndarray, max_weights: np.ndarray, solver: str = "CLARABEL", solver_params: dict | None = None, ) -> np.ndarray: r""" Apply weight constraints to an initial array of weights by minimizing the relative weight deviation of the final weights from the initial weights. .. math:: \begin{cases} \begin{aligned} &\min_{w} & & \Vert \frac{w - w_{init}}{w_{init}} \Vert_{2}^{2} \\ &\text{s.t.} & & \sum_{i=1}^{N} w_{i} = 1 \\ & & & w_{min} \leq w_i \leq w_{max}, \quad \forall i \end{aligned} \end{cases} Parameters ---------- weights : ndarray of shape (n_assets,) Initial weights. min_weights : ndarray of shape (n_assets,) Minimum assets weights (weights lower bounds). max_weights : ndarray of shape (n_assets,) Maximum assets weights (weights upper bounds). solver : str, default="CLARABEL" The solver to use. The default is "CLARABEL" which is written in Rust and has better numerical stability and performance than ECOS and SCS. For more details about available solvers, check the CVXPY documentation: https://www.cvxpy.org/tutorial/advanced/index.html#choosing-a-solver solver_params : dict, optional Solver parameters. For example, `solver_params=dict(verbose=True)`. The default (`None`) is to use the CVXPY default. For more details about solver arguments, check the CVXPY documentation: https://www.cvxpy.org/tutorial/advanced/index.html#setting-solver-options """ if not (weights.shape == min_weights.shape == max_weights.shape): raise ValueError("`min_weights` and `max_weights` must have same size") if np.any(weights < 0): raise ValueError("Initial weights must be strictly positive") if not np.isclose(np.sum(weights), 1.0): raise ValueError("Initial weights must sum to one") if np.any(max_weights < min_weights): raise ValueError("`min_weights` must be lower or equal to `max_weights`") if np.all((weights >= min_weights) & (weights <= max_weights)): return weights if solver_params is None: solver_params = {} n = len(weights) w = cp.Variable(n) objective = cp.Minimize(cp.norm(w / weights - 1)) constraints = [cp.sum(w) == 1, w >= min_weights, w <= max_weights] problem = cp.Problem(objective, constraints) try: problem.solve(solver=solver, **solver_params) if w.value is None: raise cp.SolverError("No solution found") except (cp.SolverError, scl.ArpackNoConvergence): raise cp.SolverError( f"Solver '{solver}' failed. Try another" " solver, or solve with solver_params=dict(verbose=True) for more" " information" ) from None return w.value