Source code for chap_core.assessment.data_representation_transforming
import itertools
from collections import defaultdict
from typing import Dict, List
from chap_core.assessment.evaluator import Evaluator
from chap_core.assessment.representations import (
MultiLocationForecast,
Samples,
Forecast,
MultiLocationDiseaseTimeSeries,
DiseaseObservation,
DiseaseTimeSeries,
MultiLocationErrorTimeSeries,
ErrorTimeSeries,
Error,
)
from chap_core.database.tables import BackTestForecast
from chap_core.database.dataset_tables import ObservationBase
[docs]
def convert_to_multi_location_forecast(backTestList: List[BackTestForecast]) -> Dict[str, MultiLocationForecast]:
    """Build one multi-location forecast per split point.

    The forecasts are ordered by their ``last_seen_period`` and every run of
    entries sharing that value is converted with
    ``convert_single_splitpoint_to_multi_location_forecast``.

    Args:
        backTestList: Backtest forecasts, possibly spanning several split points.

    Returns:
        A dict mapping each distinct ``last_seen_period`` value (used as-is,
        not converted) to the forecast built from the entries of that group.
    """

    def split_point(entry):
        return entry.last_seen_period

    # groupby only merges adjacent items, so the input must be sorted by the
    # same key first.
    ordered = sorted(backTestList, key=split_point)
    return {
        period: convert_single_splitpoint_to_multi_location_forecast(group)
        for period, group in itertools.groupby(ordered, key=split_point)
    }
[docs]
def convert_single_splitpoint_to_multi_location_forecast(backTestList: List[BackTestForecast]) -> MultiLocationForecast:
    """Collect the forecasts of one split point into a per-location forecast.

    Each entry becomes a ``Samples`` object whose ``time_period`` is
    ``str(entry.period)`` and whose ``disease_case_samples`` is
    ``entry.values``. Entries are bucketed under ``str(entry.org_unit)``.

    Args:
        backTestList: Iterable of backtest forecasts; it is traversed once.

    Returns:
        A ``MultiLocationForecast`` whose ``timeseries`` maps each location key
        to a ``Forecast`` with its samples ordered by ``time_period``.
    """
    samples_by_location: Dict[str, List[Samples]] = defaultdict(list)
    for entry in backTestList:
        samples_by_location[str(entry.org_unit)].append(
            Samples(time_period=str(entry.period), disease_case_samples=entry.values)
        )

    per_location = {}
    for location, samples in samples_by_location.items():
        # NOTE(review): time_period was built with str(), so this is presumably
        # a plain string ordering of the period identifiers - confirm that it
        # matches chronological order for every period format in use.
        ordered = sorted(samples, key=lambda sample: sample.time_period)
        per_location[location] = Forecast(predictions=ordered)
    return MultiLocationForecast(timeseries=per_location)
[docs]
def convert_to_multi_location_timeseries(obs: List[ObservationBase]) -> MultiLocationDiseaseTimeSeries:
    """Turn raw observations into per-location disease time series.

    Only observations whose ``feature_name`` is ``"disease_cases"`` and whose
    ``value`` is not ``None`` are kept. Each kept observation becomes a
    ``DiseaseObservation`` with ``time_period=str(period)`` and
    ``disease_cases=int(value)``.

    Args:
        obs: Observations for any number of features and locations.

    Returns:
        A ``MultiLocationDiseaseTimeSeries`` holding, for every ``org_unit``
        with at least one kept observation, a ``DiseaseTimeSeries`` ordered by
        ``time_period``.
    """
    by_location: defaultdict[str, List[DiseaseObservation]] = defaultdict(list)
    for record in obs:
        if record.feature_name != "disease_cases" or record.value is None:
            continue
        converted = DiseaseObservation(
            time_period=str(record.period),
            disease_cases=int(record.value),  # int() conversion, no explicit rounding
        )
        # NOTE(review): org_unit is used as the key unchanged, whereas
        # convert_single_splitpoint_to_multi_location_forecast keys by
        # str(org_unit) - verify that org_unit is always a str.
        by_location[record.org_unit].append(converted)

    result = MultiLocationDiseaseTimeSeries()
    for location, series in by_location.items():
        result[location] = DiseaseTimeSeries(
            observations=sorted(series, key=lambda item: item.time_period)
        )
    return result
[docs]
def mean(samples):
    """Return the arithmetic mean of ``samples``.

    ``samples`` must support both ``sum()`` and ``len()``; an empty input
    raises ``ZeroDivisionError``.
    """
    total = sum(samples)
    count = len(samples)
    return total / count
[docs]
class MAEonMeanPredictions(Evaluator):
    """Evaluator scoring forecasts by the mean absolute error of their sample means."""

    def evaluate(
        self, all_truths: MultiLocationDiseaseTimeSeries, all_forecasts: MultiLocationForecast
    ) -> MultiLocationErrorTimeSeries:
        """Compute one mean-absolute-error value per location.

        For every location reported by ``all_truths.locations()`` the observed
        series is paired position by position with the forecast series for the
        same location. Each prediction is reduced to the mean of its
        ``disease_case_samples`` and the absolute differences to the observed
        ``disease_cases`` are averaged over the series.

        Args:
            all_truths: Observed disease cases per location.
            all_forecasts: Sampled forecasts per location; looked up through
                ``all_forecasts.timeseries[location]``.

        Returns:
            A ``MultiLocationErrorTimeSeries`` with, for each location, an
            ``ErrorTimeSeries`` containing a single ``Error`` whose
            ``time_period`` is ``"Full_period"``.

        Raises:
            AssertionError: If the two series of a location differ in length or
                a paired truth and prediction disagree on ``time_period``.
            ZeroDivisionError: If a location has no observations.
        """
        result = MultiLocationErrorTimeSeries(timeseries_dict={})
        for location in all_truths.locations():
            observed = all_truths[location].observations
            predicted = all_forecasts.timeseries[location].predictions
            n_observed, n_predicted = len(observed), len(predicted)
            assert n_observed == n_predicted, f"{n_observed} != {n_predicted}"

            # Accumulate step by step so the floating-point result does not
            # depend on how a summation helper orders or compensates the terms.
            total_abs_error = 0
            for truth, prediction in zip(observed, predicted):
                assert truth.time_period == prediction.time_period, (truth.time_period, prediction.time_period)
                total_abs_error += abs(truth.disease_cases - mean(prediction.disease_case_samples))

            result[location] = ErrorTimeSeries(
                observations=[Error(time_period="Full_period", value=total_abs_error / n_observed)]
            )
        return result
# def evaluate_and_give_one_aggregated_metric(...) -> float:
# res = self.evaluate()
# for location, ..