Source code for skfolio.population._population

"""Population module.
A population is a collection of portfolios.
"""

# Copyright (c) 2023
# Author: Hugo Delatte <delatte.hugo@gmail.com>
# License: BSD 3 clause

import inspect
from typing import Any

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import scipy.interpolate as sci

import skfolio.typing as skt
from skfolio.measures import RatioMeasure
from skfolio.portfolio import BasePortfolio, MultiPeriodPortfolio
from skfolio.utils.sorting import non_denominated_sort
from skfolio.utils.tools import deduplicate_names, optimal_rounding_decimals


[docs] class Population(list): """Population Class. A `Population` is a list of :class:`~skfolio.portfolio.Portfolio` or :class:`~skfolio.portfolio.MultiPeriodPortfolio` or both. Parameters ---------- iterable : list[BasePortfolio] The list of portfolios. Each item can be of type :class:`~skfolio.portfolio.Portfolio` and/or :class:`~skfolio.portfolio.MultiPeriodPortfolio`. Empty list are accepted. """ def __init__(self, iterable: list[BasePortfolio]) -> None: super().__init__(self._validate_item(item) for item in iterable) def __repr__(self) -> str: return "<Population(" + super().__repr__() + ")>" def __getitem__( self, indices: int | list[int] | slice ) -> "BasePortfolio | Population": item = super().__getitem__(indices) if isinstance(item, list): return self.__class__(item) return item def __setitem__(self, index: int, item: BasePortfolio) -> None: super().__setitem__(index, self._validate_item(item)) def __add__(self, other: BasePortfolio) -> "Population": if not isinstance(other, Population): raise TypeError( f"Cannot add a Population with an object of type {type(other)}" ) return self.__class__(super().__add__(other))
[docs] def insert(self, index, item: BasePortfolio) -> None: """Insert portfolio before index.""" super().insert(index, self._validate_item(item))
[docs] def append(self, item: BasePortfolio) -> None: """Append portfolio to the end of the population list.""" super().append(self._validate_item(item))
[docs] def extend(self, other: BasePortfolio) -> None: """Extend population list by appending elements from the iterable.""" if isinstance(other, type(self)): super().extend(other) else: super().extend(self._validate_item(item) for item in other)
[docs] def set_portfolio_params(self, **params: Any) -> "Population": """Set the parameters of all the portfolios. Parameters ---------- **params : Any Portfolio parameters. Returns ------- self : Population The Population instance. """ if not params: return self init_signature = inspect.signature(BasePortfolio.__init__) # Consider the constructor parameters excluding 'self' valid_params = [ p.name for p in init_signature.parameters.values() if p.name != "self" and p.kind != p.VAR_KEYWORD ] for key in params: if key not in valid_params: raise ValueError( f"Invalid parameter {key!r} . " f"Valid parameters are: {valid_params!r}." ) for portfolio in self: for key, value in params.items(): setattr(portfolio, key, value)
@staticmethod def _validate_item( item: BasePortfolio, ) -> BasePortfolio: """Validate that items are of type Portfolio or MultiPeriodPortfolio.""" if isinstance(item, BasePortfolio): return item raise TypeError( "Population only accept items that inherit from BasePortfolio such as " "Portfolio or MultiPeriodPortfolio" f", got {type(item).__name__}" )
[docs] def non_denominated_sort(self, first_front_only: bool = False) -> list[list[int]]: """Fast non-dominated sorting. Sort the portfolios into different non-domination levels. Complexity O(MN^2) where M is the number of objectives and N the number of portfolios. Parameters ---------- first_front_only : bool, default=False If this is set to True, only the first front is sorted and returned. The default is `False`. Returns ------- fronts : list[list[int]] A list of Pareto fronts (lists), the first list includes non-dominated portfolios. """ n = len(self) if n > 0 and np.any( [ portfolio.fitness_measures != self[0].fitness_measures for portfolio in self ] ): raise ValueError( "Cannot compute non denominated sorting with Portfolios " "containing mixed `fitness_measures`" ) fitnesses = np.array([portfolio.fitness for portfolio in self]) fronts = non_denominated_sort( fitnesses=fitnesses, first_front_only=first_front_only ) return fronts
[docs] def filter( self, names: skt.Names | None = None, tags: skt.Tags | None = None ) -> "Population": """Filter the Population of portfolios by names and tags. If both names and tags are provided, the intersection is returned. Parameters ---------- names : str | list[str], optional If provided, the population is filtered by portfolio names. tags : str | list[str], optional If provided, the population is filtered by portfolio tags. Returns ------- population : Population A new population of portfolios filtered by names and tags. """ if tags is None and names is None: return self if isinstance(names, str): names = [names] if isinstance(tags, str): tags = [tags] if tags is None: return self.__class__( [portfolio for portfolio in self if portfolio.name in names] ) if names is None: return self.__class__( [portfolio for portfolio in self if portfolio.tag in tags] ) return self.__class__( [ portfolio for portfolio in self if portfolio.name in names and portfolio.tag in tags ] )
[docs] def measures( self, measure: skt.Measure, ) -> np.ndarray: """Vector of portfolios measures for each portfolio from the population. Parameters ---------- measure : Measure The portfolio measure. Returns ------- values : ndarray The vector of portfolios measures. """ return np.array([ptf.__getattribute__(measure.value) for ptf in self])
[docs] def measures_mean( self, measure: skt.Measure, ) -> float: """Mean of portfolios measures for each portfolio from the population. Parameters ---------- measure : Measure The portfolio measure. Returns ------- value : float The mean of portfolios measures. """ return self.measures(measure=measure).mean()
[docs] def measures_std( self, measure: skt.Measure, ) -> float: """Standard-deviation of portfolios measures for each portfolio from the population. Parameters ---------- measure : Measure The portfolio measure. Returns ------- value : float The standard-deviation of portfolios measures. """ return self.measures(measure=measure).std()
[docs] def sort_measure(self, measure: skt.Measure, reverse: bool = False) -> "Population": """Sort the population by a given portfolio measure. Parameters ---------- measure : Measure The portfolio measure. reverse : bool, default=False If this is set to True, the order is reversed. Returns ------- values : Populations The sorted population. """ return self.__class__( sorted( self, key=lambda x: x.__getattribute__(measure.value), reverse=reverse, ) )
[docs] def quantile( self, measure: skt.Measure, q: float, ) -> BasePortfolio: """Returns the portfolio corresponding to the `q` quantile for a given portfolio measure. Parameters ---------- measure : Measure The portfolio measure. q : float The quantile value. Returns ------- values : BasePortfolio Portfolio corresponding to the `q` quantile for the measure. """ if not 0 <= q <= 1: raise ValueError("The quantile`q` must be between 0 and 1") sorted_portfolios = self.sort_measure(measure=measure, reverse=False) k = max(0, int(np.round(len(sorted_portfolios) * q)) - 1) return sorted_portfolios[k]
[docs] def min_measure( self, measure: skt.Measure, ) -> BasePortfolio: """Returns the portfolio with the minimum measure. Parameters ---------- measure : Measure The portfolio measure. Returns ------- values : BasePortfolio The portfolio with minimum measure. """ return self.quantile(measure=measure, q=0)
[docs] def max_measure( self, measure: skt.Measure, ) -> BasePortfolio: """Returns the portfolio with the maximum measure. Parameters ---------- measure: Measure The portfolio measure. Returns ------- values : BasePortfolio The portfolio with maximum measure. """ return self.quantile(measure=measure, q=1)
[docs] def summary( self, formatted: bool = True, ) -> pd.DataFrame: """Summary of the portfolios in the population Parameters ---------- formatted : bool, default=True If this is set to True, the measures are formatted into rounded string with units. The default is `True`. Returns ------- summary : pandas DataFrame The population's portfolios summary """ df = pd.concat( [p.summary(formatted=formatted) for p in self], keys=[p.name for p in self], axis=1, ) return df
[docs] def composition( self, display_sub_ptf_name: bool = True, ) -> pd.DataFrame: """Composition of each portfolio in the population. Parameters ---------- display_sub_ptf_name : bool, default=True If this is set to True, each sub-portfolio name composing a multi-period portfolio is displayed. Returns ------- df : DataFrame Composition of the portfolios in the population. """ res = [] for ptf in self: comp = ptf.composition if display_sub_ptf_name: if isinstance(ptf, MultiPeriodPortfolio): comp.rename( columns={c: f"{ptf.name}_{c}" for c in comp.columns}, inplace=True, ) else: comp.rename(columns={c: ptf.name for c in comp.columns}, inplace=True) res.append(comp) df = pd.concat(res, axis=1) df.columns = deduplicate_names(list(df.columns)) df.fillna(0, inplace=True) return df
[docs] def contribution( self, measure: skt.Measure, spacing: float | None = None, display_sub_ptf_name: bool = True, ) -> pd.DataFrame: r"""Contribution of each asset to a given measure of each portfolio in the population. Parameters ---------- measure : Measure The measure used for the contribution computation. spacing : float, optional Spacing "h" of the finite difference: :math:`contribution(wi)= \frac{measure(wi-h) - measure(wi+h)}{2h}`. display_sub_ptf_name : bool, default=True If this is set to True, each sub-portfolio name composing a multi-period portfolio is displayed. Returns ------- df : DataFrame Contribution of each asset to a given measure of each portfolio in the population. """ res = [] for ptf in self: contribution = ptf.contribution( measure=measure, spacing=spacing, to_df=True ) if display_sub_ptf_name: if isinstance(ptf, MultiPeriodPortfolio): contribution.rename( columns={c: f"{ptf.name}_{c}" for c in contribution.columns}, inplace=True, ) else: contribution.rename( columns={c: ptf.name for c in contribution.columns}, inplace=True ) res.append(contribution) df = pd.concat(res, axis=1) df.columns = deduplicate_names(list(df.columns)) df.fillna(0, inplace=True) return df
[docs] def rolling_measure( self, measure: skt.Measure = RatioMeasure.SHARPE_RATIO, window: int = 30 ) -> pd.DataFrame: """Compute the measure over a rolling window for each portfolio in the population. Parameters ---------- measure : ct.Measure, default=RatioMeasure.SHARPE_RATIO The measure. The default measure is the Sharpe Ratio. window : int, default=30 The window size. The default value is `30` observations. Returns ------- dataframe : pandas DataFrame The rolling measures. """ rolling_measures = [] names = [] for ptf in self: rolling_measures.append(ptf.rolling_measure(measure=measure, window=window)) names.append(_ptf_name_with_tag(ptf)) df = pd.concat(rolling_measures, axis=1) df.columns = deduplicate_names(names) # Sort index because pd.concat unsort NaNs at the end df.sort_index(inplace=True) return df
[docs] def plot_distribution( self, measure_list: list[skt.Measure], tag_list: list[str] | None = None, n_bins: int | None = None, **kwargs, ) -> go.Figure: """Plot the population's distribution for each measure provided in the measure list. Parameters ---------- measure_list : list[Measure] The list of portfolio measures. A different distribution is plotted per measure. tag_list : list[str], optional If this is provided, an additional distribution is plotted per measure for each tag provided. n_bins : int, optional Sets the number of bins. Returns ------- plot : Figure Returns the plotly Figure object. """ values = [] labels = [] for measure in measure_list: if tag_list is not None: for tag in tag_list: values.append(self.filter(tags=tag).measures(measure=measure)) labels.append(f"{measure} - {tag}") else: values.append(self.measures(measure=measure)) labels.append(measure.value) df = pd.DataFrame(np.array(values).T, columns=labels).melt( var_name="Population" ) fig = px.histogram( df, color="Population", barmode="overlay", marginal="box", nbins=n_bins, **kwargs, ) fig.update_layout(title_text="Measures Distribution", xaxis_title="measures") return fig
[docs] def plot_cumulative_returns( self, log_scale: bool = False, idx: slice | np.ndarray | None = None, ) -> go.Figure: """Plot the population's portfolios cumulative returns. Non-compounded cumulative returns start at 0. Compounded cumulative returns are rescaled to start at 1000. Parameters ---------- log_scale : bool, default=False If this is set to True, the cumulative returns are displayed with a logarithm scale on the y-axis and rebased at 1000. The cumulative returns must be compounded otherwise an exception is raise. idx : slice | array, optional Indexes or slice of the observations to plot. The default (`None`) is to take all observations. Returns ------- plot : Figure Returns the plot Figure object. """ if idx is None: idx = slice(None) cumulative_returns = [] names = [] compounded = [] for ptf in self: cumulative_returns.append(ptf.cumulative_returns_df) names.append(_ptf_name_with_tag(ptf)) compounded.append(ptf.compounded) compounded = set(compounded) if len(compounded) == 2: raise ValueError( "Some portfolios cumulative returns are compounded while some " "are non-compounded. You can change the compounded with" "`population.set_portfolio_params(compounded=False)`", ) title = "Cumulative Returns" compounded = compounded.pop() if compounded: yaxis_title = f"{title} (rebased at 1000)" if log_scale: title = f"{title} (compounded & log scaled)" else: title = f"{title} (compounded)" else: if log_scale: raise ValueError( "Plotting with logarithm scaling must be done on cumulative " "returns that are compounded as opposed to non-compounded." "You can change to compounded with " "`set_portfolio_params(compounded=True)`" ) yaxis_title = title title = f"{title} (non-compounded)" df = pd.concat(cumulative_returns, axis=1).iloc[:, idx] # Sort index because pd.concat unsort NaNs at the end df.sort_index(inplace=True) df.columns = deduplicate_names(names) fig = df.plot(backend="plotly") fig.update_layout( title=title, xaxis_title="Observations", yaxis_title=yaxis_title, legend_title_text="Portfolios", ) if compounded: fig.update_yaxes(tickformat=".0f") else: fig.update_yaxes(tickformat=".2%") if log_scale: fig.update_yaxes(type="log") return fig
[docs] def plot_composition(self, display_sub_ptf_name: bool = True) -> go.Figure: """Plot the compositions of the portfolios in the population. Parameters ---------- display_sub_ptf_name : bool, default=True If this is set to True, each sub-portfolio name composing a multi-period portfolio is displayed. Returns ------- plot : Figure Returns the plotly Figure object. """ df = self.composition(display_sub_ptf_name=display_sub_ptf_name).T fig = px.bar(df, x=df.index, y=df.columns) fig.update_layout( title="Portfolios Composition", xaxis_title="Portfolios", yaxis={ "title": "Weight", "tickformat": ",.0%", }, legend=dict(yanchor="top", y=0.99, xanchor="left", x=1.15), ) return fig
[docs] def plot_contribution( self, measure: skt.Measure, spacing: float | None = None, display_sub_ptf_name: bool = True, ) -> go.Figure: """Plot the contribution of each asset to a given measure of the portfolios in the population. Parameters ---------- measure : Measure The measure used for the contribution computation. spacing : float, optional Spacing "h" of the finite difference: :math:`contribution(wi)= \frac{measure(wi-h) - measure(wi+h)}{2h}` display_sub_ptf_name : bool, default=True If this is set to True, each sub-portfolio name composing a multi-period portfolio is displayed. Returns ------- plot : Figure Returns the plotly Figure object. """ df = self.contribution( display_sub_ptf_name=display_sub_ptf_name, measure=measure, spacing=spacing ).T fig = px.bar(df, x=df.index, y=df.columns) yaxis = { "title": "Contribution", } if not measure.is_ratio: n = optimal_rounding_decimals(df.sum(axis=1).max()) yaxis["tickformat"] = f",.{n}%" fig.update_layout( title=f"{measure} Contribution", xaxis_title="Portfolios", yaxis=yaxis, legend=dict(yanchor="top", y=0.99, xanchor="left", x=1.15), ) return fig
[docs] def plot_measures( self, x: skt.Measure, y: skt.Measure, z: skt.Measure = None, to_surface: bool = False, hover_measures: list[skt.Measure] | None = None, show_fronts: bool = False, color_scale: skt.Measure | str | None = None, title="Portfolios", ) -> go.Figure: """Plot the 2D (or 3D) scatter points (or surface) of a given set of measures for each portfolio in the population. Parameters ---------- x : Measure The x-axis measure. y : Measure The y-axis measure. z : Measure, optional The z-axis measure. to_surface : bool, default=False If this is set to True, a surface is estimated. hover_measures : list[Measure], optional The list of measure to show on point hover. show_fronts : bool, default=False If this is set to True, the pareto fronts are highlighted. The default is `False`. color_scale : Measure | str, optional If this is provided, a color scale is displayed. title : str, default="Portfolios" The graph title. The default value is "Portfolios". Returns ------- plot : Figure Returns the plotly Figure object. """ num_fmt = ":.3f" hover_data = {x: num_fmt, y: num_fmt, "tag": True} if z is not None: hover_data[z] = num_fmt if hover_measures is not None: for measure in hover_measures: hover_data[measure] = num_fmt columns = list(hover_data) columns.append("name") if isinstance(color_scale, skt.Measure): hover_data[color_scale] = num_fmt if color_scale is not None and color_scale not in columns: columns.append(color_scale) col_values = [e.value if isinstance(e, skt.Measure) else e for e in columns] res = [ [portfolio.__getattribute__(attr) for attr in col_values] for portfolio in self ] # Improved formatting columns = [str(e) for e in columns] hover_data = {str(k): v for k, v in hover_data.items()} df = pd.DataFrame(res, columns=columns) df["tag"] = df["tag"].astype(str).replace("None", "") if show_fronts: fronts = self.non_denominated_sort(first_front_only=False) df["front"] = str(-1) for i, front in enumerate(fronts): for idx in front: df.iloc[idx, -1] = str(i) color = df.columns[-1] elif color_scale is not None: color = str(color_scale) else: color = "tag" if z is not None: if to_surface: # estimate the surface x_arr = np.array(df[str(x)]) y_arr = np.array(df[str(y)]) z_arr = np.array(df[str(z)]) xi = np.linspace(start=min(x_arr), stop=max(x_arr), num=100) yi = np.linspace(start=min(y_arr), stop=max(y_arr), num=100) X, Y = np.meshgrid(xi, yi) Z = sci.griddata( points=(x_arr, y_arr), values=z_arr, xi=(X, Y), method="cubic" ) fig = go.Figure( go.Surface( x=xi, y=yi, z=Z, hovertemplate="<br>".join( [ str(e) + ": %{" + v + ":" + (",.3%" if not e.is_ratio else None) + "}" for e, v in [(x, "x"), (y, "y"), (z, "z")] ] ) + "<extra></extra>", colorbar=dict( title=str(z), titleside="top", tickformat=",.2%" if not z.is_ratio else None, ), ) ) fig.update_layout( title=title, scene=dict( xaxis={ "title": str(x), "tickformat": ",.1%" if not x.is_ratio else None, }, yaxis={ "title": str(y), "tickformat": ",.1%" if not y.is_ratio else None, }, zaxis={ "title": str(z), "tickformat": ",.1%" if not z.is_ratio else None, }, ), ) else: # plot the points fig = px.scatter_3d( df, x=str(x), y=str(y), z=str(z), hover_name="name", hover_data=hover_data, color=color, symbol="tag", ) fig.update_traces(marker_size=8) fig.update_layout( title=title, scene=dict( xaxis={ "title": str(x), "tickformat": ",.1%" if not x.is_ratio else None, }, yaxis={ "title": str(y), "tickformat": ",.1%" if not y.is_ratio else None, }, zaxis={ "title": str(z), "tickformat": ",.1%" if not z.is_ratio else None, }, ), legend=dict(yanchor="top", y=0.99, xanchor="left", x=1.15), ) else: fig = px.scatter( df, x=str(x), y=str(y), hover_name="name", hover_data=hover_data, color=color, symbol="tag", ) fig.update_traces(marker_size=10) fig.update_layout( title=title, xaxis={ "title": str(x), "tickformat": ",.1%" if not x.is_ratio else None, }, yaxis={ "title": str(y), "tickformat": ",.1%" if not y.is_ratio else None, }, legend=dict(yanchor="top", y=0.96, xanchor="left", x=1.25), ) return fig
[docs] def plot_rolling_measure( self, measure: skt.Measure = RatioMeasure.SHARPE_RATIO, window: int = 30, ) -> go.Figure: """Plot the measure over a rolling window for each portfolio in the population. Parameters ---------- measure : ct.Measure, default = RatioMeasure.SHARPE_RATIO The measure. window : int, default=30 The window size. Returns ------- plot : Figure Returns the plot Figure object """ df = self.rolling_measure(measure=measure, window=window) fig = df.plot(backend="plotly") max_val = np.max(df) min_val = np.min(df) if max_val > 0 > min_val: fig.add_hrect( y0=0, y1=max_val * 1.3, line_width=0, fillcolor="green", opacity=0.1 ) fig.add_hrect( y0=min_val * 1.3, y1=0, line_width=0, fillcolor="red", opacity=0.1 ) yaxis = { "title": str(measure), } if not measure.is_ratio: n = optimal_rounding_decimals(max_val) yaxis["tickformat"] = f",.{n}%" fig.update_layout( title=f"Rolling {measure} - {window} observations window", xaxis_title="Observations", yaxis=yaxis, showlegend=False, ) return fig
def _ptf_name_with_tag(portfolio: BasePortfolio) -> str: if portfolio.tag is None: return portfolio.name return f"{portfolio.name}_{portfolio.tag}"