chap_core.rest_api package¶
Subpackages¶
- chap_core.rest_api.v1 package
- Submodules
- chap_core.rest_api.v1.debug module
- chap_core.rest_api.v1.jobs module
- chap_core.rest_api.v1.rest_api module
- CompatibilityResponse, HealthResponse, State, SystemInfoResponse, cancel(), create_api(), favicon(), get_evaluation_results(), get_exception(), get_openapi_schema(), get_results(), get_status(), health(), is_compatible(), list_features(), list_models(), main_backend(), seed(), set_cur_response(), system_info(), version()
- Module contents
Submodules¶
chap_core.rest_api.celery_tasks module¶
- class chap_core.rest_api.celery_tasks.CeleryJob(job: Task, app: Celery)[source]¶
Bases: Generic[ReturnType]
Wrapper for a Celery Job.
- property exception_info: str¶
- property id¶
- property is_finished: bool¶
- property progress: float¶
- property result: ReturnType¶
- property status: str¶
- class chap_core.rest_api.celery_tasks.CeleryPool(celery: Celery = None)[source]¶
Bases: Generic[ReturnType]
Simple abstraction for a Celery Worker.
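A hedged sketch of how the two wrappers fit together: work is submitted through the pool and then polled via the documented CeleryJob properties. The `queue(...)` call is an assumption (CeleryPool's methods are not listed on this page); the property accesses match the listing above.

```python
import time

from chap_core.rest_api.celery_tasks import CeleryPool


def run_and_wait(worker_function, *args):
    # CeleryPool() falls back to the default Celery app when none is supplied.
    pool = CeleryPool()
    # `queue` is an assumed submission method returning a CeleryJob; it is not
    # documented on this page.
    job = pool.queue(worker_function, *args)
    while not job.is_finished:          # documented CeleryJob property
        print(f"job {job.id}: {job.status} ({job.progress:.0%})")
        time.sleep(1)
    if job.status == "FAILURE":
        raise RuntimeError(job.exception_info)  # documented property with failure details
    return job.result                   # documented property holding the return value
```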
- class chap_core.rest_api.celery_tasks.JobDescription(*, id: str, type: str, name: str, status: str, start_time: str | None, end_time: str | None, result: str | None)[source]¶
Bases: BaseModel
- end_time: str | None¶
- id: str¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str¶
- result: str | None¶
- start_time: str | None¶
- status: str¶
- type: str¶
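JobDescription is a plain Pydantic model (empty model_config), so it can be constructed directly from the documented fields; the values below are invented for illustration.

```python
from chap_core.rest_api.celery_tasks import JobDescription

# Field names and types follow the signature above; the values are made up.
job = JobDescription(
    id="abc123",
    type="prediction",
    name="example job",
    status="SUCCESS",
    start_time="2024-01-01T12:00:00",
    end_time="2024-01-01T12:05:00",
    result=None,
)
print(job.model_dump())
```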
- class chap_core.rest_api.celery_tasks.TrackedTask[source]¶
Bases: Task
- apply_async(args=None, kwargs=None, **options)[source]¶
Apply tasks asynchronously by sending a message.
- Arguments:
  - args (Tuple): The positional arguments to pass on to the task.
  - kwargs (Dict): The keyword arguments to pass on to the task.
  - countdown (float): Number of seconds into the future that the task should execute. Defaults to immediate execution.
  - eta (~datetime.datetime): Absolute time and date of when the task should be executed. May not be specified if countdown is also supplied.
  - expires (float, ~datetime.datetime): Datetime or seconds in the future for the task to expire. The task won’t be executed after the expiration time.
  - shadow (str): Override task name used in logs/monitoring. Default is retrieved from shadow_name().
  - connection (kombu.Connection): Reuse existing broker connection instead of acquiring one from the connection pool.
  - retry (bool): If enabled, sending of the task message will be retried in the event of connection loss or failure. Default is taken from the :setting:`task_publish_retry` setting. Note that you need to handle the producer/connection manually for this to work.
  - retry_policy (Mapping): Override the retry policy used. See the :setting:`task_publish_retry_policy` setting.
  - time_limit (int): If set, overrides the default time limit.
  - soft_time_limit (int): If set, overrides the default soft time limit.
  - queue (str, kombu.Queue): The queue to route the task to. This must be a key present in :setting:`task_queues`, or :setting:`task_create_missing_queues` must be enabled. See guide-routing for more information.
  - exchange (str, kombu.Exchange): Named custom exchange to send the task to. Usually not used in combination with the queue argument.
  - routing_key (str): Custom routing key used to route the task to a worker server. If used in combination with a queue argument, only used to specify custom routing keys to topic exchanges.
  - priority (int): The task priority, a number between 0 and 9. Defaults to the priority attribute.
  - serializer (str): Serialization method to use. Can be pickle, json, yaml, msgpack or any custom serialization method that’s been registered with kombu.serialization.registry. Defaults to the serializer attribute.
  - compression (str): Optional compression method to use. Can be one of zlib, bzip2, or any custom compression methods registered with kombu.compression.register(). Defaults to the :setting:`task_compression` setting.
  - link (Signature): A single, or a list of, task signatures to apply if the task returns successfully.
  - link_error (Signature): A single, or a list of, task signatures to apply if an error occurs while executing the task.
  - producer (kombu.Producer): Custom producer to use when publishing the task.
  - add_to_parent (bool): If set to True (default) and the task is applied while executing another task, the result will be appended to the parent task’s request.children attribute. Trailing can also be disabled by default using the trail attribute.
  - ignore_result (bool): If set to False (default) the result of a task will be stored in the backend. If set to True the result will not be stored. This can also be set using the ignore_result option in the app.task decorator.
  - publisher (kombu.Producer): Deprecated alias to producer.
  - headers (Dict): Message headers to be included in the message. The headers can be used as an overlay for custom labeling using the canvas-stamping feature.
- Returns:
  celery.result.AsyncResult: Promise of future evaluation.
- Raises:
  - TypeError: If not enough arguments are passed, or too many arguments are passed. Note that signature checks may be disabled by specifying @task(typing=False).
  - ValueError: If soft_time_limit and time_limit are both set but soft_time_limit is greater than time_limit.
  - kombu.exceptions.OperationalError: If a connection to the transport cannot be made, or if the connection is lost.
- Note:
  Also supports all keyword arguments supported by kombu.Producer.publish().
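Since TrackedTask is a regular Celery Task subclass, apply_async can be called exactly as described above. A minimal sketch, assuming a Celery app and a task registered with base=TrackedTask (the task itself and the broker URL are hypothetical):

```python
from celery import Celery

from chap_core.rest_api.celery_tasks import TrackedTask

# Hypothetical app and task; broker URL and task body are illustrative only.
app = Celery("chap_example", broker="redis://localhost:6379/0")


@app.task(base=TrackedTask, bind=True)
def add(self, x, y):
    return x + y


# Schedule the task ten seconds in the future and chain a follow-up signature,
# using the countdown and link options documented above.
result = add.apply_async(args=(2, 3), countdown=10, link=add.s(10))
print(result.id)  # celery.result.AsyncResult, as stated under Returns
```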
- ignore_result = False¶
If enabled the worker won’t store task state and return values for this task. Defaults to the :setting:`task_ignore_result` setting.
- on_failure(exc, task_id, args, kwargs, einfo)[source]¶
Error handler.
This is run by the worker when the task fails.
- Arguments:
- exc (Exception): The exception raised by the task.
- task_id (str): Unique id of the failed task.
- args (Tuple): Original arguments for the task that failed.
- kwargs (Dict): Original keyword arguments for the task that failed.
- einfo (~billiard.einfo.ExceptionInfo): Exception information.
- Returns:
None: The return value of this handler is ignored.
- on_success(retval, task_id, args, kwargs)[source]¶
Success handler.
Run by the worker if the task executes successfully.
- Arguments:
- retval (Any): The return value of the task.
- task_id (str): Unique id of the executed task.
- args (Tuple): Original arguments for the executed task.
- kwargs (Dict): Original keyword arguments for the executed task.
- Returns:
None: The return value of this handler is ignored.
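These two hooks are the standard Celery Task callbacks that TrackedTask overrides. For illustration, a further subclass could extend them to add logging; this is a sketch, not part of chap_core:

```python
import logging

from chap_core.rest_api.celery_tasks import TrackedTask

logger = logging.getLogger(__name__)


class LoggingTrackedTask(TrackedTask):
    """Illustrative subclass; not part of chap_core."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error("task %s failed: %s", task_id, exc)
        super().on_failure(exc, task_id, args, kwargs, einfo)

    def on_success(self, retval, task_id, args, kwargs):
        logger.info("task %s finished with %r", task_id, retval)
        super().on_success(retval, task_id, args, kwargs)
```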
- priority = None¶
Default task priority.
- rate_limit = None¶
Rate limit for this task type. Examples:
None (no rate limit), ‘100/s’ (a hundred tasks a second), ‘100/m’ (a hundred tasks a minute), ‘100/h’ (a hundred tasks an hour).
- reject_on_worker_lost = None¶
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc.). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>¶
Task request stack, the current request will be the topmost.
- serializer = 'pickle'¶
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False¶
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True¶
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the :setting:`task_track_started` setting.
- typing = True¶
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
chap_core.rest_api.data_models module¶
- class chap_core.rest_api.data_models.BackTestCreate(*, datasetId: int, modelId: str, name: str | None = None, created: datetime | None = None, modelTemplateVersion: str | None = None)[source]¶
Bases: BackTestBase
- model_config: ClassVar[ConfigDict] = {'alias_generator': <function to_camel>, 'from_attributes': True, 'populate_by_name': True, 'registry': PydanticUndefined, 'validate_by_alias': True, 'validate_by_name': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
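Because model_config sets alias_generator=to_camel together with populate_by_name, instances accept either the camelCase aliases shown in the signature or the underlying snake_case field names; the values below are illustrative.

```python
from chap_core.rest_api.data_models import BackTestCreate

# Illustrative values only.
bt = BackTestCreate(datasetId=1, modelId="example_model")         # camelCase aliases
bt_same = BackTestCreate(dataset_id=1, model_id="example_model")  # snake_case field names
print(bt.model_dump(by_alias=True))  # serializes back to camelCase keys
```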
- class chap_core.rest_api.data_models.BackTestFull(*, datasetId: int, modelId: str, name: str | None = None, created: ~datetime.datetime | None = None, modelTemplateVersion: str | None = None, id: int, orgUnits: ~typing.List[str] = <factory>, splitPeriods: ~typing.List[str] = <factory>, dataset: ~chap_core.database.tables.DataSetMeta, aggregateMetrics: ~typing.Dict[str, float], configuredModel: ~chap_core.database.tables.ConfiguredModelRead | None, metrics: list[~chap_core.database.tables.BackTestMetric], forecasts: list[~chap_core.database.tables.BackTestForecast])[source]¶
Bases: BackTestRead
- forecasts: list[BackTestForecast]¶
- metrics: list[BackTestMetric]¶
- model_config: ClassVar[ConfigDict] = {'alias_generator': <function to_camel>, 'from_attributes': True, 'populate_by_name': True, 'registry': PydanticUndefined, 'validate_by_alias': True, 'validate_by_name': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class chap_core.rest_api.data_models.DatasetMakeRequest(*, name: str, dataSources: ~typing.List[~chap_core.database.dataset_tables.DataSource] | None = <factory>, type: str | None = None, geojson: ~chap_core.api_types.FeatureCollectionModel, providedData: ~typing.List[~chap_core.database.dataset_tables.ObservationBase], dataToBeFetched: ~typing.List[~chap_core.rest_api.data_models.FetchRequest])[source]¶
Bases: DataSetCreateInfo
- data_to_be_fetched: List[FetchRequest]¶
- geojson: FeatureCollectionModel¶
- model_config: ClassVar[ConfigDict] = {'alias_generator': <function to_camel>, 'from_attributes': True, 'populate_by_name': True, 'registry': PydanticUndefined, 'validate_by_alias': True, 'validate_by_name': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- provided_data: List[ObservationBase]¶
- class chap_core.rest_api.data_models.FetchRequest(*, featureName: str, dataSourceName: str)[source]¶
Bases: DBModel
- data_source_name: str¶
- feature_name: str¶
- model_config: ClassVar[ConfigDict] = {'alias_generator': <function to_camel>, 'from_attributes': True, 'populate_by_name': True, 'registry': PydanticUndefined, 'validate_by_alias': True, 'validate_by_name': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class chap_core.rest_api.data_models.FullPredictionResponse(*, diseaseId: str, dataValues: List[PredictionResponse])[source]¶
Bases: BaseModel
- dataValues: List[PredictionResponse]¶
- diseaseId: str¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class chap_core.rest_api.data_models.ImportSummaryResponse(*, id: str | None, importedCount: int, rejected: list[ValidationError])[source]¶
Bases: DBModel
- id: str | None¶
- imported_count: int¶
- model_config: ClassVar[ConfigDict] = {'alias_generator': <function to_camel>, 'from_attributes': True, 'populate_by_name': True, 'registry': PydanticUndefined, 'validate_by_alias': True, 'validate_by_name': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- rejected: list[ValidationError]¶
- class chap_core.rest_api.data_models.JobResponse(*, id: str)[source]¶
Bases: BaseModel
- id: str¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class chap_core.rest_api.data_models.PredictionBase(*, orgUnit: str, dataElement: str, period: str)[source]¶
Bases: BaseModel
- dataElement: str¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- orgUnit: str¶
- period: str¶
- class chap_core.rest_api.data_models.PredictionParams(*, modelId: str, nPeriods: int = 3)[source]¶
Bases: DBModel
- model_config: ClassVar[ConfigDict] = {'alias_generator': <function to_camel>, 'from_attributes': True, 'populate_by_name': True, 'registry': PydanticUndefined, 'validate_by_alias': True, 'validate_by_name': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_id: str¶
- n_periods: int¶
- class chap_core.rest_api.data_models.PredictionResponse(*, orgUnit: str, dataElement: str, period: str, value: float)[source]¶
Bases: PredictionBase
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- value: float¶
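PredictionResponse entries are what FullPredictionResponse (defined earlier in this module) collects under dataValues; an illustrative construction:

```python
from chap_core.rest_api.data_models import FullPredictionResponse, PredictionResponse

# Values are invented; field names follow the signatures above.
response = FullPredictionResponse(
    diseaseId="dengue",
    dataValues=[
        PredictionResponse(orgUnit="OU1", dataElement="cases", period="202401", value=12.5),
    ],
)
print(response.model_dump())
```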
- class chap_core.rest_api.data_models.PredictionSamplResponse(*, orgUnit: str, dataElement: str, period: str, values: list[float])[source]¶
Bases: PredictionBase
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- values: list[float]¶
- class chap_core.rest_api.data_models.ValidationError(*, reason: str, orgUnit: str, featureName: str, timePeriods: List[str])[source]¶
Bases: DBModel
- feature_name: str¶
- model_config: ClassVar[ConfigDict] = {'alias_generator': <function to_camel>, 'from_attributes': True, 'populate_by_name': True, 'registry': PydanticUndefined, 'validate_by_alias': True, 'validate_by_name': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- org_unit: str¶
- reason: str¶
- time_periods: List[str]¶
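ValidationError is the row-level rejection record carried by ImportSummaryResponse.rejected (see above); a small illustrative example:

```python
from chap_core.rest_api.data_models import ImportSummaryResponse, ValidationError

# Invented values; field names follow the signatures above (camelCase aliases).
summary = ImportSummaryResponse(
    id=None,
    importedCount=10,
    rejected=[
        ValidationError(
            reason="missing values",
            orgUnit="OU1",
            featureName="rainfall",
            timePeriods=["202401", "202402"],
        )
    ],
)
print(summary.model_dump(by_alias=True))
```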
chap_core.rest_api.db_worker_functions module¶
- chap_core.rest_api.db_worker_functions.convert_dicts_to_models(func)[source]¶
Convert dict arguments to Pydantic models based on type hints.
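The decorator re-hydrates dict arguments into Pydantic models by inspecting the wrapped function's type hints. A simplified sketch of that idea (not the actual chap_core implementation) might look like:

```python
import inspect
from functools import wraps

from pydantic import BaseModel


def convert_dicts_to_models_sketch(func):
    """Simplified illustration of the documented behaviour; not the chap_core code."""
    signature = inspect.signature(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind_partial(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = signature.parameters[name].annotation
            # Only convert plain dicts whose annotation is a Pydantic model class.
            if isinstance(value, dict) and isinstance(annotation, type) and issubclass(annotation, BaseModel):
                bound.arguments[name] = annotation.model_validate(value)
        return func(*bound.args, **bound.kwargs)

    return wrapper
```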
- chap_core.rest_api.db_worker_functions.debug(session: SessionWrapper)[source]¶
- chap_core.rest_api.db_worker_functions.harmonize_and_add_dataset(provided_field_names: list[str], data_to_be_fetched: list[FetchRequest], health_dataset: DataSet, name: str, ds_type: str, session: SessionWrapper, worker_config=WorkerConfig(is_test=False, failing_services=())) FullData[source]¶
- chap_core.rest_api.db_worker_functions.harmonize_and_add_health_dataset(health_dataset: FullData, name: str, session: SessionWrapper, worker_config=WorkerConfig(is_test=False, failing_services=())) FullData[source]¶
- chap_core.rest_api.db_worker_functions.predict_pipeline_from_composite_dataset(provided_field_names: list[str], health_dataset: dict, name: str, dataset_create_info: DataSetCreateInfo, prediction_params: PredictionParams, session: SessionWrapper, worker_config=WorkerConfig(is_test=False, failing_services=())) int[source]¶
This is the main pipeline function to run prediction from a dataset.
- chap_core.rest_api.db_worker_functions.run_backtest(info: BackTestCreate, n_periods: int | None = None, n_splits: int = 10, stride: int = 1, session: SessionWrapper = None)[source]¶
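As a hedged usage sketch against the documented signature (session construction is assumed to be handled by the surrounding worker code):

```python
from chap_core.rest_api.data_models import BackTestCreate
from chap_core.rest_api.db_worker_functions import run_backtest


def start_backtest(session):
    # `session` is a SessionWrapper provided by the caller; its construction is
    # not documented on this page. The BackTestCreate values are illustrative.
    info = BackTestCreate(datasetId=1, modelId="example_model")
    return run_backtest(info, n_periods=12, n_splits=10, stride=1, session=session)
```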
- chap_core.rest_api.db_worker_functions.run_backtest_from_dataset(feature_names: list[str], provided_data_model_dump: dict, backtest_name: str, model_id: str, dataset_info: DataSetCreateInfo, backtest_params: BackTestParams, session: SessionWrapper, worker_config=WorkerConfig(is_test=False, failing_services=())) int[source]¶
- chap_core.rest_api.db_worker_functions.run_prediction(model_id: str, dataset_id: str, n_periods: int | None, name: str, session: SessionWrapper)[source]¶
chap_core.rest_api.worker_functions module¶
- class chap_core.rest_api.worker_functions.DataValue(value: int, orgUnit: str, dataElement: str, period: str)[source]¶
Bases: object
- dataElement: str¶
- orgUnit: str¶
- period: str¶
- value: int¶
- class chap_core.rest_api.worker_functions.WorkerConfig(*, is_test: bool = False, failing_services: Tuple[str] = ())[source]¶
Bases: BaseModel
- failing_services: Tuple[str]¶
- is_test: bool¶
- model_config: ClassVar[ConfigDict] = {'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
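WorkerConfig has frozen=True in its model_config, so instances are immutable value objects; a small example (the service name is invented):

```python
from chap_core.rest_api.worker_functions import WorkerConfig

config = WorkerConfig(is_test=True, failing_services=("example_service",))  # invented service name
try:
    config.is_test = False  # frozen=True turns attribute assignment into an error
except Exception as err:
    print(f"immutable config: {err}")
```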
- chap_core.rest_api.worker_functions.dataset_from_request_v1(json_data: RequestV1, target_name='diseases', usecwd_for_credentials=False, worker_config: WorkerConfig = WorkerConfig(is_test=False, failing_services=())) DataSet[FullData][source]¶
- chap_core.rest_api.worker_functions.dataset_to_datalist(dataset: DataSet[HealthData], target_id: str) DataList[source]¶
- chap_core.rest_api.worker_functions.get_combined_dataset(json_data: RequestV1)[source]¶
Get a dataset of potentially multiple data types from a RequestV1 object.
- chap_core.rest_api.worker_functions.get_health_dataset(json_data: PredictionRequest, dataclass=None, colnames=('ou', 'pe'))[source]¶
- chap_core.rest_api.worker_functions.harmonize_health_dataset(dataset, usecwd_for_credentials, fetch_requests: List[FetchRequest] = None, worker_config: WorkerConfig = WorkerConfig(is_test=False, failing_services=()))[source]¶
- chap_core.rest_api.worker_functions.predictions_to_datavalue(data: DataSet[HealthData], attribute_mapping: dict[str, str])[source]¶
- chap_core.rest_api.worker_functions.sample_dataset_to_prediction_response(predictions: DataSet[Samples], target_id: str) dict[source]¶
- chap_core.rest_api.worker_functions.samples_to_evaluation_response(predictions_list, quantiles, real_data: DataList)[source]¶
- chap_core.rest_api.worker_functions.train_on_json_data(json_data: RequestV1, model_name, model_path, control=None)[source]¶
- chap_core.rest_api.worker_functions.v1_conversion(data_list: list[DataElement | DataElementV2], fill_missing=False, colnames=('ou', 'pe')) DataSet[TimeSeriesArray][source]¶
Convert a list of DataElement objects to a SpatioTemporalDict[TimeSeriesArray] object.
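As an illustration of the conversion, the input rows carry the default colnames ('ou', 'pe') plus a value; the exact DataElement fields and its import path are assumptions here, not documented on this page.

```python
# Assumed import path and field names for DataElement; both are assumptions.
from chap_core.api_types import DataElement
from chap_core.rest_api.worker_functions import v1_conversion

rows = [
    DataElement(ou="OU1", pe="202401", value=10),
    DataElement(ou="OU1", pe="202402", value=12),
]
# fill_missing=True would insert entries for periods missing from the input.
series = v1_conversion(rows, fill_missing=False)
```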