"""Hierarchical Clustering estimators."""

# Copyright (c) 2023
# Author: Hugo Delatte <>
# License: BSD 3 clause

from enum import auto

import numpy as np
import numpy.typing as npt
import plotly.graph_objects as go
import scipy.cluster.hierarchy as sch
import scipy.spatial.distance as scd
import sklearn.base as skb
import sklearn.utils.validation as skv
from plotly.figure_factory import create_dendrogram

from skfolio.utils.stats import assert_is_distance, compute_optimal_n_clusters
from import AutoEnum, default_asset_names

class LinkageMethod(AutoEnum):
    r"""Linkage criteria used to measure the distance between two clusters.

    Every member corresponds to one of the methods described in the
    `Linkage Methods` section of `scipy.cluster.hierarchy.linkage`; refer to
    that section for the complete descriptions.

    Parameters
    ----------
    SINGLE : str
        Nearest Point Algorithm. Assigns

        .. math::
            d(u,v) = \min(dist(u[i],v[j]))

        for all points :math:`i` in cluster :math:`u` and :math:`j` in cluster
        :math:`v`.

    COMPLETE : str
        Farthest Point Algorithm (also called Voor Hees Algorithm). Assigns

        .. math::
            d(u, v) = \max(dist(u[i],v[j]))

        for all points :math:`i` in cluster :math:`u` and :math:`j` in cluster
        :math:`v`.

    AVERAGE : str
        UPGMA algorithm. Assigns

        .. math::
            d(u,v) = \sum_{ij} \frac{d(u[i], v[j])}{(|u|*|v|)}

        for all points :math:`i` and :math:`j`, where :math:`|u|` and
        :math:`|v|` are the cardinalities of clusters :math:`u` and :math:`v`,
        respectively.

    WEIGHTED : str
        WPGMA algorithm. Assigns

        .. math::
            d(u,v) = (dist(s,v) + dist(t,v))/2

        where cluster :math:`u` was formed from clusters :math:`s` and
        :math:`t`, and :math:`v` is a remaining cluster in the forest.

    CENTROID : str
        UPGMC algorithm. Assigns

        .. math::
            dist(s,t) = ||c_s-c_t||_2

        where :math:`c_s` and :math:`c_t` are the centroids of clusters
        :math:`s` and :math:`t`, respectively.

    MEDIAN : str
        WPGMC algorithm. Assigns :math:`d(s,t)` like the ``centroid`` method.

    WARD : str
        Ward variance minimization algorithm (also known as the incremental
        algorithm). The new entry :math:`d(u,v)` is computed as

        .. math::
            d(u,v) = \sqrt{\frac{|v|+|s|}{T}d(v,s)^2
                         + \frac{|v|+|t|}{T}d(v,t)^2
                         - \frac{|v|}{T}d(s,t)^2}

        where :math:`u` is the newly joined cluster consisting of clusters
        :math:`s` and :math:`t`, :math:`v` is an unused cluster in the forest,
        :math:`T=|v|+|s|+|t|`, and :math:`|*|` is the cardinality of its
        argument.
    """

    # NOTE(review): member values are produced by `AutoEnum`, which is defined
    # outside this file -- presumably the lower-cased member name; confirm.
    SINGLE = auto()
    COMPLETE = auto()
    AVERAGE = auto()
    WEIGHTED = auto()
    CENTROID = auto()
    MEDIAN = auto()
    WARD = auto()
