57 changes: 42 additions & 15 deletions climada/engine/impact.py
@@ -2318,6 +2318,8 @@ def interpolate(
min_impact=0,
bin_decimals=None,
y_asymptotic=0.0,
threshold_percentile=90,
min_sample_size=20,
):
"""Interpolate and extrapolate impact frequency curve using different methods.

@@ -2328,9 +2330,12 @@

method : str, optional
Method to interpolate to new return periods. Currently available are "interpolate",
"fit_GPD", "extrapolate", "extrapolate_constant", and "stepfunction". If set to
"interpolate", return periods outside the range of the Impact object's observed return
periods will be assigned NaN. If set to "fit_GPD", a generalized Pareto distribution
is fitted to the tail of the data, where the tail consists of all impacts above the
percentile given by threshold_percentile, computed after first filtering out all impacts
below min_impact. If set to "extrapolate_constant" or "stepfunction",
return periods larger than the Impact object's observed return periods will be
assigned the largest impact, and return periods smaller than the Impact object's
observed return periods will be assigned 0. If set to "extrapolate",
@@ -2354,6 +2359,13 @@
Has no effect if method is "interpolate". Else, if data size < 2 or if method
is set to "extrapolate_constant" or "stepfunction", it provides return value for
exceeded impact for return periods smaller than the data range. Defaults to 0.
threshold_percentile : float, optional
Only has an effect if method is "fit_GPD". Percentile used to set the threshold in the
GPD peaks-over-threshold approach, after first filtering out all impacts below min_impact.
Defaults to 90.
min_sample_size : int, optional
Only has an effect if method is "fit_GPD". Minimum number of data points above the
threshold required for fitting the GPD to the tail. Defaults to 20.

Returns
-------
@@ -2364,6 +2376,8 @@
--------
util.interpolation.preprocess_and_interpolate_ev :
inter- and extrapolation method
util.interpolation.fit_tail_GPD :
method to fit a generalized Pareto distribution (GPD) to the tail
"""
exceedance_frequency = 1 / np.array(return_periods)

@@ -2372,18 +2386,31 @@
impacts = np.squeeze(np.array(self.impact)[sorted_idxs])
rps = np.asarray(self.return_per)[sorted_idxs]
frequency = np.diff(1 / np.array(rps)[::-1], prepend=0)[::-1]

if method == "fit_GPD":
_, impact_interpolated, _ = u_interp.fit_tail_GPD(
exceedance_frequency,
None,
frequency,
impacts,
value_threshold=min_impact,
min_sample_size=min_sample_size,
threshold_percentile=threshold_percentile,
)

else:
impact_interpolated = u_interp.preprocess_and_interpolate_ev(
exceedance_frequency,
None,
frequency,
impacts,
log_frequency=log_frequency,
log_values=log_impact,
value_threshold=min_impact,
method=method,
y_asymptotic=y_asymptotic,
bin_decimals=bin_decimals,
)

return ImpactFreqCurve(
return_per=return_periods,
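For orientation, here is a minimal, self-contained sketch (not part of this diff) of what the new "fit_GPD" branch of interpolate() does: convert the requested return periods to exceedance frequencies and delegate the tail extrapolation to climada.util.interpolation.fit_tail_GPD. The synthetic impacts and the chosen return periods are illustrative assumptions.

```python
import numpy as np
from scipy.stats import genpareto

import climada.util.interpolation as u_interp

# Illustrative "observed" impacts with equal per-event frequencies
rng = np.random.default_rng(1)
impacts = genpareto.rvs(c=0.3, scale=5.0, size=2000, random_state=rng)
frequency = np.full(impacts.size, 1.0 / impacts.size)

return_periods = np.array([100.0, 250.0, 500.0, 1000.0])
exceedance_frequency = 1 / return_periods  # same conversion as in interpolate()

# Mirrors the call made by the "fit_GPD" branch above
_, impact_interpolated, _ = u_interp.fit_tail_GPD(
    exceedance_frequency,
    None,
    frequency,
    impacts,
    value_threshold=0,        # corresponds to min_impact in interpolate()
    min_sample_size=20,
    threshold_percentile=90,
)
print(impact_interpolated)  # impacts at the requested return periods (NaN below the fitted tail)
```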
158 changes: 158 additions & 0 deletions climada/util/interpolation.py
@@ -24,6 +24,8 @@

import numpy as np
from scipy import interpolate
from scipy.optimize import minimize
from scipy.stats import genextreme, genpareto

LOGGER = logging.getLogger(__name__)

@@ -405,3 +407,159 @@ def _group_frequency(frequency, value, bin_decimals):
return frequency, value_unique

return frequency, value


def _gpd_distribution(values, xi, beta, lambda_u, threshold):
"""
Survival function (1-CDF) using a generalized Pareto distribution for the conditional
distribution of the tail and a probability of threshold exceedance (lambda_u).
See https://en.wikipedia.org/wiki/Generalized_Pareto_distribution
"""
values = np.asarray(values)
return lambda_u * (1 + xi * (values - threshold) / beta) ** (-1 / xi)


def _gpd_inverse_distribution(lambdas, xi, beta, lambda_u, threshold):
"""
Inverse of the survival function (1-CDF), using a generalized Pareto distribution for the
conditional distribution of the tail and a probability of threshold exceedance (lambda_u).
See https://en.wikipedia.org/wiki/Generalized_Pareto_distribution
"""
lambdas = np.asarray(lambdas)
return threshold + (beta / xi) * ((lambdas / lambda_u) ** (-xi) - 1)


def fit_tail_GPD(
test_frequency=None,
test_values=None,
frequency=None,
values=None,
value_threshold=0,
threshold_percentile=90,
min_sample_size=30,
):
"""
Fit a generalized Pareto distribution (GPD) to the tail of the exceedance data
and extrapolate to test points.

Extreme value theory provides one principal solution for fitting exceedance values
using a peaks-over-threshold approach leading to the GPD, see e.g.
https://en.wikipedia.org/wiki/Generalized_Pareto_distribution and
(Coles, 2001, Chapter 4, https://doi.org/10.1007/978-1-4471-3675-0).
The selection of the threshold is a crucial choice because lower thresholds result in
more data points to fit, while higher thresholds ensure that data points actually
correspond to extreme values.

Parameters
----------
test_frequency : array_like, optional
Exceedance frequencies for which to compute values. If given, test_values must be None.
test_values : array_like, optional
Values for which to compute exceedance frequencies. If given, test_frequency must be None.
frequency : array_like
Frequencies of the observed values.
values : array_like
Observed values.
value_threshold : float, optional
Value threshold to filter values before the threshold percentile is calculated.
Defaults to 0, filtering out all values less than or equal to zero.
threshold_percentile : float, optional
Percentile for the tail threshold. Defaults to 90.
min_sample_size : int, optional
Minimum number of points above the threshold. Defaults to 30.

Returns
-------
tuple
(test_frequency, values, fit_result) if test_frequency is given, else
(exceedance_frequencies, test_values, fit_result). fit_result is a dict containing the
fitted parameters ("xi", "beta") and goodness-of-fit metrics ("rmse_log", "ks_distance").
"""

# Validate inputs
if (test_frequency is not None and test_values is not None) or (
test_frequency is None and test_values is None
):
raise ValueError("Provide exactly one of test_frequency or test_values")

# Filter by value_threshold before computing threshold with percentile
frequency = np.asarray(frequency)
values = np.asarray(values)
mask_filter = values > value_threshold
frequency = frequency[mask_filter]
values = values[mask_filter]

# compute percentile threshold and select tail
threshold = np.percentile(values, threshold_percentile)
mask = values > threshold
values_tail = values[mask]
frequency_tail = frequency[mask]
if sum(mask) < min_sample_size:
raise ValueError(
f"Not enough data points above the threshold for fitting the GPD. You can try to "
f"choose a smaller threshold_percentile={threshold_percentile} or a smaller "
f"min_sample_size={min_sample_size}."
)

# Sort values and frequencies
sorted_idxs = np.argsort(values_tail)
values_tail = np.squeeze(values_tail[sorted_idxs])
frequency_tail = frequency_tail[sorted_idxs]
ex_freq = np.cumsum(frequency_tail[::-1])[::-1]
lambda_u = ex_freq[0] # estimated frequency for exceeding the threshold

def exceedance_negerror(params):
"""Objective to minimize: sum of squared log errors"""
_, beta = params
if beta <= 0:
return np.inf
model = _gpd_distribution(values_tail, *params, lambda_u, threshold)
return np.sum((np.log(ex_freq) - np.log(model)) ** 2)

res = minimize(
exceedance_negerror,
[0.1, np.std(values_tail - threshold)],
bounds=[(-1, None), (1e-6, None)],
)
xi_hat, beta_hat = res.x
fit_result = {"xi": xi_hat, "beta": beta_hat}
LOGGER.info(
"Fitted GPD parameters using %d points: xi=%.3g, beta=%.3g.",
sum(mask),
xi_hat,
beta_hat,
)

# Compute some goodness-of-fit metrics
# Recompute model on tail with fitted params
model_tail = _gpd_distribution(values_tail, xi_hat, beta_hat, lambda_u, threshold)
# Avoid log(0) issues
eps = 1e-12
lambda_tail_safe = np.maximum(ex_freq, eps)
model_tail_safe = np.maximum(model_tail, eps)
# Root mean-squared error (log-survival space)
residuals = np.log(lambda_tail_safe) - np.log(model_tail_safe)
rmse_log = np.sqrt(np.mean(residuals**2))

# Kolmogorov–Smirnov distance on normalized survival function
empirical_prob = lambda_tail_safe / lambda_u
model_prob = model_tail_safe / lambda_u
ks_distance = np.max(np.abs(empirical_prob - model_prob))

fit_result.update({"rmse_log": rmse_log, "ks_distance": ks_distance})
LOGGER.info(
"GOF metrics: RMSE_log=%.3g, KS=%.3g",
rmse_log,
ks_distance,
)

if test_values is not None:
test_values = np.asarray(test_values)
mask_tail = test_values > threshold
lambda_dist = _gpd_distribution(
test_values, xi_hat, beta_hat, lambda_u, threshold
)
lambda_dist[~mask_tail] = np.nan
return (lambda_dist, test_values, fit_result)
else:
vals = _gpd_inverse_distribution(
test_frequency, xi_hat, beta_hat, lambda_u, threshold
)
mask_tail = vals > threshold
vals[~mask_tail] = np.nan
return (test_frequency, vals, fit_result)
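For reference (not part of the diff), the two helper functions above implement the standard peaks-over-threshold relations from Coles (2001), Eqs. 4.13 and 4.16, which the unit test below also uses for its expected values. With threshold u, threshold-exceedance frequency lambda_u, shape xi, and scale beta, a sketch of these relations:

```latex
% Exceedance frequency of a level x > u (what _gpd_distribution evaluates):
\lambda(x) = \lambda_u \left(1 + \xi \,\frac{x - u}{\beta}\right)^{-1/\xi}

% Level exceeded with frequency \lambda (what _gpd_inverse_distribution evaluates):
x(\lambda) = u + \frac{\beta}{\xi}\left[\left(\frac{\lambda}{\lambda_u}\right)^{-\xi} - 1\right]

% Raising the threshold from 0 to u leaves \xi unchanged and rescales \beta
% (the relation behind expected_beta in the test below):
\beta_u = \beta_0 + \xi\, u
```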
55 changes: 55 additions & 0 deletions climada/util/test/test_interpolation.py
@@ -22,6 +22,7 @@
import unittest

import numpy as np
from scipy.stats import genextreme, genpareto

import climada.util.interpolation as u_interp

@@ -282,6 +283,60 @@ def test_preprocess_and_interpolate_ev(self):
with self.assertRaises(ValueError):
u_interp.preprocess_and_interpolate_ev(None, None, frequency, values)

def test_fit_tail_gpd(self):
"""Test GPD fitting with synthetic data"""
rng = np.random.default_rng(42)
xi_true = 0.35
beta_true = 7
n = 5000
threshold_percentile = 90
values = genpareto.rvs(c=xi_true, scale=beta_true, size=n, random_state=rng)
frequency = np.ones(n) / n

threshold = np.percentile(values, threshold_percentile)
lambda_u = np.sum(
frequency[values >= threshold]
) # frequency of exceeding threshold
test_freq = lambda_u * np.array([1.2, 0.8, 0.5])
freq_out, val_out, fit_result = u_interp.fit_tail_GPD(
test_frequency=test_freq,
frequency=frequency,
values=values,
threshold_percentile=threshold_percentile,
)
# test shapes
np.testing.assert_equal(len(freq_out), len(test_freq))
np.testing.assert_equal(len(val_out), len(test_freq))

# test fitted parameters
# changing the threshold does not change xi but changes beta, see Ch. 4 Eq. 4.16 in
# (Coles, 2001, https://doi.org/10.1007/978-1-4471-3675-0)
expected_beta = beta_true + xi_true * threshold
np.testing.assert_allclose(fit_result["xi"], xi_true, rtol=0.15)
np.testing.assert_allclose(fit_result["beta"], expected_beta, rtol=0.15)

# Test predicted tail
# Ch. 4 Eq. 4.13 in (Coles, 2001, https://doi.org/10.1007/978-1-4471-3675-0)
expected_vals = threshold + (expected_beta / xi_true) * (
(test_freq / lambda_u) ** (-xi_true) - 1.0
)
expected_vals = np.where(expected_vals >= threshold, expected_vals, np.nan)
np.testing.assert_allclose(val_out, expected_vals, rtol=0.15)

# testing the inverse of the distribution
test_values = np.array([0.9 * threshold, 1.1 * threshold, 1.5 * threshold])
freq_out, val_out, fit_result = u_interp.fit_tail_GPD(
test_values=test_values,
frequency=frequency,
values=values,
threshold_percentile=threshold_percentile,
)
expected_freqs = lambda_u * (
1 + xi_true / expected_beta * (test_values - threshold)
) ** (-1 / xi_true)
expected_freqs = np.where(test_values >= threshold, expected_freqs, np.nan)
np.testing.assert_allclose(freq_out, expected_freqs, rtol=0.15)


# Execute Tests
if __name__ == "__main__":
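The fit_tail_GPD docstring stresses that threshold selection is a crucial choice: lower thresholds give more points to fit, while higher thresholds give points that are more genuinely extreme. As a hypothetical follow-up (not part of this PR), the fit_result dictionary makes it easy to check how sensitive the fitted shape parameter and the goodness-of-fit metrics are to threshold_percentile; the synthetic data and the candidate percentiles below are illustrative assumptions.

```python
import numpy as np
from scipy.stats import genpareto

import climada.util.interpolation as u_interp

# Illustrative exceedance data, analogous to the unit test above
rng = np.random.default_rng(0)
values = genpareto.rvs(c=0.3, scale=5.0, size=5000, random_state=rng)
frequency = np.full(values.size, 1.0 / values.size)
test_freq = 1 / np.array([100.0, 250.0, 500.0])  # requested exceedance frequencies

# Refit the tail for several candidate thresholds and compare the results
for pct in (85, 90, 95):
    _, _, fit = u_interp.fit_tail_GPD(
        test_frequency=test_freq,
        frequency=frequency,
        values=values,
        threshold_percentile=pct,
    )
    print(f"percentile={pct}: xi={fit['xi']:.3f}, beta={fit['beta']:.3f}, "
          f"rmse_log={fit['rmse_log']:.3f}, ks={fit['ks_distance']:.3f}")
```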