"""Entropy Pooling estimator."""
# Copyright (c) 2025
# Author: Hugo Delatte <delatte.hugo@gmail.com>
# Credits: Vincent Maladière, Matteo Manzi, Carlo Nicolini
# SPDX-License-Identifier: BSD-3-Clause
import operator
import re
import warnings
from collections.abc import Sequence
from typing import TYPE_CHECKING
import cvxpy as cp
import numpy as np
import numpy.typing as npt
import scipy.optimize as sco
import scipy.sparse.linalg as scl
import scipy.special as scs
import scipy.stats as sts
import sklearn.utils.metadata_routing as skm
import sklearn.utils.validation as skv
import skfolio.measures as sm
import skfolio.typing as skt
from skfolio.exceptions import SolverError
from skfolio.measures import (
ExtraRiskMeasure,
PerfMeasure,
RiskMeasure,
)
from skfolio.prior._base import BasePrior, ReturnDistribution
from skfolio.prior._empirical import EmpiricalPrior
from skfolio.utils.equations import equations_to_matrix
from skfolio.utils.tools import check_estimator, default_asset_names, input_to_array
class EntropyPooling(BasePrior):
r"""Entropy Pooling estimator.
Entropy Pooling, introduced by Attilio Meucci in 2008 as a generalization of the
Black-Litterman framework, is a nonparametric method for adjusting a baseline
("prior") probability distribution to incorporate user-defined views by finding the
posterior distribution closest to the prior while satisfying those views.
User-defined views can be **elicited** from domain experts or **derived** from
quantitative analyses.
Grounded in information theory, it updates the distribution in the least-informative
way by minimizing the Kullback-Leibler divergence (relative entropy) under the
specified view constraints.
Mathematically, the problem is formulated as:
.. math::
\begin{aligned}
\min_{\mathbf{q}} \quad & \sum_{i=1}^T q_i \log\left(\frac{q_i}{p_i}\right) \\
\text{subject to} \quad & \sum_{i=1}^T q_i = 1 \quad \text{(normalization constraint)} \\
& \mathbb{E}_q[f_j(X)] = v_j \quad(\text{or } \le v_j, \text{ or } \ge v_j), \quad j = 1,\dots,k, \text{(view constraints)} \\
& q_i \ge 0, \quad i = 1, \dots, T
\end{aligned}
Where:
- :math:`T` is the number of observations (number of scenarios).
- :math:`p_i` is the prior probability of scenario :math:`x_i`.
- :math:`q_i` is the posterior probability of scenario :math:`x_i`.
- :math:`X` is the scenario matrix of shape (n_observations, n_assets).
- :math:`f_j` is the j :sup:`th` view function.
- :math:`v_j` is the target value imposed by the j :sup:`th` view.
- :math:`k` is the total number of views.
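For example, the objective can be evaluated directly with SciPy (a minimal
sketch with toy probabilities, mirroring how the fitted `relative_entropy_`
attribute is computed):

.. code-block:: python

    import numpy as np
    import scipy.special as scs

    p = np.full(4, 0.25)  # Uniform prior over T=4 scenarios
    q = np.array([0.4, 0.3, 0.2, 0.1])  # Candidate posterior
    kl = np.sum(scs.rel_entr(q, p))  # Sum of q_i * log(q_i / p_i)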
The `skfolio` implementation supports the following views:
* Equalities
* Inequalities
* Ranking
* Linear combinations (e.g. relative views)
* Views on groups of assets
On the following measures:
* Mean
* Variance
* Skew
* Kurtosis
* Correlation
* Value-at-Risk (VaR)
* Conditional Value-at-Risk (CVaR)
Notes
-----
Entropy Pooling re-weights the sample probabilities of the prior distribution and is
therefore constrained by the support (completeness) of that distribution. For
example, if the historical distribution contains no returns below -10% for a given
asset, we cannot impose a CVaR view of 15%: no matter how we adjust the sample
probabilities, such tail data do not exist.
Therefore, to impose extreme views on a sparse historical distribution, one must
generate synthetic data. In that case, the EP posterior is only as reliable as the
synthetic scenarios. It is thus essential to use a generator capable of
extrapolating tail dependencies, such as :class:`~skfolio.distribution.VineCopula`,
to model joint extreme events accurately.
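A minimal sketch of this workflow (assuming the default parameters of
:class:`~skfolio.prior.SyntheticData` and
:class:`~skfolio.distribution.VineCopula`; see their docstrings for the exact
API):

.. code-block:: python

    from skfolio.distribution import VineCopula
    from skfolio.prior import EntropyPooling, SyntheticData

    # Generate synthetic scenarios with extrapolated tails, then impose
    # an extreme CVaR view on them.
    model = EntropyPooling(
        prior_estimator=SyntheticData(
            distribution_estimator=VineCopula(),
            n_samples=10_000,
        ),
        cvar_views=["SPX == 0.10"],
    )
    model.fit(X)  # X: DataFrame of asset returns with an "SPX" column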
Two methods are available:
* Dual form: solves the Fenchel dual of the EP problem using Truncated
Newton Constrained method.
* Primal form: solves the original relative-entropy projection in
probability-space via interior-point algorithms.
See the solver parameter's docstring for full details on available solvers and
options.
To handle nonlinear views, constraints are linearized by fixing the relevant asset
moments (e.g., means or variances) and then solved via **nested entropic tilting**.
At each stage, the KL-divergence is minimized relative to the original prior,
while nesting all previously enforced (linearized) views into the feasible set:
* Stage 1: impose views on asset means, VaR and CVaR.
* Stage 2: carry forward Stage 1 constraints and add variance, fixing the
mean at its Stage 1 value.
* Stage 3: carry forward Stage 2 constraints and add skewness, kurtosis and
pairwise correlations, fixing both mean and variance at their Stage 2 values.
Because each entropic projection nests the prior views, every constraint from
earlier stages is preserved as new ones are added, yielding a final distribution
that satisfies all original nonlinear views while staying as close as possible to
the original prior.
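For instance, once the Stage 1 mean :math:`m_i` is fixed, a variance view
:math:`v_i` becomes linear in the posterior probabilities:

.. math::

    \mathbb{E}_q\left[(X_i - m_i)^2\right] = \sum_{t=1}^T q_t \left(x_{t,i} - m_i\right)^2 = v_i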
Only the necessary moments are fixed. Slack variables with an L1 norm penalty are
introduced to avoid solver infeasibility that may arise from overly tight
constraints.
CVaR view constraints cannot be directly expressed as linear functions of the
posterior probabilities. Therefore, when CVaR views are present, the EP problem is
solved by recursively solving a series of convex programs that approximate the
nonlinear CVaR constraint.
This implementation improves upon Meucci's algorithm [1]_ by formulating the problem
in continuous space as a function of the dual variables etas (VaR levels), rather
than searching over discrete tail sizes. This formulation not only handles the
CVaR constraint more directly but also supports multiple CVaR views on different
assets.
Although the overall problem is convex in the dual variables etas, it remains
non-smooth due to the presence of the positive-part operator in the CVaR
definition. Consequently, we employ derivative-free optimization methods.
Specifically, for a single CVaR view we use a one-dimensional root-finding
method (Brent's method), and for the multivariate case (supporting multiple
CVaR views) we use Powell's method for derivative-free convex descent.
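Concretely, for a CVaR view :math:`v_j` at confidence level :math:`\beta`,
each candidate VaR level :math:`\eta_j \in (0, v_j)` yields the linear
constraint

.. math::

    \frac{1}{1-\beta}\,\mathbb{E}_q\left[\max(-X_j - \eta_j,\, 0)\right] = v_j - \eta_j

and the outer search adjusts :math:`\eta_j` until the posterior tail
probability matches :math:`1-\beta` (single view) or the CVaR mismatch is
minimized (multiple views).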
Parameters
----------
prior_estimator : BasePrior, optional
Estimator of the asset's prior distribution, fitted from a
:ref:`prior estimator <prior>`. The default (`None`) is to use the
empirical prior :class:`~skfolio.prior.EmpiricalPrior()`. To perform Entropy
Pooling on synthetic data, you can use :class:`~skfolio.prior.SyntheticData`
by setting `prior_estimator = SyntheticData()`.
mean_views : list[str], optional
Views on asset means.
The views must match any of following patterns:
* `"ref1 >= a"`
* `"ref1 == b"`
* `"ref1 <= ref1"`
* `"ref1 >= a * prior(ref1)"`
* `"ref1 == b * prior(ref2)"`
* `"a * ref1 + b * ref2 + c <= d * ref3"`
With `"ref1"`, `"ref2"` ... the assets names or the groups names provided
in the parameter `groups`. Assets names can be referenced without the need of
`groups` if the input `X` of the `fit` method is a DataFrame with assets names
in columns. Otherwise, the default asset names `x0, x1, ...` are assigned.
By using the term `prior(...)`, you can reference the asset prior mean.
For example:
* `"SPX >= 0.0015"` --> The mean of SPX is greater than 0.15% (daily mean if
`X` is daily)
* `"SX5E == 0.002"` --> The mean of SX5E equals 0.2%
* `"AAPL <= 0.003"` --> The mean of AAPL is less than to 0.3%
* `"SPX <= SX5E"` --> Ranking view: the mean of SPX is less than SX5E
* `"SPX >= 1.5 * prior(SPX)"` --> The mean of SPX increases by at least 50% (versus its prior)
* `"SX5E == 2 * prior(SX5E)"` --> The mean of SX5E doubles (versus its prior)
* `"AAPL <= 0.8 * prior(SPX)"` --> The mean of AAPL is less than 0.8 times the SPX prior
* `"SX5E + SPX >= 0"` --> The sum of SX5E and SPX mean is greater than zero
* `"US == 0.007"` --> The sum of means of US assets equals 0.7%
* `"Equity == 3 * Bond"` --> The sum of means of Equity assets equals
three times the sum of means of Bond assets.
* `"2*SPX + 3*Europe <= Bond + 0.05"` --> Mixing assets and group mean views
variance_views : list[str], optional
Views on asset variances.
It supports the same patterns as `mean_views`.
For example:
* `"SPX >= 0.0009"` --> SPX variance is greater than 0.0009 (daily)
* `"SX5E == 1.5 * prior(SX5E)"` --> SX5E variance increases by 150% (versus
its prior)
skew_views : list[str], optional
Views on asset skews.
It supports the same patterns as `mean_views`.
For example:
* `"SPX >= 2.0"` --> SPX skew is greater than 2.0
* `"SX5E == 1.5 * prior(SX5E)"` --> SX5E skew increases by 150% (versus its
prior)
kurtosis_views : list[str], optional
Views on asset kurtosis.
It supports the same patterns as `mean_views`.
For example:
* `"SPX >= 9.0"` --> SPX kurtosis is greater than 9.0
* `"SX5E == 1.5 * prior(SX5E)"` --> SX5E kurtosis increases by 150% (versus
its prior)
correlation_views : list[str], optional
Views on asset correlations.
The views must match any of following patterns:
* `"(asset1, asset2) >= a"`
* `"(asset1, asset2) == a"`
* `"(asset1, asset2) <= a"`
* `"(asset1, asset2) >= a * prior(asset1, asset2)"`
* `"(asset1, asset2) == a * prior(asset1, asset2)"`
* `"(asset1, asset2) <= a * prior(asset1, asset2)"`
For example:
* `"(SPX, SX5E) >= 0.8"` --> the correlation between SPX and SX5E is greater than 80%
* `"(SPX, SX5E) == 1.5 * prior(SPX, SX5E)"` --> the correlation between SPX
and SX5E increases by 150% (versus its prior)
value_at_risk_views : list[str], optional
Views on asset Value-at-Risks (VaR).
For example:
* `"SPX >= 0.03"` --> SPX VaR is greater than 3%
* `"SX5E == 1.5 * prior(SX5E)"` --> SX5E VaR increases by 150% (versus its prior)
cvar_views : list[str], optional
Views on asset Conditional Value-at-Risks (CVaR).
It only supports equalities.
For example:
* `"SPX == 0.05"` --> SPX CVaR equals 5%
* `"SX5E == 1.5 * prior(SX5E)"` --> SX5E CVaR increases by 150% (versus its prior)
value_at_risk_beta : float, default=0.95
Confidence level for VaR views, by default 95%.
cvar_beta : float, default=0.95
Confidence level for CVaR views, by default 95%.
groups : dict[str, list[str]] or array-like of strings of shape (n_groups, n_assets), optional
Asset grouping for use in group-based views. If a dict is provided, keys are
asset names and values are lists of group labels; then `X` must be a DataFrame
whose columns match those asset names.
For example:
* `groups = {"SX5E": ["Equity", "Europe"], "SPX": ["Equity", "US"], "TLT": ["Bond", "US"]}`
* `groups = [["Equity", "Equity", "Bond"], ["Europe", "US", "US"]]`
solver : str, default="TNC"
The solver to use.
- "TNC" (default) solves the entropic-pooling dual via SciPy's Truncated Newton
Constrained method. By exploiting the smooth Fenchel dual and its
closed-form gradient, it operates in :math:`\mathbb{R}^k` (the number of
constraints) rather than :math:`\mathbb{R}^T` (the number of scenarios),
yielding an order-of-magnitude speedup over primal CVXPY interior-point
solvers.
- CVXPY solvers (e.g. "CLARABEL") solve the entropic-pooling problem in its
primal form using interior-point methods. While they tend to be slower than
the dual-based approach, they often achieve higher accuracy by enforcing
stricter primal feasibility and duality-gap tolerances. See the CVXPY
documentation for supported solvers:
https://www.cvxpy.org/tutorial/advanced/index.html#choosing-a-solver.
solver_params : dict, optional
Additional parameters to pass to the chosen solver.
- When using **SciPy TNC**, supported options include (but are not limited to)
`gtol`, `ftol`, `eps`, `maxfun`, `maxCGit`, `stepmx`, `disp`. See the SciPy
documentation for a full list and descriptions:
https://docs.scipy.org/doc/scipy/reference/optimize.minimize-tnc.html
- When using a **CVXPY** solver (e.g. ``"CLARABEL"``), supply any
solver-specific parameters here. Refer to the CVXPY solver guide for
details: https://www.cvxpy.org/tutorial/solvers
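For example (an illustrative configuration; any option supported by the
chosen solver can be passed):

.. code-block:: python

    model = EntropyPooling(
        mean_views=["SPX >= 0.001"],
        solver="CLARABEL",
        solver_params={"verbose": True},
    )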
Attributes
----------
return_distribution_ : ReturnDistribution
Fitted :class:`~skfolio.prior.ReturnDistribution` to be used by the optimization
estimators, containing the assets distribution, moments estimation and the EP
posterior probabilities (sample weights).
relative_entropy_ : float
The KL-divergence between the posterior and prior distributions.
effective_number_of_scenarios_ : float
Effective number of scenarios defined as the perplexity of sample weight
(exponential of entropy).
prior_estimator_ : BasePrior
Fitted `prior_estimator`.
n_features_in_ : int
Number of assets seen during `fit`.
feature_names_in_ : ndarray of shape (`n_features_in_`,)
Names of features seen during `fit`. Defined only when `X`
has feature names that are all strings.
References
----------
.. [1] "Fully Flexible Extreme Views",
Journal of Risk, Meucci, Ardia & Keel (2011)
.. [2] "Fully Flexible Views: Theory and Practice",
Risk, Meucci (2013).
.. [3] "Effective Number of Scenarios in Fully Flexible Probabilities",
GARP Risk Professional, Meucci (2012)
.. [4] "I-Divergence Geometry of Probability Distributions and Minimization
Problems", The Annals of Probability, Csiszar (1975)
Examples
--------
For a full tutorial on entropy pooling, see :ref:`sphx_glr_auto_examples_entropy_pooling_plot_1_entropy_pooling.py`.
>>> from skfolio import RiskMeasure
>>> from skfolio.datasets import load_sp500_dataset
>>> from skfolio.preprocessing import prices_to_returns
>>> from skfolio.prior import EntropyPooling
>>> from skfolio.optimization import HierarchicalRiskParity
>>>
>>> prices = load_sp500_dataset()
>>> prices = prices[["AMD", "BAC", "GE", "JNJ", "JPM", "LLY", "PG"]]
>>> X = prices_to_returns(prices)
>>>
>>> groups = {
... "AMD": ["Technology", "Growth"],
... "BAC": ["Financials", "Value"],
... "GE": ["Industrials", "Value"],
... "JNJ": ["Healthcare", "Defensive"],
... "JPM": ["Financials", "Income"],
... "LLY": ["Healthcare", "Defensive"],
... "PG": ["Consumer", "Defensive"],
... }
>>>
>>> entropy_pooling = EntropyPooling(
... mean_views=[
... "JPM == -0.002",
... "PG >= LLY",
... "BAC >= prior(BAC) * 1.2",
... "Financials == 2 * Growth",
... ],
... variance_views=[
... "BAC == prior(BAC) * 4",
... ],
... correlation_views=[
... "(BAC,JPM) == 0.80",
... "(BAC,JNJ) <= prior(BAC,JNJ) * 0.5",
... ],
... skew_views=[
... "BAC == -0.05",
... ],
... cvar_views=[
... "GE == 0.08",
... ],
... cvar_beta=0.90,
... groups=groups,
... )
>>>
>>> entropy_pooling.fit(X)
EntropyPooling(correlation_views=...
>>>
>>> print(entropy_pooling.relative_entropy_)
0.18...
>>> print(entropy_pooling.effective_number_of_scenarios_)
6876.67...
>>> print(entropy_pooling.return_distribution_.sample_weight)
[0.000103... 0.000093... ... 0.000103... 0.000108...]
>>>
>>> # CVaR Hierarchical Risk Parity optimization on Entropy Pooling
>>> model = HierarchicalRiskParity(
... risk_measure=RiskMeasure.CVAR,
... prior_estimator=entropy_pooling
... )
>>> model.fit(X)
HierarchicalRiskParity(prior_estimator=...
>>> print(model.weights_)
[0.073... 0.0541... ... 0.200...]
>>>
>>> # Stress Test the Portfolio
>>> entropy_pooling = EntropyPooling(cvar_views=["AMD == 0.10"])
>>> entropy_pooling.fit(X)
EntropyPooling(cvar_views=['AMD == 0.10'])
>>>
>>> stressed_dist = entropy_pooling.return_distribution_
>>>
>>> stressed_ptf = model.predict(stressed_dist)
"""
relative_entropy_: float
effective_number_of_scenarios_: float
prior_estimator_: BasePrior
n_features_in_: int
feature_names_in_: np.ndarray
if TYPE_CHECKING:
_returns: np.ndarray
_prior_sample_weight: np.ndarray
_groups: np.ndarray
_is_fixed_mean: np.ndarray
_is_fixed_variance: np.ndarray
_constraints: dict[str, list[np.ndarray] | None]
def __init__(
self,
prior_estimator: BasePrior | None = None,
mean_views: list[str] | None = None,
variance_views: list[str] | None = None,
correlation_views: list[str] | None = None,
skew_views: list[str] | None = None,
kurtosis_views: list[str] | None = None,
value_at_risk_views: list[str] | None = None,
cvar_views: list[str] | None = None,
value_at_risk_beta: float = 0.95,
cvar_beta: float = 0.95,
groups: skt.Groups | None = None,
solver: str = "TNC",
solver_params: dict | None = None,
):
self.prior_estimator = prior_estimator
self.mean_views = mean_views
self.variance_views = variance_views
self.correlation_views = correlation_views
self.skew_views = skew_views
self.kurtosis_views = kurtosis_views
self.value_at_risk_views = value_at_risk_views
self.cvar_views = cvar_views
self.value_at_risk_beta = value_at_risk_beta
self.cvar_beta = cvar_beta
self.groups = groups
self.solver = solver
self.solver_params = solver_params
def fit(self, X: npt.ArrayLike, y=None, **fit_params) -> "EntropyPooling":
"""Fit the Entropy Pooling estimator.
Parameters
----------
X : array-like of shape (n_observations, n_assets)
Price returns of the assets.
y : Ignored
Not used, present for API consistency by convention.
**fit_params : dict
Parameters to pass to the underlying estimators.
Only available if `enable_metadata_routing=True`, which can be
set by using ``sklearn.set_config(enable_metadata_routing=True)``.
See :ref:`Metadata Routing User Guide <metadata_routing>` for
more details.
Returns
-------
self : EntropyPooling
Fitted estimator.
"""
routed_params = skm.process_routing(self, "fit", **fit_params)
# Validation
skv.validate_data(self, X)
self.prior_estimator_ = check_estimator(
self.prior_estimator,
default=EmpiricalPrior(),
check_type=BasePrior,
)
# Fitting prior estimator
self.prior_estimator_.fit(X, y, **routed_params.prior_estimator.fit)
# Prior distribution
self._returns = self.prior_estimator_.return_distribution_.returns
n_observations, n_assets = self._returns.shape
self._prior_sample_weight = (
self.prior_estimator_.return_distribution_.sample_weight
)
if self._prior_sample_weight is None:
self._prior_sample_weight = np.ones(n_observations) / n_observations
assets_names = getattr(self, "feature_names_in_", default_asset_names(n_assets))
if self.groups is None:
self._groups = np.asarray([assets_names])
else:
self._groups = input_to_array(
items=self.groups,
n_assets=n_assets,
fill_value="",
dim=2,
assets_names=assets_names,
name="groups",
)
# Init problem variables
self._is_fixed_mean = np.zeros(n_assets, dtype=bool)
self._is_fixed_variance = np.zeros(n_assets, dtype=bool)
self._constraints = {
"equality": None,
"inequality": None,
"fixed_equality": None,
"cvar_equality": None,
}
# Step 1: Mean, VaR and CVaR
self._add_mean_views()
self._add_value_at_risk_views()
sample_weight = self._solve_with_cvar()
# Step 2: Mean, VaR, CVaR and Variance
if self.variance_views is not None:
# Get mean from Step 1
mean = sm.mean(self._returns, sample_weight=sample_weight)
# Add new views and solve
self._add_variance_views(mean=mean)
sample_weight = self._solve_with_cvar()
# Step 3: Mean, VaR, CVaR, Variance, Correlation, Skew and Kurtosis
if (
self.correlation_views is not None
or self.skew_views is not None
or self.kurtosis_views is not None
):
# Get mean and variance from Step 2
mean = sm.mean(self._returns, sample_weight=sample_weight)
variance = sm.variance(
self._returns, sample_weight=sample_weight, biased=True
)
# Add new views and solve
self._add_correlation_views(mean=mean, variance=variance)
self._add_skew_views(mean=mean, variance=variance)
self._add_kurtosis_views(mean=mean, variance=variance)
sample_weight = self._solve_with_cvar()
self.relative_entropy_ = float(
np.sum(scs.rel_entr(sample_weight, self._prior_sample_weight))
)
self.effective_number_of_scenarios_ = np.exp(sts.entropy(sample_weight))
self.return_distribution_ = ReturnDistribution(
mu=sm.mean(self._returns, sample_weight=sample_weight),
covariance=np.cov(self._returns, rowvar=False, aweights=sample_weight),
returns=self._returns,
sample_weight=sample_weight,
)
# Manage memory
del self._returns
del self._constraints
del self._prior_sample_weight
return self
def _add_constraint(self, a: np.ndarray, b: np.ndarray, name: str) -> None:
"""Add the left matrix `a` and right vector `b` of linear equality constraints
`x @ a == b` and linear inequality constraints `x @ a <= b` to the
`_constraints` dict.
Parameters
----------
a : ndarray of shape (n_observations, n_constraints)
Left matrix in `x @ a == b` or `x @ a <= b`.
b : ndarray of shape (n_constraints,)
Right vector in `x @ a == b` or `x @ a <= b`.
Returns
-------
None
"""
if b.size == 0:
return
# Init constraints dict
if self._constraints[name] is None:
n_observations, _ = self._returns.shape
self._constraints[name] = [np.empty((n_observations, 0)), np.empty(0)]
# Rescaling: views can be on very different scales. Rescaling avoids high
# disparity, improves conditioning, and makes the stopping criteria and slack
# penalties uniform.
scales = np.linalg.norm(a, axis=0)
a /= scales
b /= scales
for i, x in enumerate([a, b]):
self._constraints[name][i] = np.hstack((self._constraints[name][i], x))
def _add_mean_views(self) -> None:
"""Add mean view constraints to the optimization problem."""
if self.mean_views is None:
return
a_eq, b_eq, a_ineq, b_ineq = self._process_views(measure=PerfMeasure.MEAN)
for a, b, name in [(a_eq, b_eq, "equality"), (a_ineq, b_ineq, "inequality")]:
if a.size != 0:
self._add_constraint(a=self._returns @ a.T, b=b, name=name)
def _add_variance_views(self, mean: np.ndarray) -> None:
"""Add variance view constraints to the optimization problem.
Parameters
----------
mean : ndarray of shape (n_assets,)
The fixed mean vector used to compute variance as a linear function of
sample weights.
"""
if self.variance_views is None:
return
a_eq, b_eq, a_ineq, b_ineq = self._process_views(measure=RiskMeasure.VARIANCE)
n_observations, n_assets = self._returns.shape
fix = np.zeros(n_assets, dtype=bool)
for a, b, name in [(a_eq, b_eq, "equality"), (a_ineq, b_ineq, "inequality")]:
if a.size != 0:
self._add_constraint(
a=(self._returns - mean) ** 2 @ a.T, b=b, name=name
)
fix |= np.any(a != 0, axis=0)
self._fix_mean(fix=fix, mean=mean)
def _add_skew_views(self, mean: np.ndarray, variance: np.ndarray) -> None:
"""Add skew view constraints to the optimization problem.
Parameters
----------
mean : ndarray of shape (n_assets,)
The fixed mean vector used to compute skew as a linear function of sample
weights.
variance : ndarray of shape (n_assets,)
The fixed variance vector used to compute skew as a linear function of
sample weights.
"""
if self.skew_views is None:
return
a_eq, b_eq, a_ineq, b_ineq = self._process_views(measure=ExtraRiskMeasure.SKEW)
_, n_assets = self._returns.shape
fix = np.zeros(n_assets, dtype=bool)
for a, b, name in [(a_eq, b_eq, "equality"), (a_ineq, b_ineq, "inequality")]:
if a.size != 0:
self._add_constraint(
a=(self._returns**3 - mean**3 - 3 * mean * variance)
/ variance**1.5
@ a.T,
b=b,
name=name,
)
fix |= np.any(a != 0, axis=0)
# Fix the mean and the variance of the assets involved in skew views so
# that the skew matches the views exactly
self._fix_mean(fix=fix, mean=mean)
self._fix_variance(fix=fix, mean=mean, variance=variance)
def _add_kurtosis_views(self, mean: np.ndarray, variance: np.ndarray) -> None:
"""Add kurtosis view constraints to the optimization problem.
Parameters
----------
mean : ndarray of shape (n_assets,)
The fixed mean vector used to compute kurtosis as a linear function of
sample weights.
variance : ndarray of shape (n_assets,)
The fixed variance vector used to compute kurtosis as a linear function of
sample weights.
"""
if self.kurtosis_views is None:
return
a_eq, b_eq, a_ineq, b_ineq = self._process_views(
measure=ExtraRiskMeasure.KURTOSIS
)
_, n_assets = self._returns.shape
fix = np.zeros(n_assets, dtype=bool)
for a, b, name in [(a_eq, b_eq, "equality"), (a_ineq, b_ineq, "inequality")]:
if a.size != 0:
self._add_constraint(
a=(
(
self._returns**4
- 4 * mean * self._returns**3
+ 6 * mean**2 * self._returns**2
- 3 * mean**4
)
/ variance**2
@ a.T
),
b=b,
name=name,
)
fix |= np.any(a != 0, axis=0)
self._fix_mean(fix=fix, mean=mean)
self._fix_variance(fix=fix, mean=mean, variance=variance)
def _add_value_at_risk_views(self) -> None:
"""Add Value-at-Risk (VaR) view constraints to the optimization problem."""
if self.value_at_risk_views is None:
return
a_eq, b_eq, a_ineq, b_ineq = self._process_views(
measure=ExtraRiskMeasure.VALUE_AT_RISK
)
if (a_eq.size != 0 and np.any(np.count_nonzero(a_eq, axis=1) != 1)) or (
a_ineq.size != 0 and np.any(np.count_nonzero(a_ineq, axis=1) != 1)
):
raise ValueError(
"You cannot mix multiple assets in a single Value-at-Risk view."
)
if np.any(b_eq < 0) | np.any(a_ineq * b_ineq[:, np.newaxis] < 0):
raise ValueError("Value-at-Risk views must be strictly positive.")
n_observations, _ = self._returns.shape
for a, b, name in [(a_eq, b_eq, "equality"), (a_ineq, b_ineq, "inequality")]:
for ai, bi in zip(a, b, strict=True):
idx = np.where(self._returns[:, ai.astype(bool)].flatten() <= -abs(bi))[
0
]
if idx.size == 0:
raise ValueError(
f"The Value-at-Risk view of {bi:0.3%} is excessively extreme. "
"Consider lowering the view or adjusting your prior "
"distribution to include more extreme values."
)
sign = 1 if name == "equality" or bi > 0 else -1
ai = np.zeros(n_observations)
ai[idx] = 1.0
self._add_constraint(
a=sign * ai.reshape(-1, 1),
b=sign * np.array([1 - self.value_at_risk_beta]),
name=name,
)
def _add_correlation_views(self, mean: np.ndarray, variance: np.ndarray) -> None:
"""Add correlation view constraints to the optimization problem.
Parameters
----------
mean : ndarray of shape (n_assets,)
The fixed mean vector used to compute correlation as a linear function of
sample weights.
variance : ndarray of shape (n_assets,)
The fixed variance vector used to compute correlation as a linear function of
sample weights.
"""
if self.correlation_views is None:
return
assets = self._groups[0]
n_observations, n_assets = self._returns.shape
asset_to_index = {asset: i for i, asset in enumerate(assets)}
try:
views = []
for view in self.correlation_views:
res = _parse_correlation_view(view, assets=assets)
expression = res["expression"]
corr_view = expression["constant"]
if "prior_assets" in expression:
i, j = (asset_to_index[a] for a in expression["prior_assets"])
corr_view += (
np.corrcoef(self._returns[:, i], self._returns[:, j])[0][1]
* expression["multiplier"]
)
corr_view = np.clip(corr_view, 0 + 1e-8, 1 - 1e-8)
views.append(
(
(asset_to_index[a] for a in res["assets"]),
res["operator"],
corr_view,
)
)
except KeyError as e:
raise ValueError(f"Asset {e.args[0]} is missing from the assets.") from None
fix = np.zeros(n_assets, dtype=bool)
for (i, j), op, corr_view in views:
if not 0 <= corr_view <= 1:
raise ValueError("Correlation views must be between 0 and 1.")
ai = self._returns[:, i] * self._returns[:, j]
bi = mean[i] * mean[j] + corr_view * np.sqrt(variance[i] * variance[j])
sign = 1 if op in [operator.eq, operator.lt, operator.le] else -1
self._add_constraint(
a=sign * ai.reshape(-1, 1),
b=sign * np.array([bi]),
name="equality" if op == operator.eq else "inequality",
)
fix[[i, j]] = True
self._fix_mean(fix=fix, mean=mean)
self._fix_variance(fix=fix, mean=mean, variance=variance)
def _solve_with_cvar(self) -> np.ndarray:
"""Solve the entropy pooling problem handling CVaR view constraints.
CVaR view constraints cannot be directly expressed as linear functions of the
posterior probabilities. Therefore, when CVaR views are present, the EP problem
must be solved by recursively solving a series of convex programs that
approximate the non-linear CVaR constraint.
Our approach improves upon Meucci's algorithm [1]_ by formulating the problem
in continuous space as a function of the dual variables etas (VaR levels)
rather than searching over discrete tail sizes. This formulation not only
handles the CVaR constraint more directly but also supports multiple CVaR views
on different assets.
Although the overall problem is convex in the dual variables etas, it remains
non-smooth due to the presence of the positive-part operator in the CVaR
definition. Consequently, we employ derivative-free optimization methods.
Specifically, for a single CVaR view we use a one-dimensional root-finding
method (Brent's method), and for the multidimensional case (supporting multiple
CVaR views) we utilize Powell's method for derivative-free convex descent.
Returns
-------
sample_weight : ndarray of shape (n_observations,)
The updated probability vector satisfying all view constraints.
References
----------
.. [1] "Fully Flexible Extreme Views",
Journal of Risk, Meucci, Ardia & Keel (2011)
"""
if self.cvar_views is None:
sample_weight = self._solve()
return sample_weight
a_eq, b_eq, a_ineq, b_ineq = self._process_views(measure=RiskMeasure.CVAR)
if b_ineq.size != 0:
raise ValueError(
"CVaR view inequalities are not supported, use equalities views `==`"
)
if np.any(np.count_nonzero(a_eq, axis=1) != 1):
raise ValueError("You cannot mix multiple assets in a single CVaR view")
if np.any(b_eq < 0):
raise ValueError("CVaR view must be strictly positive")
n_observations, _ = self._returns.shape
asset_returns = self._returns[:, np.where(a_eq.sum(axis=0) != 0)[0]].T
views = b_eq
n_views = len(views)
min_ret = -np.min(asset_returns, axis=1)
invalid_views = views >= min_ret
if np.any(invalid_views):
msg = ""
for v, m in zip(views[invalid_views], min_ret[invalid_views], strict=True):
msg += (
f"The CVaR view of {v:.2%} is excessively extreme and cannot "
f"exceed {m:.2%}, which is the worst realization. Consider "
"lowering the view or adjusting your prior distribution to "
"include more extreme values. "
)
raise ValueError(msg)
def func(etas: list[float]) -> tuple[np.ndarray, float]:
"""Solve the EP with CVaR constraints for a given list of etas (VaR levels).
Parameters
----------
etas : list[float]
The list of etas (VaR levels) of each asset with a CVaR view.
Returns
-------
sample_weight : ndarray of shape (n_observations,)
Sample weight of the CVaR EP problem.
error : float
For one-dimensional (a single eta), the error is the difference
between the target CVaR beta and the effective CVaR beta.
For multidimensional, the error is the RMSE of the difference between
the target CVaR and the effective CVaR.
"""
# Init CVaR constraints
self._constraints["cvar_equality"] = [
np.empty((n_observations, 0)),
np.empty(0),
]
pos_part = None
for i in range(n_views):
if not (0 < etas[i] < views[i]):
raise ValueError(
f"eta[{i}] must be between 0 and the CVaR view {views[i]}"
)
pos_part = np.maximum(-asset_returns[i] - etas[i], 0)
self._add_constraint(
a=(pos_part / (1 - self.cvar_beta)).reshape(-1, 1),
b=np.array([views[i] - etas[i]]),
name="cvar_equality",
)
w = self._solve()
if n_views == 1:
error = np.sum(w[pos_part != 0]) - (1 - self.cvar_beta)
else:
error = np.linalg.norm(
sm.cvar(asset_returns.T, beta=self.cvar_beta, sample_weight=w)
- views
) / np.sqrt(n_views)
return w, error
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="^Solution may be inaccurate")
# One-dimensional: we use one-dimensional root-finding method
if n_views == 1:
sol = sco.root_scalar(
lambda x: func([x])[-1],
bracket=[1e-3, views[0] - 1e-8],
method="brentq", # Doesn't required a smooth derivative
xtol=1e-4,
maxiter=50,
)
if not sol.converged:
raise RuntimeError(
"Failed to solve the CVaR view problem. Consider relaxing your "
"constraints or substituting with VaR views."
)
res = [sol.root]
# Multidimensional: we use derivative-free convex descent
else:
sol = sco.minimize(
lambda x: func(x)[-1],
x0=np.array([view * 0.5 for view in views]),
bounds=[(1e-3, view - 1e-8) for view in views],
method="Powell",
options={"xtol": 1e-4, "maxiter": 80},
)
if not sol.success:
raise ValueError(
"Failed to solve the multi-CVaR view problem. Consider "
"relaxing your constraints, using a single CVaR view, or "
"substituting with VaR views."
)
res = sol.x
sample_weight = func(res)[0]
return sample_weight
def _solve(self) -> np.ndarray:
"""Solve the base entropy pooling problem.
Dispatch either to the EP dual (TNC) solver or the EP primal (CVXPY) solver
based on the `solver` parameter.
Returns
-------
sample_weight : ndarray of shape (n_observations,)
The updated posterior probability vector.
"""
if all(v is None for v in self._constraints.values()):
# No view constraints so we don't need to solve the EP problem.
return self._prior_sample_weight
if self.solver == "TNC":
return self._solve_dual()
return self._solve_primal()
def _solve_dual(self) -> np.ndarray:
r"""Solves the entropic-pooling dual via SciPy's Truncated Newton
Constrained method. By exploiting the smooth Fenchel dual and its
closed-form gradient, it operates in :math:`\mathbb{R}^k` (the number of
constraints) rather than :math:`\mathbb{R}^T` (the number of scenarios),
yielding an order-of-magnitude speedup over primal CVXPY interior-point
solvers.
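With the stacked constraint matrix :math:`A` and vector :math:`b`
(including the rescaled normalization row), the unnormalized dual solved
here is

.. math::

    \min_{y} \; \sum_{t=1}^T p_t e^{-(A y)_t - 1} + b^\top y

with closed-form gradient :math:`b - A^\top z` where
:math:`z_t = p_t e^{-(A y)_t - 1}`. At the optimum, the posterior is
recovered as :math:`q = z`, then clipped and renormalized.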
Returns
-------
sample_weight : ndarray of shape (n_observations,)
The updated posterior probability vector.
References
----------
.. [1] "Convex Optimization", Cambridge University Press, Section 5.2.3,
Entropy maximization, Boyd & Vandenberghe (2004)
.. [2] "I-Divergence Geometry of Probability Distributions and Minimization
Problems", The Annals of Probability, Csiszar (1975)
.. [3] "Fully Flexible Views: Theory and Practice",
Risk, Meucci (2013).
"""
n_observations, _ = self._returns.shape
# Init constraints with sum(p)==1, rescaled by its norm
# Has better convergence than the normalized form done inside the dual.
a = [np.ones(n_observations).reshape(-1, 1) / np.sqrt(n_observations)]
b = [np.array([1.0]) / np.sqrt(n_observations)]
bounds = [(None, None)]
for name, constraints in self._constraints.items():
if constraints is not None:
a.append(constraints[0])
b.append(constraints[1])
s = constraints[1].size
match name:
case "equality" | "cvar_equality":
bounds += [(None, None)] * s
case "fixed_equality":
# Equivalent to relaxing the problem with slack variables with
# a norm penalty to avoid solver infeasibility that may arise
# from overly tight constraints from fixing the moments.
bounds += [(-1000, 1000)] * s
case "inequality":
bounds += [(0, None)] * s
case _:
raise KeyError(f"constrain {name}")
a = np.hstack(a)
b = np.hstack(b)
def func(x: np.ndarray) -> tuple[float, np.ndarray]:
"""Computes the Fenchel dual of the entropic-pooling problem in its
unnormalized form.
Parameters
----------
x : ndarray of shape (n_constraints,)
Dual variables (Lagrange multipliers for each constraint).
Returns
-------
obj : float
Value of the dual objective.
grad : ndarray of shape (n_constraints,)
Gradient of the dual objective.
"""
z = self._prior_sample_weight * np.exp(-a @ x - 1)
obj = z.sum() + x @ b
grad = b - a.T @ z
return obj, grad
# We use TNC as it often outperforms L-BFGS-B on EP problems because
# it builds (implicitly) true curvature information via Hessian-vector products,
# whereas L-BFGS-B only ever uses a low-rank quasi-Newton approximation.
# We set stepmx=1 to avoid large solver defaults when the bounds are not None.
solver_params = (
self.solver_params
if self.solver_params is not None
else {
"maxfun": 5000,
"ftol": 1e-11,
"xtol": 1e-8,
"gtol": 1e-8,
"stepmx": 1,
}
)
sol = sco.minimize(
func,
x0=np.zeros_like(b),
jac=True,
bounds=bounds,
method="TNC",
options=solver_params,
)
if not sol.success:
raise SolverError(
"Dual problem with Solver 'TNC' failed. This typically occurs when the "
"specified views conflict or are overly extreme. Consider using a "
"prior that generates more synthetic data for extreme views. You can "
"also change `solver_params` or try another `solver` such as "
f"'CLARABEL'. Solver error: {sol.message}"
)
sample_weight = self._prior_sample_weight * np.exp(-1 - a @ sol.x)
# Handles numerical precision errors
sample_weight = np.clip(sample_weight, 0, 1)
sample_weight /= sample_weight.sum()
return sample_weight
def _solve_primal(self) -> np.ndarray:
"""Solve the base entropy-pooling problem in its primal form by minimizing KL
divergence to the prior.
Returns
-------
sample_weight : ndarray of shape (n_observations,)
The updated posterior probability vector.
"""
n_observations, _ = self._returns.shape
solver_params = self.solver_params if self.solver_params is not None else {}
posterior = cp.Variable(n_observations) # Posterior probas
objective = cp.sum(cp.kl_div(posterior, self._prior_sample_weight))
constraints = [posterior >= 0, cp.sum(posterior) == 1]
if self._constraints["equality"] is not None:
a, b = self._constraints["equality"]
constraints.append(posterior @ a - b == 0)
if self._constraints["inequality"] is not None:
a, b = self._constraints["inequality"]
constraints.append(posterior @ a - b <= 0)
if self._constraints["cvar_equality"] is not None:
a, b = self._constraints["cvar_equality"]
constraints.append(posterior @ a - b == 0)
if self._constraints["fixed_equality"] is not None:
a, b = self._constraints["fixed_equality"]
# Relax the problem with slack variables with a norm1 penalty to avoid
# solver infeasibility that may arise from overly tight constraints from
# fixing the moments.
slack = cp.Variable(b.size)
constraints.append(posterior @ a - b == slack)
objective += 1e5 * cp.norm1(slack)
problem = cp.Problem(cp.Minimize(objective), constraints)
try:
# We suppress cvxpy warning as it is redundant with our warning
with warnings.catch_warnings():
warnings.simplefilter("ignore")
problem.solve(solver=self.solver, **solver_params)
if posterior.value is None:
raise cp.SolverError
if problem.status != cp.OPTIMAL:
warnings.warn(
"Solution may be inaccurate. Try changing the solver params. For "
"more details, set `solver_params=dict(verbose=True)`",
stacklevel=2,
)
# Handles numerical precision errors
sample_weight = np.clip(posterior.value, 0, 1)
sample_weight /= sample_weight.sum()
return sample_weight
except (cp.SolverError, scl.ArpackNoConvergence):
raise SolverError(
f"Primal problem with Solver '{self.solver}' failed. This typically "
"occurs when the specified views conflict or are overly extreme. "
"Consider using a prior that generates more synthetic data for extreme "
"views. You can also change `solver_params` or try another `solver` "
"such as 'TNC'."
) from None
def _process_views(self, measure: PerfMeasure | RiskMeasure | ExtraRiskMeasure):
"""Process and convert view equations into constraint matrices.
This method uses the provided view strings and groups to generate the equality
and inequality matrices (a_eq, b_eq, a_ineq, b_ineq) needed to formulate the
constraints. Prior asset expressions are replaced using the current prior.
Parameters
----------
measure : {PerfMeasure, RiskMeasure, ExtraRiskMeasure}
The type of view measure to process.
Returns
-------
a_eq, b_eq, a_ineq, b_ineq : tuple of ndarray
Matrices and vectors corresponding to equality and inequality constraints.
"""
assets = self._groups[0]
name = f"{measure.value}_views"
views = getattr(self, name)
views = np.asarray(views)
if views.ndim != 1:
raise ValueError(f"{name} must be a list of strings")
measure_func = getattr(sm, str(measure.value))
measure_args = {}
if measure == RiskMeasure.VARIANCE:
measure_args["biased"] = True
required_prior_assets = _extract_prior_assets(views, assets=assets)
if len(required_prior_assets) != 0:
prior_values = {
k: measure_func(self._returns[:, i], **measure_args)
for i, k in enumerate(assets)
if k in required_prior_assets
}
views = _replace_prior_views(views=views, prior_values=prior_values)
a_eq, b_eq, a_ineq, b_ineq = equations_to_matrix(
groups=self._groups,
equations=views,
raise_if_group_missing=True,
names=("groups", "views"),
)
return a_eq, b_eq, a_ineq, b_ineq
def _fix_mean(self, fix: np.ndarray, mean: np.ndarray) -> None:
"""Add constraints to fix the mean for assets where view constraints have been
applied.
The method introduces slack variables to avoid solver infeasibility from
conflicting tight constraints.
Parameters
----------
fix : ndarray of bool of shape (n_assets,)
Boolean mask indicating which assets to fix.
mean : ndarray of shape (n_assets,)
Fixed mean values for the assets.
"""
fix &= ~self._is_fixed_mean
if np.any(fix):
self._add_constraint(
a=self._returns[:, fix], b=mean[fix], name="fixed_equality"
)
self._is_fixed_mean |= fix
def _fix_variance(
self, fix: np.ndarray, mean: np.ndarray, variance: np.ndarray
) -> None:
"""Add constraints to fix the variance for assets where view constraints have
been applied.
The method introduces slack variables to avoid solver infeasibility from
conflicting tight constraints.
Parameters
----------
fix : ndarray of bool of shape (n_assets,)
Boolean mask indicating which assets to fix.
mean : ndarray of shape (n_assets,)
Fixed mean values used for the linearization of the variance.
variance : ndarray of shape (n_assets,)
Fixed variance values.
"""
fix &= ~self._is_fixed_variance
if np.any(fix):
self._add_constraint(
a=(self._returns[:, fix] - mean[fix]) ** 2,
b=variance[fix],
name="fixed_equality",
)
self._is_fixed_variance |= fix
def _extract_prior_assets(views: Sequence[str], assets: Sequence[str]) -> set[str]:
"""
Given a list of views, return a set of asset names referenced within any
'prior(ASSET)' pattern. Only asset names in the provided 'assets' list
will be recognized.
Supported formats:
- "prior(ASSET)"
- "prior(ASSET) * a"
- "a * prior(ASSET)"
Parameters
----------
views : list[str]
The list of views to scan.
assets : list[str]
The list of allowed asset names.
Returns
-------
prior_assets : set[str]
Set of asset names that appear in prior() pattern.
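Examples
--------
With assumed asset names for illustration:

>>> _extract_prior_assets(
...     views=["SPX >= 1.5 * prior(SPX)"], assets=["SPX", "SX5E"]
... )
{'SPX'}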
"""
allowed_assets = "|".join(re.escape(asset) for asset in assets)
pattern = r"prior\(\s*(" + allowed_assets + r")\s*\)"
prior_assets = set()
for view in views:
matches = re.findall(pattern, view)
prior_assets.update(matches)
return prior_assets
def _replace_prior_views(
views: Sequence[str], prior_values: dict[str, float]
) -> list[str]:
"""
Replace occurrences of the prior patterns below using `prior_values`.
Supported patterns:
- "prior(ASSET)"
- "prior(ASSET) * a"
- "a * prior(ASSET)"
Parameters
----------
views : list[str]
The list of views.
prior_values : dict[str, float]
A dictionary mapping asset names (str) to their prior values.
Returns
-------
views : list[str]
The views with each prior pattern instance replaced by its computed value.
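Examples
--------
With an assumed prior value for illustration:

>>> _replace_prior_views(
...     views=["SPX >= 1.5 * prior(SPX)"], prior_values={"SPX": 0.001}
... )
['SPX >= 0.0015']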
"""
# Build a regex pattern for allowed asset names.
allowed_assets = "|".join(re.escape(asset) for asset in prior_values)
# Pattern captures:
# - An optional multiplier before: ([0-9\.]+)\s*\*\s*
# - The prior() function with an allowed asset: prior\(\s*(allowed_asset)\s*\)
# - An optional multiplier after: \s*\*\s*([0-9\.]+)
pattern = (
r"(?:([0-9\.]+)\s*\*\s*)?" # Optional pre-multiplier
r"prior\(\s*(" + allowed_assets + r")\s*\)" # prior(ASSET) with allowed asset
r"(?:\s*\*\s*([0-9\.]+))?" # Optional post-multiplier
)
def repl(match) -> str:
pre_multiplier = float(match.group(1)) if match.group(1) else 1.0
asset = match.group(2)
post_multiplier = float(match.group(3)) if match.group(3) else 1.0
result_value = prior_values[asset] * pre_multiplier * post_multiplier
return str(result_value)
new_views = [re.sub(pattern, repl, view) for view in views]
# After substitution, check for any unresolved 'prior(ASSET)'
unresolved_pattern = r"prior\(\s*([^)]+?)\s*\)"
for view in new_views:
m = re.search(unresolved_pattern, view)
if m is not None:
missing_asset = m.group(1)
raise ValueError(
"Unresolved 'prior' expression found in view. "
f"Asset '{missing_asset}' is not available in prior_values."
)
return new_views
def _parse_correlation_view(view: str, assets: Sequence[str]) -> dict:
"""
Parse a correlation view and return its structured representation.
The view string should follow one of the formats:
(asset1, asset2) <operator> <float>
(asset1, asset2) <operator> <float> * prior(asset3, asset4) * <float> + <float>
Only asset names from the provided `assets` list are accepted.
If the view contains a "prior(...)" expression, it is replaced by its computed value
using multipliers and constant additions. If the asset referenced in any prior
expression is not allowed, an error is raised.
Parameters
----------
view : str
The correlation view string.
assets : list[str]
A list of allowed asset names.
Returns
-------
views : dict
A dictionary with keys:
- "assets": tuple(asset1, asset2)
- "operator": the comparison operator (==, >=, <=, >, or <)
- "expression": a dict representing the right-hand side expression.
* If there is no prior expression, "expression" contains {"constant": value}.
* If a prior expression is present, it returns a dict with keys:
"prior_assets": tuple(asset3, asset4),
"multiplier": overall multiplier (default 1.0),
"constant": constant term (default 0.0).
"""
operator_map = {
">": operator.gt,
">=": operator.ge,
"<": operator.lt,
"<=": operator.le,
"==": operator.eq,
}
# Build a regex pattern that only accepts the allowed asset names.
allowed_assets = "|".join(re.escape(asset) for asset in assets)
main_pattern = (
r"^\(\s*(" + allowed_assets + r")\s*,\s*(" + allowed_assets + r")\s*\)\s*"
r"(==|>=|<=|>|<)\s*(.+)$"
)
main_match = re.match(main_pattern, view)
if not main_match:
raise ValueError(
f"Invalid correlation view format or unknown asset in view: {view}"
)
asset1, asset2, operator_str, expression = main_match.groups()
expression = expression.strip()
if "prior(" not in expression:
try:
constant = float(expression)
except ValueError as e:
raise ValueError(
f"Could not convert constant '{expression}' to float in view: {view}"
) from e
parsed_expression = {"constant": constant}
else:
# Pattern to capture:
# - An optional pre multiplier followed by "*" (e.g. "2 *")
# - The pattern prior( asset3 , asset4 )
# - An optional post multiplier preceded by "*" (e.g. "* 3")
# - An optional constant preceded by "+" (e.g. "+ 4")
prior_pattern = (
r"^(?:\s*([A-Za-z0-9_.-]+)\s*\*\s*)?" # Optional pre-multiplier
r"prior\(\s*("
+ allowed_assets
+ r")\s*,\s*("
+ allowed_assets
+ r")\s*\)" # prior(asset3, asset4)
r"(?:\s*\*\s*([A-Za-z0-9_.-]+))?" # Optional post-multiplier
r"(?:\s*\+\s*([A-Za-z0-9_.-]+))?" # Optional constant addition
r"$"
)
prior_match = re.match(prior_pattern, expression)
if not prior_match:
raise ValueError(
"Invalid prior expression format or unknown asset in expression: "
f"{expression}"
)
(pre_mult, asset3, asset4, post_mult, constant) = prior_match.groups()
try:
pre_mult = float(pre_mult) if pre_mult is not None else 1.0
except ValueError as e:
raise ValueError(
f"Invalid pre-multiplier '{pre_mult}' in view: {view}"
) from e
try:
post_mult = float(post_mult) if post_mult is not None else 1.0
except ValueError as e:
raise ValueError(
f"Invalid post-multiplier '{post_mult}' in view: {view}"
) from e
try:
constant = float(constant) if constant is not None else 0.0
except ValueError as e:
raise ValueError(f"Invalid constant '{constant}' in view: {view}") from e
parsed_expression = {
"prior_assets": (asset3, asset4),
"multiplier": pre_mult * post_mult,
"constant": constant,
}
return {
"assets": (asset1, asset2),
"operator": operator_map[operator_str],
"expression": parsed_expression,
}