
"""Vine Copula Estimation."""

# Copyright (c) 2025
# Author: Hugo Delatte <delatte.hugo@gmail.com>
# Credits: Matteo Manzi, Vincent Maladière, Carlo Nicolini
# SPDX-License-Identifier: BSD-3-Clause

# This is a novel implementation of R-vines that is fully object-oriented and
# based on graph theory, with new features to also capture C-like and clustered
# dependency structures.
#
# Unlike previous implementations that relied on a procedural paradigm, this version
# adopts an object-oriented design for constructing vine copulas.
#
# The validity of a vine tree is based on the following principle:
#   - In tree T_{k-1}, each edge represents a subset of k variables.
#   - Two such edges can form a new edge in tree T_k if and only if they share exactly
#     k-1 variables.
#
# In an OOP approach, we can use the following equivalent graphical adjacency
# rule: two edges in T_{k-1} (which are "nodes" when building T_k) are connected
# by an edge in T_k if and only if they share exactly one node in T_{k-1}.
#
# By using the concept of central assets in the MST, this novel implementation
# is able to capture clustered or C-like dependency structures, allowing for a
# more nuanced representation of hierarchical relationships among assets and
# improving conditional sampling.
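#
# As a worked illustration of the adjacency rule (variable labels are
# hypothetical): T_1 edges {1,2} and {2,3} share exactly one node (variable 2),
# so they may be joined in T_2; the resulting edge has conditioned set (1,3) and
# conditioning set {2}, i.e. the pair copula c_{1,3|2}. By contrast, T_1 edges
# {1,2} and {3,4} share no node, so no T_2 edge may connect them (the proximity
# condition would be violated).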

import contextlib
import numbers
import warnings
from collections import deque

import numpy as np
import numpy.typing as npt
import plotly.express as px
import plotly.graph_objects as go
import scipy.stats as st
import sklearn.utils as sku
import sklearn.utils.parallel as skp
import sklearn.utils.validation as skv

from skfolio.distribution._base import SelectionCriterion
from skfolio.distribution.copula import (
    UNIFORM_MARGINAL_EPSILON,
    BaseBivariateCopula,
    ClaytonCopula,
    GaussianCopula,
    GumbelCopula,
    JoeCopula,
    StudentTCopula,
    select_bivariate_copula,
)
from skfolio.distribution.multivariate._base import BaseMultivariateDist
from skfolio.distribution.multivariate._utils import (
    ChildNode,
    DependenceMethod,
    Edge,
    RootNode,
    Tree,
)
from skfolio.distribution.univariate import (
    BaseUnivariateDist,
    Gaussian,
    JohnsonSU,
    StudentT,
    select_univariate_dist,
)
from skfolio.utils.tools import input_to_array, validate_input_list

_UNIFORM_SAMPLE_EPSILON = 1e-14
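# Note: this epsilon is used to clip uniform samples away from exactly 0 and 1
# so that marginal ppf calls stay finite; for instance, a standard normal
# quantile evaluated at 1e-14 is roughly -7.6, whereas at exactly 0 it would be
# -Inf.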


class VineCopula(BaseMultivariateDist):
    """Regular Vine Copula Estimator.

    This model first fits the best univariate distribution for each asset,
    transforming the data to uniform marginals via the fitted CDFs. Then, it
    constructs a regular vine copula by sequentially selecting the best bivariate
    copula from a list of candidates for each edge in the vine using a maximum
    spanning tree algorithm based on a given dependence measure [1]_.

    Regular vines capture complex, fat-tailed dependencies and tail co-movements
    between asset returns.

    It also supports conditional sampling, enabling stress testing and scenario
    analysis by generating samples under specified conditions.

    Moreover, by marking some assets as central, this novel implementation is able
    to capture clustered or C-like dependency structures, allowing for a more
    nuanced representation of hierarchical relationships among assets and
    improving conditional sampling and stress testing.

    Parameters
    ----------
    fit_marginals : bool, default=True
        Whether to fit marginal distributions to each asset before constructing
        the vine. If True, the data will be transformed to uniform marginals
        using the fitted CDFs.

    marginal_candidates : list[BaseUnivariateDist], optional
        Candidate univariate distribution estimators to fit the marginals.
        If None, defaults to `[Gaussian(), StudentT(), JohnsonSU()]`.

    copula_candidates : list[BaseBivariateCopula], optional
        Candidate bivariate copula estimators. If None, defaults to
        `[GaussianCopula(), StudentTCopula(), ClaytonCopula(), GumbelCopula(),
        JoeCopula()]`.

    max_depth : int or None, default=4
        Maximum vine depth (truncation level). Must be greater than 1.
        `None` means that no truncation is applied. The default is 4.

    log_transform : bool | dict[str, bool] | array-like of shape (n_assets, ), default=False
        If True, the simple returns provided as input will be transformed to log
        returns before fitting the vine copula. That is, each return R is
        transformed via r = log(1+R). After sampling, the generated log returns
        are converted back to simple returns using R = exp(r) - 1.
        If a boolean is provided, it is applied to each asset. If a dictionary is
        provided, its (key/value) pairs must be (asset name/boolean) and the
        input `X` of the `fit` method must be a DataFrame with the asset names as
        columns.

    central_assets : array-like of asset names or asset positions, optional
        Assets that should be centrally placed during vine construction.
        If None, no asset is forced to the center. If an array-like of
        **integers** is provided, its values must be asset positions. If an
        array-like of **strings** is provided, its values must be asset names and
        the input `X` of the `fit` method must be a DataFrame with the asset
        names as columns.
        Assets marked as central are forced to occupy central positions in the
        vine, leading to a C-like or clustered structure. This is needed for
        conditional sampling, where the conditioning assets should be central
        nodes. For example:

        - If only asset 1 is marked as central, it will be connected to all other
          assets in the first tree (yielding a C-like structure for the initial
          tree), with subsequent trees following the standard R-vine pattern.

        - If asset 1 and asset 2 are marked as central, they will be connected
          together and the remaining assets will connect to either asset 1 or
          asset 2 (forming a clustered structure in the initial trees). In the
          next tree, the edge between asset 1 and asset 2 becomes the central
          node, with subsequent trees following the standard R-vine structure.
        - This logic extends naturally to more than two central assets.

    dependence_method : DependenceMethod, default=DependenceMethod.KENDALL_TAU
        The dependence measure used to compute edge weights for the MST.
        Possible values are:

        - KENDALL_TAU
        - MUTUAL_INFORMATION
        - WASSERSTEIN_DISTANCE

    selection_criterion : SelectionCriterion, default=SelectionCriterion.AIC
        The criterion used for univariate and copula selection. Possible values
        are:

        - SelectionCriterion.AIC : Akaike Information Criterion
        - SelectionCriterion.BIC : Bayesian Information Criterion

    independence_level : float, default=0.05
        Significance level used for the Kendall tau independence test during
        copula fitting. If the p-value exceeds this threshold, the null
        hypothesis of independence is accepted, and the pair copula is modeled
        using the `IndependentCopula()` class.

    n_jobs : int, optional
        The number of jobs to run in parallel for `fit` of all `estimators`.
        The value `-1` means using all processors.
        The default (`None`) means 1 unless in a `joblib.parallel_backend`
        context.

    random_state : int, RandomState instance or None, default=None
        Seed or random state to ensure reproducibility.

    Attributes
    ----------
    trees_ : list[Tree]
        List of constructed vine trees.

    marginal_distributions_ : list[BaseUnivariateDist]
        List of fitted marginal distributions (if `fit_marginals` is True).

    n_features_in_ : int
        Number of assets seen during `fit`.

    feature_names_in_ : ndarray of shape (`n_features_in_`,)
        Names of assets seen during `fit`. Defined only when `X` has asset names
        that are all strings.

    Examples
    --------
    >>> from skfolio.datasets import load_factors_dataset
    >>> from skfolio.preprocessing import prices_to_returns
    >>> from skfolio.distribution import VineCopula
    >>>
    >>> # Load historical prices and convert them to returns
    >>> prices = load_factors_dataset()
    >>> X = prices_to_returns(prices)
    >>>
    >>> # Instantiate the VineCopula model
    >>> vine = VineCopula()
    >>> # Fit the model
    >>> vine.fit(X)
    >>> # Display the vine trees and fitted copulas
    >>> vine.display_vine()
    >>> # Log-likelihood, AIC and BIC
    >>> vine.score(X)
    >>> vine.aic(X)
    >>> vine.bic(X)
    >>>
    >>> # Generate 10 samples from the fitted vine copula
    >>> samples = vine.sample(n_samples=10)
    >>>
    >>> # Set QUAL, SIZE and MTUM as central
    >>> vine = VineCopula(central_assets=["QUAL", "SIZE", "MTUM"])
    >>> vine.fit(X)
    >>> # Sample by conditioning on QUAL and SIZE returns
    >>> samples = vine.sample(
    ...     n_samples=4,
    ...     conditioning={
    ...         "QUAL": [-0.1, -0.2, -0.3, -0.4],
    ...         "SIZE": -0.2,
    ...         "MTUM": (None, -0.3),  # MTUM sampled between -Inf and -30%
    ...     },
    ... )
    >>> # Plot the scatter matrix of sampled returns vs historical X
    >>> fig = vine.plot_scatter_matrix(X=X)
    >>> fig.show()
    >>>
    >>> # Plot univariate distributions of sampled returns vs historical X
    >>> fig = vine.plot_marginal_distributions(X=X)
    >>> fig.show()

    References
    ----------
    .. [1] "Selecting and estimating regular vine copulae and application to
        financial returns", Dißmann, Brechmann, Czado, and Kurowicka (2013).

    .. [2] "Growing simplified vine copula trees: improving Dißmann's algorithm",
        Kraus and Czado (2017).

    .. [3] "Pair-copula constructions of multiple dependence",
        Aas, Czado, Frigessi, Bakken (2009).

    .. [4] "Pair-Copula Constructions for Financial Applications: A Review",
        Aas and Czado (2016).
[5] "Conditional copula simulation for systemic risk stress testing" Brechmann, Hendrich, Czado (2013) """ trees_: list[Tree] marginal_distributions_: list[BaseUnivariateDist] n_features_in_: int feature_names_in_: np.ndarray central_assets_: set[int | str] _log_transform: np.ndarray def __init__( self, fit_marginals: bool = True, marginal_candidates: list[BaseUnivariateDist] | None = None, copula_candidates: list[BaseBivariateCopula] | None = None, max_depth: int | None = 4, log_transform: bool | dict[str, bool] | npt.ArrayLike = False, central_assets: list[int | str] | None = None, dependence_method: DependenceMethod = DependenceMethod.KENDALL_TAU, selection_criterion: SelectionCriterion = SelectionCriterion.AIC, independence_level: float = 0.05, n_jobs: int | None = None, random_state: int | None = None, ): super().__init__(random_state=random_state) self.fit_marginals = fit_marginals self.marginal_candidates = marginal_candidates self.copula_candidates = copula_candidates self.max_depth = max_depth self.log_transform = log_transform self.central_assets = central_assets self.dependence_method = dependence_method self.selection_criterion = selection_criterion self.independence_level = independence_level self.n_jobs = n_jobs @property def n_params(self) -> int: """Number of model parameters.""" skv.check_is_fitted(self) k = 0 if self.fit_marginals: k = sum([dist.n_params for dist in self.marginal_distributions_]) for tree in self.trees_: for edge in tree.edges: k += edge.copula.n_params return k

    def fit(self, X: npt.ArrayLike, y=None) -> "VineCopula":
        """Fit the Vine Copula model to the data.

        Parameters
        ----------
        X : array-like of shape (n_observations, n_assets)
            Price returns of the assets.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : VineCopula
            The fitted VineCopula instance.

        Raises
        ------
        ValueError
            If the number of assets is less than or equal to 2, or if
            `max_depth` <= 1.
        """
        X = skv.validate_data(self, X, dtype=np.float64)
        n_observations, n_assets = X.shape
        if n_assets <= 2:
            raise ValueError(
                f"The number of assets must be higher than 2, got {n_assets}"
            )
        if self.max_depth is not None and self.max_depth <= 1:
            raise ValueError(f"`max_depth` must be higher than 1, got {self.max_depth}")

        if np.isscalar(self.log_transform):
            self._log_transform = np.array([self.log_transform] * n_assets, dtype=bool)
        else:
            self._log_transform = input_to_array(
                items=self.log_transform,
                n_assets=n_assets,
                fill_value=False,
                dim=1,
                assets_names=getattr(self, "feature_names_in_", None),
                name="log_transform",
            )
        if np.any(self._log_transform):
            X = np.where(self._log_transform, np.log1p(X), X)

        depth = n_assets - 1
        if self.max_depth is not None:
            depth = min(self.max_depth, depth)

        self.central_assets_ = set()
        if self.central_assets is not None:
            self.central_assets_ = set(
                validate_input_list(
                    items=self.central_assets,
                    n_assets=n_assets,
                    assets_names=getattr(self, "feature_names_in_", None),
                    name="central_assets",
                    raise_if_string_missing=False,
                )
            )

        marginal_candidates = (
            self.marginal_candidates
            if self.marginal_candidates is not None
            else [
                Gaussian(),
                StudentT(),
                JohnsonSU(),
            ]
        )
        copula_candidates = (
            self.copula_candidates
            if self.copula_candidates is not None
            else [
                GaussianCopula(),
                StudentTCopula(),
                ClaytonCopula(),
                GumbelCopula(),
                JoeCopula(),
            ]
        )

        if self.fit_marginals:
            # Fit marginal distributions
            self.marginal_distributions_ = skp.Parallel(n_jobs=self.n_jobs)(
                skp.delayed(select_univariate_dist)(
                    X=X[:, [i]],
                    distribution_candidates=marginal_candidates,
                    selection_criterion=self.selection_criterion,
                )
                for i in range(n_assets)
            )
            # Transform data to uniform marginals using the fitted CDF
            X = np.hstack(
                [
                    dist.cdf(X[:, [i]])
                    for i, dist in enumerate(self.marginal_distributions_)
                ]
            )

        # Ensure values are within [0, 1].
        if not np.all((X >= 0) & (X <= 1)):
            raise ValueError(
                "If `fit_marginals` is set to False, X must be in the interval "
                "`[0, 1]`, typically obtained via marginal CDF transformation."
            )
        # Handle potential numerical issues by ensuring X doesn't contain exact 0 or 1.
        X = np.clip(X, UNIFORM_MARGINAL_EPSILON, 1 - UNIFORM_MARGINAL_EPSILON)

        trees: list[Tree] = []
        for level in range(depth):
            if level == 0:
                # Initial tree: each asset is a node.
                tree = Tree(
                    level=level,
                    nodes=[
                        RootNode(
                            ref=i,
                            pseudo_values=X[:, i],
                            central=i in self.central_assets_,
                        )
                        for i in range(n_assets)
                    ],
                )
            else:
                # Subsequent trees: nodes represent edges from the previous tree.
                tree = Tree(
                    level=level, nodes=[ChildNode(ref=edge) for edge in trees[-1].edges]
                )
            # Set edges via Maximum Spanning Tree using the specified dependence method.
            tree.set_edges_from_mst(dependence_method=self.dependence_method)
            assert len(tree.edges) == n_assets - level - 1

            # Fit bivariate copulas for each edge.
            copulas = skp.Parallel(n_jobs=self.n_jobs)(
                skp.delayed(select_bivariate_copula)(
                    X=edge.get_X(),
                    copula_candidates=copula_candidates,
                    selection_criterion=self.selection_criterion,
                    independence_level=self.independence_level,
                )
                for edge in tree.edges
            )
            for edge, copula in zip(tree.edges, copulas, strict=True):
                edge.copula = copula

            # Clear previous tree cache to free memory.
            if level > 0:
                trees[-1].clear_cache()
            trees.append(tree)

        # Clear last tree
        trees[-1].clear_cache()

        # Attach a node to each terminal edge (used for sampling).
        for edge in trees[-1].edges:
            ChildNode(ref=edge)

        self.trees_ = trees
        return self

    def score_samples(self, X: npt.ArrayLike) -> np.ndarray:
        r"""Compute the log-likelihood of each sample (log-pdf) under the model.

        Parameters
        ----------
        X : array-like of shape (n_observations, n_assets)
            Price returns of the assets.

        Returns
        -------
        density : ndarray of shape (n_observations,)
            The log-likelihood of each sample under the fitted vine copula.
        """
        skv.check_is_fitted(self)
        X = skv.validate_data(self, X, dtype=np.float64, reset=False)
        self.clear_cache()

        if np.any(self._log_transform):
            X = np.where(self._log_transform, np.log1p(X), X)

        if not self.fit_marginals:
            score_samples = np.zeros(len(X))
        else:
            score_samples = np.sum(
                [
                    dist.score_samples(X[:, [i]])
                    for i, dist in enumerate(self.marginal_distributions_)
                ],
                axis=0,
            )
            X = np.hstack(
                [
                    dist.cdf(X[:, [i]])
                    for i, dist in enumerate(self.marginal_distributions_)
                ]
            )

        # Handle potential numerical issues by ensuring X doesn't contain exact 0 or 1.
        X = np.clip(X, UNIFORM_MARGINAL_EPSILON, 1 - UNIFORM_MARGINAL_EPSILON)

        for i, node in enumerate(self.trees_[0].nodes):
            node.pseudo_values = X[:, i]

        for tree in self.trees_:
            for edge in tree.edges:
                score_samples += edge.copula.score_samples(X=edge.get_X())

        self.clear_cache()
        return score_samples

    def sample(
        self,
        n_samples: int = 1,
        conditioning: dict[int | str, float | tuple[float, float] | npt.ArrayLike]
        | None = None,
    ) -> np.ndarray:
        """Generate random samples from the vine copula.

        This method generates `n_samples` from the fitted vine copula model. The
        resulting samples represent multivariate observations drawn according to
        the dependence structure captured by the vine copula.

        Parameters
        ----------
        n_samples : int, default=1
            Number of samples to generate.

        conditioning : dict[int | str, float | tuple[float, float] | array-like], optional
            A dictionary specifying conditioning information for one or more
            assets. The dictionary keys are asset indices or names, and the
            values define how the samples are conditioned for that asset. Three
            types of conditioning values are supported:

            1. **Fixed value (float):** If a float is provided, all samples are
               generated under the condition that the asset takes exactly that
               value.

            2. **Bounds (tuple of two floats):** If a tuple `(min_value, max_value)`
               is provided, samples are generated under the condition that the
               asset's value falls within the specified bounds. Use `-np.inf` for
               no lower bound or `np.inf` for no upper bound.

            3. **Array-like (1D array):** If an array-like of length `n_samples`
               is provided, each sample is conditioned on the corresponding value
               in the array for that asset.

            **Important:** When using conditional sampling, it is recommended
            that the assets you condition on are set as central during the vine
            copula construction. This can be specified via the `central_assets`
            parameter in the vine copula instantiation.

        Returns
        -------
        X : array-like of shape (n_samples, n_assets)
            A two-dimensional array where each row is a multivariate observation
            sampled from the vine copula.
        """
        skv.check_is_fitted(self)
        self.clear_cache()
        n_assets = self.n_features_in_

        rng, conditioning_vars, conditioning_clean, uniform_cond_samples = (
            self._init_conditioning(n_samples=n_samples, conditioning=conditioning)
        )

        # Determine sampling order based on vine structure.
        sampling_order = self._sampling_order(conditioning_vars=conditioning_vars)

        # Generate independent Uniform(0,1) samples for non-conditioned nodes.
        X_rand = rng.random(size=(n_assets - len(conditioning_vars), n_samples))

        # Propagate samples through the vine structure bottom-up.
        # We perform a first pass by only recording the number of Node visits in
        # each Node without propagating any data. This count is used for optimally
        # clearing cache during sampling.
        with self._count_node_visits():
            _propagate_samples(
                X_rand,
                sampling_order,
                conditioning_vars,
                uniform_cond_samples,
            )
        # We now perform the second pass with full data propagation.
        _propagate_samples(
            X_rand,
            sampling_order,
            conditioning_vars,
            uniform_cond_samples,
        )

        # Collect samples from the root tree.
        samples = np.stack([node.pseudo_values for node in self.trees_[0].nodes]).T
        self.clear_cache()

        # Avoid Inf
        samples = np.clip(
            samples,
            a_min=_UNIFORM_SAMPLE_EPSILON,
            a_max=1 - _UNIFORM_SAMPLE_EPSILON,
        )

        # Transform samples back to the original scale using inverse CDF
        # (if marginals were fitted).
        if self.fit_marginals:
            samples = np.hstack(
                [
                    dist.ppf(samples[:, [i]])
                    for i, dist in enumerate(self.marginal_distributions_)
                ]
            )

        # Reverse the log-return transformation if log_transform is True.
        if np.any(self._log_transform):
            # A known effect when sampling with log transform is that large
            # sampled log returns explode after applying the reverse log
            # transform into unrealistic simple returns.
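            # For instance (illustrative numbers), a sampled log return of 1.5
            # would naively map back to exp(1.5) - 1, i.e. about +348% in simple
            # returns, hence the cap below.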
            # A common approach is to apply a Box-Cox-style transform above some
            # threshold. For financial returns, log returns above log(3)
            # (equivalent to simple returns above 200%) are mapped linearly
            # instead of exponentially.
            box_cox_threshold = np.log(3)
            samples[:, self._log_transform] = np.where(
                samples[:, self._log_transform] <= box_cox_threshold,
                np.exp(samples[:, self._log_transform]) - 1,
                np.exp(box_cox_threshold)
                + (samples[:, self._log_transform] - box_cox_threshold)
                - 1,
            )

        # To avoid Inf values and numerical instability, conditional values
        # converted to uniforms are clipped to [1e-14, 1-1e-14]. This means that
        # if the conditional value has an extremely low probability, its final
        # value will be bounded by [ppf(1e-14), ppf(1-1e-14)]. To keep
        # conditional values accurate even for extremely low probabilities, we
        # force them back in the final samples.
        for var, cond in conditioning_clean.items():
            if isinstance(cond, tuple):
                samples[:, var] = np.clip(samples[:, var], a_min=cond[0], a_max=cond[1])
            else:
                samples[:, var] = cond

        return samples

    def clear_cache(self, clear_count: bool = True):
        """Clear cached intermediate results in the vine trees."""
        for tree in self.trees_:
            tree.clear_cache(clear_count=clear_count)
        for edge in self.trees_[-1].edges:
            edge.ref_node.clear_cache(clear_count=clear_count)

    def _conditioning_count(self) -> np.ndarray:
        """Compute cumulative counts of conditioning set occurrences in the vine.

        Returns
        -------
        conditioning_counts : ndarray of shape (n_trees, n_assets)
            Array of cumulative conditioning counts.
        """
        n_assets = self.n_features_in_
        conditioning_counts = np.cumsum(
            [
                np.bincount(
                    [s for edge in tree.edges for s in edge.cond_sets.conditioning],
                    minlength=n_assets,
                )
                for tree in self.trees_
            ],
            axis=0,
        )
        return conditioning_counts

    def _init_conditioning(
        self,
        n_samples: int,
        conditioning: dict[int | str, float | tuple[float, float] | npt.ArrayLike],
    ) -> tuple[
        np.random.RandomState, set[int], dict[int, float], dict[int, np.ndarray]
    ]:
        """Initialize the conditioning variables used in conditional sampling.

        Parameters
        ----------
        n_samples : int, default=1
            Number of samples to generate.

        conditioning : dict[int | str, float | tuple[float, float] | array-like], optional
            A dictionary specifying conditioning information for one or more
            assets. The dictionary keys are asset indices or names, and the
            values define how the samples are conditioned for that asset. Three
            types of conditioning values are supported:

            1. **Fixed value (float):** If a float is provided, all samples are
               generated under the condition that the asset takes exactly that
               value.

            2. **Bounds (tuple of two floats):** If a tuple `(min_value, max_value)`
               is provided, samples are generated under the condition that the
               asset's value falls within the specified bounds. Use `-np.inf` for
               no lower bound or `np.inf` for no upper bound.

            3. **Array-like (1D array):** If an array-like of length `n_samples`
               is provided, each sample is conditioned on the corresponding value
               in the array for that asset.

        Returns
        -------
        random_state : RandomState
            Numpy Random State.

        conditioning_vars : set[int]
            The conditioning variables.

        conditioning_clean : dict[int, float]
            The cleaned conditioning dictionary.

        uniform_cond_samples : dict[int, np.ndarray]
            The uniform conditioning samples corresponding to the `conditioning`.
        """
        rng = sku.check_random_state(self.random_state)
        conditioning_vars = set()
        conditioning_clean = dict()
        uniform_cond_samples = dict()

        if conditioning is None:
            return rng, conditioning_vars, conditioning_clean, uniform_cond_samples

        if not isinstance(conditioning, dict):
            raise ValueError(
                "When provided, `conditioning` must be a dictionary mapping each "
                "asset to its corresponding conditioning value, "
                f"received {type(conditioning)}"
            )

        n_assets = self.n_features_in_
        conditioning_vars = validate_input_list(
            items=list(conditioning.keys()),
            n_assets=n_assets,
            assets_names=getattr(self, "feature_names_in_", None),
            name="conditioning",
        )
        missing_central_vars = set(conditioning_vars).difference(self.central_assets_)

        if not set(conditioning_vars).issubset(set(range(n_assets))):
            raise ValueError(
                "The keys of `conditioning` must be asset indices or names "
                "from the input X."
            )
        if len(conditioning_vars) >= n_assets:
            raise ValueError(
                "`conditioning` must be provided for strictly fewer assets "
                "than the total."
            )
        if missing_central_vars:
            warnings.warn(
                "When performing conditional sampling, it is recommended to set "
                "conditioning assets as central during Vine Copula construction. "
                "The following conditioning assets were not set as central: "
                f"{missing_central_vars}. "
" "This can be achieved by using the `central_assets` parameter.", stacklevel=2, ) for var, value in zip(conditioning_vars, conditioning.values(), strict=True): if isinstance(value, tuple): if len(value) != 2: raise ValueError( "When a tuple is provided in `conditioning`, it must be" "of length 2, representing the conditioning bounds: " "(min_value, max_value)" ) min_value, max_value = value if min_value is None: min_value = -np.inf if max_value is None: max_value = np.inf if min_value >= max_value: raise ValueError( "The conditioning tuple lower bound must be lower than " "its upper bound." ) conditioning_clean[var] = (min_value, max_value) if self._log_transform[var]: if not np.isneginf(min_value): min_value = np.log1p(min_value) if not np.isposinf(max_value): max_value = np.log1p(max_value) if self.fit_marginals: # Transform the bounds using the marginal CDF dist = self.marginal_distributions_[var] u_min = 0.0 if np.isneginf(min_value) else dist.cdf(min_value) u_max = 1.0 if np.isposinf(max_value) else dist.cdf(max_value) else: u_min, u_max = min_value, max_value # Sample uniformly in the transformed interval. samples = rng.uniform(low=u_min, high=u_max, size=n_samples) elif np.isscalar(value): if not isinstance(value, numbers.Number): raise ValueError( f"Conditioning values should be numbers, got {value}" ) conditioning_clean[var] = value if self._log_transform[var]: value = np.log1p(value) if self.fit_marginals: # Transform conditioning value using the fitted marginal CDF. value = self.marginal_distributions_[var].cdf(value) samples = np.full(n_samples, value) else: samples = np.asarray(value) conditioning_clean[var] = samples if samples.ndim != 1 or samples.shape[0] != n_samples: raise ValueError( "When an array is provided in `conditioning`, it must be a " f"1D array of length n_samples={n_samples}, got {samples.ndim}D of " f"length {samples.shape[0]}" ) if self._log_transform[var]: samples = np.log1p(samples) if self.fit_marginals: # Transform conditioning samples using the fitted marginal CDF. samples = self.marginal_distributions_[var].cdf(samples) uniform_cond_samples[var] = samples conditioning_vars = set(conditioning_vars) return rng, conditioning_vars, conditioning_clean, uniform_cond_samples def _sampling_order( self, conditioning_vars: set[int] | None = None ) -> list[tuple[RootNode | ChildNode, bool]]: """ Determine the optimal sampling order for the vine copula. The sampling order is derived using a top-down elimination strategy that is analogous to finding a perfect elimination ordering for chordal graphs. In our vine copula, each conditional density is expressed as a product of bivariate copula densities and univariate margins. The algorithm starts with the deepest tree (i.e., the tree with the largest conditioning sets) and selects a variable from its conditioned set. This variable is then marginalized out, effectively generating a new sub-vine with a new deepest node. This process is repeated until the first tree level is reached and all n variables have been ordered. Choosing the optimal sampling order in this manner simplifies the inversion of conditional CDFs, thereby improving numerical stability. At each elimination step, among the candidate variables, the one that appears least frequently in the conditioning sets of shallower trees is chosen, ensuring that variables critical for conditional sampling occupy central roles. Parameters ---------- conditioning_vars : set[int], optional A set of asset indices for which conditioning samples are provided. 
            If specified, these assets will be prioritized during the sampling
            order determination.

        Returns
        -------
        sampling_order : list[tuple[RootNode | ChildNode, bool | None]]
            A list of tuples representing the optimal sampling order. Each tuple
            contains:

            - A Node object corresponding to an asset or an edge in the vine.
            - A boolean flag indicating whether the left branch is used for
              sampling at that node, or None if the node is the root.
        """
        conditioning_vars = conditioning_vars or set()
        sampling_order: list[tuple[RootNode | ChildNode, bool | None]] = []
        n_assets = self.n_features_in_
        conditioning_counts = self._conditioning_count()

        edges = self.trees_[-1].edges
        if len(edges) == 1:
            edge = edges[0]
            is_left = _is_left_branch(
                edge=edge,
                conditioning_vars=conditioning_vars,
                conditioning_counts=conditioning_counts[-1],
            )
            sampling_order.append((edge.ref_node, is_left))
        else:
            # For truncated trees, select edges minimizing conditioning counts.
            remaining = edges
            visited: set[Edge] = set()
            prev_visited: set[Edge] = set()
            edge, is_left = None, None
            while remaining:
                selected = []
                costs = []
                for edge in remaining:
                    c1 = len(edge.node1.edges - prev_visited) == 1
                    c2 = len(edge.node2.edges - prev_visited) == 1
                    if c1 or c2:
                        if c1 and c2:
                            # Last node
                            is_left = _is_left_branch(
                                edge=edge,
                                conditioning_vars=conditioning_vars,
                                conditioning_counts=conditioning_counts[-1],
                            )
                        else:
                            is_left = c1
                        costs.append(
                            conditioning_counts[
                                -1, edge.cond_sets.conditioned[0 if is_left else 1]
                            ]
                        )
                        selected.append((edge.ref_node, is_left))
                        visited.add(edge)
                remaining = [x for x in remaining if x not in visited]
                # Sort selected nodes by cost and add to sampling order.
                ordered_selected = sorted(
                    zip(costs, selected, strict=True), key=lambda pair: pair[0]
                )
                sampling_order.extend(node for _, node in ordered_selected)
                prev_visited = visited.copy()

        # Marginalization: peeling off the tree level by level.
        # We have two valid choices (two conditioned vars: left or right);
        # we choose the conditioned var with the lower conditioning cost.
        i = 2
        while True:
            node = edge.node2 if is_left else edge.node1
            if isinstance(node, RootNode):
                sampling_order.append((node, None))
                break
            else:
                edge = node.ref
                is_left = _is_left_branch(
                    edge=edge,
                    conditioning_vars=conditioning_vars,
                    conditioning_counts=conditioning_counts[-i],
                )
                sampling_order.append((node, is_left))
            i += 1

        # Replace nodes with conditioning samples where applicable.
        if conditioning_vars:
            for i, (node, is_left) in enumerate(sampling_order):
                node_var = node.get_var(is_left) if is_left is not None else node.ref
                if node_var in conditioning_vars:
                    sampling_order[i] = (self.trees_[0].nodes[node_var], None)

        sampling_order = sampling_order[::-1]
        if not (len(set(sampling_order)) == len(sampling_order) == n_assets):
            raise ValueError(
                "Sampling order computation failed: ordering is not unique or "
                "complete."
            )
        return sampling_order

    @property
    def fitted_repr(self) -> str:
        """String representation of the fitted Vine Copula."""
        skv.check_is_fitted(self)
        lines = []
        if self.fit_marginals:
            lines.append("Root Nodes")
            lines.append("-" * 10)
            for node, dist in zip(
                self.trees_[0].nodes, self.marginal_distributions_, strict=True
            ):
                lines.append(f"{node}: {dist.fitted_repr}")
            lines.append("")
        for tree in self.trees_:
            lines.append(str(tree))
            lines.append("-" * len(str(tree)))
            for edge in tree.edges:
                lines.append(str(edge))
            lines.append("")
        result_string = "\n".join(lines)
        return result_string

    def display_vine(self):
        """Display the vine trees and fitted copulas.

        Prints the structure of each tree and the details of each edge.
        """
        print(self.fitted_repr)

    def plot_marginal_distributions(
        self,
        X: npt.ArrayLike | None = None,
        conditioning: dict[int | str, float | tuple[float, float] | npt.ArrayLike]
        | None = None,
        subset: list[int | str] | None = None,
        n_samples: int = 500,
        title: str = "Vine Copula Marginal Distributions",
    ) -> go.Figure:
        """Plot overlaid marginal distributions.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_assets), optional
            Historical data where each column corresponds to an asset.

        conditioning : dict[int | str, float | tuple[float, float] | array-like], optional
            A dictionary specifying conditioning information for one or more
            assets. The dictionary keys are asset indices or names, and the
            values define how the samples are conditioned for that asset. Three
            types of conditioning values are supported:

            1. **Fixed value (float):** If a float is provided, all samples are
               generated under the condition that the asset takes exactly that
               value.

            2. **Bounds (tuple of two floats):** If a tuple `(min_value, max_value)`
               is provided, samples are generated under the condition that the
               asset's value falls within the specified bounds. Use `-np.inf` for
               no lower bound or `np.inf` for no upper bound.

            3. **Array-like (1D array):** If an array-like of length `n_samples`
               is provided, each sample is conditioned on the corresponding value
               in the array for that asset.

            **Important:** When using conditional sampling, it is recommended
            that the assets you condition on are set as central during the vine
            copula construction. This can be specified via the `central_assets`
            parameter in the vine copula instantiation.

        subset : list[int | str], optional
            Indices or names of assets to include in the plot. If None, all
            assets are used.

        n_samples : int, default=500
            Number of samples used to control the density and readability of the
            plot. If `X` is provided and contains more than `n_samples` rows, a
            random subsample of size `n_samples` is selected. Conversely, if `X`
            has fewer rows than `n_samples`, the value is adjusted to match the
            number of rows in `X` to ensure balanced visualization.

        title : str, default="Vine Copula Marginal Distributions"
            The title for the plot.

        Returns
        -------
        fig : plotly.graph_objects.Figure
            A figure with overlaid univariate distributions for each asset.
        """
        n_assets = self.n_features_in_
        subset = subset or list(range(n_assets))
        colors = px.colors.qualitative.Plotly

        if X is not None:
            X = np.asarray(X)
            if X.ndim != 2:
                raise ValueError("X should be a 2D array")
            if X.shape[1] != n_assets:
                raise ValueError(f"X should have {n_assets} columns")
            if X.shape[0] > n_samples:
                # We subsample for improved graph readability
                rng = sku.check_random_state(self.random_state)
                indices = rng.choice(
                    np.arange(X.shape[0]), size=n_samples, replace=False
                )
                X = X[indices, :]
            else:
                # We want the same proportion as X to have a balanced graph
                n_samples = X.shape[0]

        samples = self.sample(n_samples=n_samples, conditioning=conditioning)

        traces = []
        for i, s in enumerate(subset):
            traces.append(
                _kde_trace(
                    x=samples[:, s],
                    opacity=1.0,
                    color=colors[i % len(colors)],
                    name=f"{self.feature_names_in_[s]} Generated",
                    visible=True if i == 0 else "legendonly",
                )
            )
        if X is not None:
            for i, s in enumerate(subset):
                traces.append(
                    _kde_trace(
                        x=X[:, s],
                        opacity=0.6,
                        color=colors[i % len(colors)],
                        name=f"{self.feature_names_in_[s]} Empirical",
                        visible=True if i == 0 else "legendonly",
                    )
                )

        fig = go.Figure(data=traces)
        fig.update_layout(
            title=title,
            xaxis_title="x",
            yaxis_title="Probability Density",
        )
        fig.update_xaxes(
            tickformat=".0%",
        )
        return fig

    @contextlib.contextmanager
    def _count_node_visits(self):
        """A context manager to enable counting node visits within the tree.

        Temporarily enables node visit counting for the duration of the context.
        After the block is executed, the original state is restored.
        """
        for tree in self.trees_:
            tree.is_count_visits = True
        try:
            yield
        finally:
            for tree in self.trees_:
                tree.is_count_visits = False
            self.clear_cache(clear_count=False)


def _is_left_branch(
    edge: Edge, conditioning_vars: set[int], conditioning_counts: np.ndarray
) -> bool:
    """Determine whether the left branch should be followed during the
    elimination ordering (tree peeling).

    Parameters
    ----------
    edge : Edge
        The edge for which to decide the branch.

    conditioning_vars : set[int]
        Set of asset indices with conditioning samples.

    conditioning_counts : np.ndarray
        Array of conditioning counts for each asset.

    Returns
    -------
    bool
        True if the left branch is preferred, False otherwise.
    """
    v1, v2 = edge.cond_sets.conditioned
    if v1 in conditioning_vars and v2 not in conditioning_vars:
        return False
    if v1 not in conditioning_vars and v2 in conditioning_vars:
        return True
    return conditioning_counts[v1] <= conditioning_counts[v2]


def _propagate_samples(X_rand, sampling_order, conditioning_vars, uniform_cond_samples):
    """Propagate samples through the vine structure bottom-up, following the
    elimination strategy (tree peeling) given by the node order and by whether
    the next node sits on the right or left branch.

    If `is_count_visits` is activated, we only record the number of node visits
    in each node. This count is used for optimally clearing cache during
    sampling.
    """
    is_count = sampling_order[0][0].tree.is_count_visits
    if not is_count:
        X_rand = iter(X_rand)

    # Initialize samples for each node according to the sampling order.
    queue: deque[tuple[RootNode | ChildNode, bool]] = deque()
    for node, is_left in sampling_order:
        node_var = node.get_var(is_left) if is_left is not None else node.ref
        if is_count:
            init_samples = np.array([np.nan])
        else:
            if node_var in conditioning_vars:
                init_samples = uniform_cond_samples[node_var]
            else:
                init_samples = next(X_rand)
            # Avoid Inf
            init_samples = np.clip(
                init_samples,
                a_min=_UNIFORM_SAMPLE_EPSILON,
                a_max=1 - _UNIFORM_SAMPLE_EPSILON,
            )
        if isinstance(node, RootNode):
            node.pseudo_values = init_samples
        else:
            queue.append((node, is_left))
            if is_left:
                node.u = init_samples
            else:
                node.v = init_samples

    while queue:
        node, is_left = queue.popleft()
        edge = node.ref
        if isinstance(edge.node1, RootNode):
            if is_left:
                x = np.stack([node.u, edge.node2.pseudo_values]).T
                edge.node1.pseudo_values = _inverse_partial_derivative(
                    edge, x, is_count
                )
            else:
                x = np.stack([node.v, edge.node1.pseudo_values]).T
                edge.node2.pseudo_values = _inverse_partial_derivative(
                    edge, x, is_count
                )
        else:
            is_left1, is_left2 = edge.node1.ref.shared_node_is_left(edge.node2.ref)
            if is_left:
                x = np.stack([node.u, edge.node2.v if is_left2 else edge.node2.u]).T
                u = _inverse_partial_derivative(edge, x, is_count)
                if is_left1:
                    edge.node1.v = u
                else:
                    edge.node1.u = u
                queue.appendleft((edge.node1, not is_left1))
            else:
                x = np.stack([node.v, edge.node1.v if is_left1 else edge.node1.u]).T
                u = _inverse_partial_derivative(edge, x, is_count)
                if is_left2:
                    edge.node2.v = u
                else:
                    edge.node2.u = u
                queue.appendleft((edge.node2, not is_left2))


def _inverse_partial_derivative(
    edge: Edge, X: np.ndarray, is_count: bool
) -> np.ndarray:
    """Inverse partial derivative of an Edge copula."""
    if is_count:
        return np.array([np.nan])
    return edge.copula.inverse_partial_derivative(X)


def _kde_trace(
    x: np.ndarray, opacity: float, color: str, name: str, visible: bool | str
) -> go.Scatter:
    """Gaussian KDE line plot."""
    kde = st.gaussian_kde(x)
    x = np.linspace(min(x), max(x), 500)
    return go.Scatter(
        x=x,
        y=kde(x),
        mode="lines",
        name=name,
        line=dict(color=color),
        fill="tozeroy",
        opacity=opacity,
        visible=visible,
    )
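

# A minimal usage sketch (illustrative only; the synthetic data, correlation
# matrix and the -5% stress value below are assumptions, not taken from the
# library). The module would normally be imported rather than run directly.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    # Fake daily returns for 4 assets with mild cross-correlation.
    Z = rng.standard_normal((500, 4))
    corr = 0.5 * np.eye(4) + 0.5  # diagonal 1.0, off-diagonal 0.5
    returns = 0.01 * (Z @ np.linalg.cholesky(corr).T)
    # Mark asset 0 as central so it can be conditioned on reliably.
    vine = VineCopula(max_depth=3, central_assets=[0], random_state=0)
    vine.fit(returns)
    # Stress scenario: condition asset 0 on a fixed -5% return.
    stressed = vine.sample(n_samples=1000, conditioning={0: -0.05})
    print(stressed.mean(axis=0))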