Source code for chap_core.assessment.metrics.peak_diff

"""
one numeric metric column per metric. get_metric validates this against your MetricSpec. If you try to return extra numeric columns (e.g., both value_diff and week_lag), CHAP will raise a “produced wrong columns” error.
"""

import pandas as pd
from chap_core.assessment.flat_representations import DataDimension, FlatForecasts, FlatObserved
from chap_core.assessment.metrics.base import MetricBase, MetricSpec


def _parse_year_week(week_str: str) -> tuple[int, int]:
    """Split an ISO-style 'YYYY-Www' string into (year, week)."""
    year_str, w = week_str.split("-W")
    return int(year_str), int(w)


def _week_index(week_str: str) -> int:
    """Map a 'YYYY-Www' string to a single monotonically increasing index.

    The factor 54 exceeds the maximum ISO week count (53), so indices stay
    strictly ordered across year boundaries.
    """
    y, w = _parse_year_week(week_str)
    return y * 54 + (w - 1)


def _week_diff(w1: str, w2: str) -> int:
    """Signed week difference w2 - w1 in index units (positive if w2 is later)."""
    return _week_index(w2) - _week_index(w1)
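# Sanity sketch (hypothetical values, not part of the original module): because
# the index width is 54 weeks per year, ordering is always correct, but
# differences that span a year boundary are approximate (a 53-week year is
# treated as 54 weeks wide):
#
#     _week_diff("2023-W10", "2023-W14")  # -> 4
#     _week_diff("2020-W53", "2021-W01")  # -> 2 (the true ISO gap is 1 week)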


def _pick_peak(rows: pd.DataFrame, value_col: str) -> tuple[str, float]:
    """Return (time_period, value) of the maximum of value_col.

    Ties are broken in favour of the earliest week.
    """
    tmp = rows[["time_period", value_col]].copy()
    tmp["_wi"] = tmp["time_period"].map(_week_index)
    # Sort by value descending, then by week index ascending, and take the top row.
    tmp = tmp.sort_values(by=[value_col, "_wi"], ascending=[False, True])
    top = tmp.iloc[0]
    return str(top["time_period"]), float(top[value_col])


class PeakValueDiffMetric(MetricBase):
    # Now returns (location, time_period, horizon_distance, metric)
    spec = MetricSpec(
        output_dimensions=(
            DataDimension.location,
            DataDimension.time_period,
            DataDimension.horizon_distance,
        ),
        metric_name="Peak Value Difference",
        metric_id="peak_value_diff",
        description="Truth peak value minus predicted peak value, per horizon.",
    )

    def compute(self, observations: FlatObserved, forecasts: FlatForecasts) -> pd.DataFrame:
        # Collapse forecast samples to a mean per (location, time_period, horizon).
        fc_mean = (
            forecasts.groupby(["location", "time_period", "horizon_distance"], as_index=False)["forecast"]
            .mean()
            .rename(columns={"forecast": "forecast_mean"})
        )
        obs = observations[["location", "time_period", "disease_cases"]].copy()

        out_rows = []
        for loc, obs_loc in obs.groupby("location"):
            truth_timepoint, truth_val = _pick_peak(obs_loc, "disease_cases")
            fc_loc = fc_mean[fc_mean["location"] == loc]
            if fc_loc.empty:
                continue
            for h, fc_loc_h in fc_loc.groupby("horizon_distance"):
                if fc_loc_h.empty:
                    continue
                _, pred_val = _pick_peak(fc_loc_h[["time_period", "forecast_mean"]], "forecast_mean")
                metric_val = float(truth_val - pred_val)
                out_rows.append(
                    {
                        "location": loc,
                        "time_period": truth_timepoint,
                        "horizon_distance": int(h),
                        "metric": metric_val,
                    }
                )
        return pd.DataFrame(out_rows, columns=["location", "time_period", "horizon_distance", "metric"])
class PeakWeekLagMetric(MetricBase):
    spec = MetricSpec(
        output_dimensions=(
            DataDimension.location,
            DataDimension.time_period,
            DataDimension.horizon_distance,
        ),
        metric_name="Peak Week Lag",
        metric_id="peak_week_lag",
        description="Lag in weeks between true and predicted peak (pred - truth), per horizon.",
    )

    def compute(self, observations: FlatObserved, forecasts: FlatForecasts) -> pd.DataFrame:
        # Collapse forecast samples to a mean per (location, time_period, horizon).
        fc_mean = (
            forecasts.groupby(["location", "time_period", "horizon_distance"], as_index=False)["forecast"]
            .mean()
            .rename(columns={"forecast": "forecast_mean"})
        )
        obs = observations[["location", "time_period", "disease_cases"]].copy()

        out_rows = []
        for loc, obs_loc in obs.groupby("location"):
            truth_timepoint, _ = _pick_peak(obs_loc, "disease_cases")
            fc_loc = fc_mean[fc_mean["location"] == loc]
            if fc_loc.empty:
                continue
            for h, fc_loc_h in fc_loc.groupby("horizon_distance"):
                if fc_loc_h.empty:
                    continue
                pred_timepoint, _ = _pick_peak(fc_loc_h[["time_period", "forecast_mean"]], "forecast_mean")
                # Positive lag means the predicted peak falls after the true peak.
                lag_weeks = int(_week_diff(truth_timepoint, pred_timepoint))
                out_rows.append(
                    {
                        "location": loc,
                        "time_period": truth_timepoint,
                        "horizon_distance": int(h),
                        "metric": float(lag_weeks),
                    }
                )
        return pd.DataFrame(out_rows, columns=["location", "time_period", "horizon_distance", "metric"])
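# Minimal usage sketch (hypothetical): the frames below are plain pandas
# DataFrames standing in for FlatObserved/FlatForecasts, which compute()
# treats as DataFrame-like with the columns used above; it also assumes
# MetricBase can be instantiated without arguments. All values are invented
# purely for illustration.
if __name__ == "__main__":
    obs = pd.DataFrame(
        {
            "location": ["A", "A", "A"],
            "time_period": ["2023-W01", "2023-W02", "2023-W03"],
            "disease_cases": [10.0, 40.0, 20.0],
        }
    )
    fc = pd.DataFrame(
        {
            "location": ["A", "A", "A"],
            "time_period": ["2023-W01", "2023-W02", "2023-W03"],
            "horizon_distance": [1, 1, 1],
            "forecast": [12.0, 30.0, 25.0],
        }
    )
    # Truth peaks at 2023-W02 (40.0); the forecast mean peaks at 2023-W02 (30.0).
    print(PeakValueDiffMetric().compute(obs, fc))  # metric: 40.0 - 30.0 = 10.0
    print(PeakWeekLagMetric().compute(obs, fc))    # metric: 0.0 (same peak week)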