class HierarchicalClustering(skb.ClusterMixin, skb.BaseEstimator):
    r"""Hierarchical Clustering.

    Parameters
    ----------
    max_clusters : int, optional
        For coherent clustering, the algorithm finds a minimum threshold ``r`` so
        that the cophenetic distance between any two original observations in the
        same flat cluster is no more than ``r`` and no more than `max_clusters`
        flat clusters are formed.
        The default (`None`) is to estimate the maximal number of clusters based
        on the Two-Order Difference to Gap Statistic [1]_.

    linkage_method : LinkageMethod, default=LinkageMethod.WARD
        Methods for calculating the distance between clusters in the linkage
        matrix. See the `Linkage Methods` section of
        `scipy.cluster.hierarchy.linkage` for the full descriptions.
        The default is the Ward variance minimization algorithm
        `LinkageMethod.WARD`.

    Attributes
    ----------
    n_clusters_ : int
        Number of formed clusters.

    labels_ : ndarray of shape (n_assets,)
        Labels of each asset (starting at 0).

    linkage_matrix_ : ndarray of shape (n_assets - 1, 4)
        Linkage matrix computed from the distance matrix passed to `fit`.

    condensed_distance_ : ndarray of shape (n_assets * (n_assets - 1) / 2,)
        The 1-D condensed distance matrix.

    n_features_in_ : int
        Number of assets seen during `fit`.

    feature_names_in_ : ndarray of shape (`n_features_in_`,)
        Names of assets seen during `fit`. Defined only when `X`
        has assets names that are all strings.

    References
    ----------
    .. [1] "Application of two-order difference to gap statistic".
        Yue, Wang & Wei (2009)
    """

    n_clusters_: int
    labels_: np.ndarray
    linkage_matrix_: np.ndarray
    condensed_distance_: np.ndarray

    def __init__(
        self,
        max_clusters: int | None = None,
        linkage_method: LinkageMethod = LinkageMethod.WARD,
    ):
        self.max_clusters = max_clusters
        self.linkage_method = linkage_method

    def fit(self, X: npt.ArrayLike, y: None = None) -> "HierarchicalClustering":
        """Fit the Hierarchical Clustering estimator.

        Parameters
        ----------
        X : array-like of shape (n_assets, n_assets)
            Distance matrix of the assets.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : HierarchicalClustering
            Fitted estimator.

        Raises
        ------
        ValueError
            If `max_clusters` is provided and is lower than 1.
        """
        max_clusters = self.max_clusters
        # Explicit validation: an `assert` would be stripped under `python -O`.
        if max_clusters is not None and max_clusters < 1:
            raise ValueError(
                "`max_clusters` must be a strictly positive integer, "
                f"got {max_clusters}."
            )

        X = skv.validate_data(self, X)
        assert_is_distance(X)

        self.condensed_distance_ = scd.squareform(X, checks=False)
        self.linkage_matrix_ = sch.linkage(
            self.condensed_distance_,
            method=str(self.linkage_method.value),
            # Not needed for clustering, only for visualization and can be slow.
            # So we perform the optimal ordering only in `plot_dendrogram`.
            optimal_ordering=False,
        )

        if max_clusters is None:
            max_clusters = compute_optimal_n_clusters(
                distance=X,
                linkage_matrix=self.linkage_matrix_,
            )

        # Get the clusters from the linkage matrix
        labels_ = sch.fcluster(
            self.linkage_matrix_, t=max_clusters, criterion="maxclust"
        )
        labels_ -= 1  # Start at 0

        self.n_clusters_ = len(set(labels_))
        # Internal sanity check: labels are contiguous and respect the cap.
        assert self.n_clusters_ == max(labels_) + 1 <= max_clusters
        self.labels_ = labels_
        return self

    @staticmethod
    def _create_dendrogram(
        linkage_matrix: np.ndarray,
        color_threshold: float,
        labels: np.ndarray,
        **kwargs,
    ) -> go.Figure:
        """Build a plotly dendrogram figure from a precomputed linkage matrix."""
        # The linkage is already computed, so the data argument is a dummy array
        # and both `distfun` and `linkagefun` only hand back precomputed values.
        return create_dendrogram(
            np.ones(1),
            distfun=lambda x: None,
            linkagefun=lambda x: linkage_matrix,
            color_threshold=color_threshold,
            labels=labels,
            **kwargs,
        )

    def plot_dendrogram(self, heatmap: bool = True) -> go.Figure:
        """Plot the dendrogram.

        The blue lines represent distinct clusters composed of a single asset.
        The remaining colors represent clusters of more than one asset.

        When `heatmap` is set to True, the heatmap of the reordered distance
        matrix is displayed below the dendrogram and clusters are outlined in
        gold.

        The number of clusters used in the plot is the `n_clusters_` attribute
        computed in `fit`.

        Parameters
        ----------
        heatmap : bool, default=True
            If this is set to True, the distance heatmap is returned with the
            clusters outlined in gold.

        Returns
        -------
        fig : Figure
            The dendrogram figure.
        """
        skv.check_is_fitted(self, "linkage_matrix_")

        linkage_matrix = sch.optimal_leaf_ordering(
            self.linkage_matrix_, self.condensed_distance_
        )
        n_assets = linkage_matrix.shape[0] + 1

        if self.n_clusters_ > 1:
            # Height of the merge that would reduce the tree to fewer than
            # `n_clusters_` clusters: links at or above it are not colored.
            color_threshold = linkage_matrix[-(self.n_clusters_ - 1), 2]
        else:
            # With a single cluster, `-(n_clusters_ - 1)` would be index 0 (the
            # smallest merge) and every asset would be drawn as its own cluster.
            # Place the threshold just above the tallest merge instead.
            color_threshold = np.nextafter(linkage_matrix[:, 2].max(), np.inf)

        if hasattr(self, "feature_names_in_"):
            asset_names = self.feature_names_in_
        else:
            asset_names = default_asset_names(n_assets=n_assets)

        if not heatmap:
            fig = self._create_dendrogram(
                linkage_matrix, color_threshold, asset_names
            )
            fig.update_layout(
                title="Dendrogram",
                width=800,
                height=400,
                showlegend=False,
                hovermode="closest",
                xaxis={"title": "Assets"},
                yaxis={"title": "Distance"},
            )
            return fig

        # Initialize figure by creating upper dendrogram
        fig = self._create_dendrogram(
            linkage_matrix, color_threshold, asset_names, orientation="bottom"
        )
        for trace in fig["data"]:
            trace["yaxis"] = "y2"

        # Create side dendrogram
        side_dendrogram = self._create_dendrogram(
            linkage_matrix, color_threshold, asset_names, orientation="right"
        )
        for trace in side_dendrogram["data"]:
            trace["xaxis"] = "x2"

        # Add side dendrogram data to figure
        for trace in side_dendrogram["data"]:
            fig.add_trace(trace)

        # Create heatmap of the distance matrix reordered like the leaves
        ordered_asset_names = side_dendrogram["layout"]["yaxis"]["ticktext"]
        ordered_asset_names_idx = np.array(
            [np.argwhere(x == asset_names)[0][0] for x in ordered_asset_names]
        )
        assert np.array_equal(
            asset_names[ordered_asset_names_idx], ordered_asset_names
        )

        distance = scd.squareform(self.condensed_distance_, checks=False)
        heat_data = distance[ordered_asset_names_idx, :][:, ordered_asset_names_idx]

        # The heatmap is positioned on the dendrogram tick coordinates so that
        # its cells line up with the leaves of both dendrograms.
        heatmap_trace = go.Heatmap(
            x=fig["layout"]["xaxis"]["tickvals"],
            y=side_dendrogram["layout"]["yaxis"]["tickvals"],
            z=heat_data,
            colorscale="Blues",
            name="",
        )
        fig.add_trace(heatmap_trace)

        # Outline clusters
        delta = heatmap_trace["x"][1] - heatmap_trace["x"][0]
        clusters_ids = self.labels_[ordered_asset_names_idx]
        for cluster_id in range(max(clusters_ids) + 1):
            c_ids = np.argwhere(clusters_ids == cluster_id).ravel()
            lower = c_ids[0] * delta
            upper = (c_ids[-1] + 1) * delta
            fig.add_shape(
                type="rect",
                x0=lower,
                y0=lower,
                x1=upper,
                y1=upper,
                line={"color": "gold", "width": 2},
            )

        # Settings shared by the four axes of the combined figure.
        bare_axis = {
            "mirror": False,
            "showgrid": False,
            "showline": False,
            "zeroline": False,
            "ticks": "",
        }
        fig.update_layout(
            title="Dendrogram",
            width=800,
            height=800,
            showlegend=False,
            hovermode="closest",
            xaxis=bare_axis | {"title": "Assets", "domain": [0.15, 1]},
            xaxis2=bare_axis | {"domain": [0, 0.15], "showticklabels": False},
            yaxis=bare_axis
            | {
                "title": "Assets",
                "domain": [0, 0.85],
                "showticklabels": False,
                "tickvals": fig["layout"]["xaxis"]["tickvals"],
                "ticktext": fig["layout"]["xaxis"]["ticktext"],
            },
            yaxis2=bare_axis
            | {"domain": [0.825, 0.975], "showticklabels": False},
        )
        return fig