Source code for chap_core.models.external_web_model

import io
import logging
import time
import uuid
from pathlib import Path
from typing import Optional

import pandas as pd
import requests
import yaml

from chap_core.datatypes import Samples
from chap_core.exceptions import ModelFailedException
from chap_core.geometry import Polygons
from chap_core.models.external_model import ExternalModelBase
from chap_core.spatio_temporal_data.temporal_dataclass import DataSet

logger = logging.getLogger(__name__)


class ExternalWebModel(ExternalModelBase):
    """
    Wrapper for a ConfiguredModel that can only be run through a web service,
    defined by a URL. The web service supports a strict REST API for training
    and prediction; this class makes such a model available through the
    ConfiguredModel interface, with train and predict methods.
    """

    def __init__(
        self,
        api_url: str,
        name: Optional[str] = None,
        timeout: int = 3600,
        poll_interval: int = 5,
        configuration: Optional[dict] = None,
        adapters: Optional[dict] = None,
        working_dir: str = "./",
    ):
        """
        Initialize the ExternalWebModel.

        Parameters
        ----------
        api_url : str
            Base URL of the model API (e.g., "http://localhost:8000")
        name : str, optional
            Name of the model (default: "external_web_model")
        timeout : int
            Maximum time to wait for job completion in seconds (default: 3600)
        poll_interval : int
            Time between status checks in seconds (default: 5)
        configuration : dict, optional
            Configuration to pass to the model
        adapters : dict, optional
            Field name adapters for data conversion
        working_dir : str
            Working directory for temporary files (default: "./")
        """
        self._api_url = api_url.rstrip("/")
        self._name = name or "external_web_model"
        self._timeout = timeout
        self._poll_interval = poll_interval
        self._configuration = configuration or {}
        self._trained_model_name = None
        self._adapters = adapters
        self._working_dir = Path(working_dir)
        self._location_mapping = None
        self._polygons_file_name = None

    @property
    def name(self):
        return self._name

    @property
    def configuration(self):
        return self._configuration

    def _wait_for_job(self, job_id: str, job_type: str = "job") -> dict:
        """
        Poll the API until the job completes or times out.

        Parameters
        ----------
        job_id : str
            The job ID to monitor
        job_type : str
            Type of job for logging ("Training" or "Prediction")

        Returns
        -------
        dict
            The final job status

        Raises
        ------
        ModelFailedException
            If the job fails or times out
        """
        start_time = time.time()
        while time.time() - start_time < self._timeout:
            try:
                response = requests.get(f"{self._api_url}/check_status/{job_id}")
                response.raise_for_status()
                job_info = response.json()
                status = job_info["status"]
                logger.info(f"{job_type} job {job_id} status: {status}")
                if status == "completed":
                    return job_info
                elif status == "failed":
                    error_msg = job_info.get("error_message", "Unknown error")
                    raise ModelFailedException(f"{job_type} job failed: {error_msg}")
                time.sleep(self._poll_interval)
            except requests.RequestException as e:
                logger.error(f"Error checking job status: {e}")
                raise ModelFailedException(f"Failed to check job status: {e}")
        raise ModelFailedException(f"{job_type} job timed out after {self._timeout} seconds")
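
    # The REST contract assumed by this wrapper (inferred from the calls in
    # this module, not from an independent spec): POST {api_url}/train and
    # POST {api_url}/predict each return JSON containing "job_id";
    # GET {api_url}/check_status/{job_id} returns JSON with a "status" of
    # "completed" or "failed" (plus an optional "error_message"); and
    # GET {api_url}/fetch_predictions/{job_id} returns the prediction CSV.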

    def train(self, train_data: DataSet, extra_args=None):
        """
        Train the model by submitting a training job to the API and waiting
        for it to complete.
        """
        logger.info(f"Training {self._name} via API at {self._api_url}")

        # Adapt training data using base class methods
        frequency = self._get_frequency(train_data)
        train_df = train_data.to_pandas()
        adapted_df = self._adapt_data(train_df, frequency=frequency)

        # Convert to CSV string for upload
        train_csv = io.StringIO()
        adapted_df.to_csv(train_csv, index=False)
        train_csv.seek(0)

        # Prepare files for upload
        files = {
            "training_data": ("training_data.csv", train_csv, "text/csv"),
        }

        # Handle polygons using base class method
        if train_data.polygons is not None:
            self._polygons_file_name = self._working_dir / "polygons.geojson"
            self._write_polygons_to_geojson(train_data, self._polygons_file_name)
            polygons_json = Polygons(train_data.polygons).to_json()
            files["polygons"] = ("polygons.geojson", polygons_json, "application/geo+json")
            logger.info("Will pass polygons file to train and predict commands")

        # Add configuration if present
        if self._configuration:
            config_yaml = yaml.dump(self._configuration)
            files["config"] = ("config.yaml", config_yaml, "text/yaml")

        # Generate a unique model name for this training session
        self._trained_model_name = f"{self._name}_{uuid.uuid4().hex[:8]}"
        data = {
            "model_name": self._trained_model_name,
        }

        try:
            # Submit training job
            response = requests.post(
                f"{self._api_url}/train",
                files=files,
                data=data,
            )
            response.raise_for_status()
            result = response.json()
            job_id = result["job_id"]
            logger.info(f"Training job submitted with ID: {job_id}")

            # Wait for training to complete
            self._wait_for_job(job_id, "Training")
            logger.info(f"Training completed for model: {self._trained_model_name}")
            return self
        except requests.RequestException as e:
            logger.error(f"Failed to submit training job: {e}")
            raise ModelFailedException(f"Training failed: {e}")

    def predict(self, historic_data: DataSet, future_data: DataSet) -> DataSet:
        """
        Predict by submitting a prediction job to the API, waiting for it to
        complete, and fetching the resulting predictions.
        """
        if self._trained_model_name is None:
            raise ModelFailedException("Model must be trained before prediction")

        logger.info(f"Predicting with {self._trained_model_name} via API at {self._api_url}")

        # Adapt data using base class methods
        historic_frequency = self._get_frequency(historic_data)
        future_frequency = self._get_frequency(future_data)
        historic_adapted = self._adapt_data(historic_data.to_pandas(), frequency=historic_frequency)
        future_adapted = self._adapt_data(future_data.to_pandas(), frequency=future_frequency)

        # Convert adapted data to CSV strings
        historic_csv = io.StringIO()
        historic_adapted.to_csv(historic_csv, index=False)
        historic_csv.seek(0)

        future_csv = io.StringIO()
        future_adapted.to_csv(future_csv, index=False)
        future_csv.seek(0)

        # Prepare files for upload
        files = {
            "historic_data": ("historic_data.csv", historic_csv, "text/csv"),
            "future_data": ("future_data.csv", future_csv, "text/csv"),
        }

        # Add polygons if present (the geojson file written during training
        # is reused; otherwise it is written here)
        if future_data.polygons is not None:
            if self._polygons_file_name is None:
                self._polygons_file_name = self._working_dir / "polygons.geojson"
                self._write_polygons_to_geojson(future_data, self._polygons_file_name)
            polygons_json = Polygons(future_data.polygons).to_json()
            files["polygons"] = ("polygons.geojson", polygons_json, "application/geo+json")

        data = {
            "model_name": self._trained_model_name,
        }

        try:
            # Submit prediction job
            response = requests.post(
                f"{self._api_url}/predict",
                files=files,
                data=data,
            )
            response.raise_for_status()
            result = response.json()
            job_id = result["job_id"]
            logger.info(f"Prediction job submitted with ID: {job_id}")

            # Wait for prediction to complete
            self._wait_for_job(job_id, "Prediction")

            # Fetch predictions
            response = requests.get(f"{self._api_url}/fetch_predictions/{job_id}")
            response.raise_for_status()

            # Parse predictions CSV
            predictions_csv = io.StringIO(response.text)
            predictions_df = pd.read_csv(predictions_csv)

            # Apply inverse location mapping if needed
            if self._location_mapping is not None:
                predictions_df["location"] = predictions_df["location"].apply(self._location_mapping.index_to_name)

            # Convert to DataSet
            try:
                predictions = DataSet.from_pandas(predictions_df, Samples)
                return predictions
            except ValueError as e:
                logger.error(f"Error parsing predictions: {e}")
                raise ModelFailedException(f"Failed to parse predictions: {e}")
        except requests.RequestException as e:
            logger.error(f"Failed to submit prediction job: {e}")
            raise ModelFailedException(f"Prediction failed: {e}")
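

# A minimal usage sketch (illustration only, not part of the module). It
# assumes a compatible model API server is reachable at http://localhost:8000;
# the train_dataset, historic and future DataSet objects are hypothetical
# placeholders you would build elsewhere (e.g. via DataSet.from_pandas).
def _example_usage(train_dataset: DataSet, historic: DataSet, future: DataSet) -> DataSet:
    # Point the wrapper at the running model service; the name is only used
    # to build a unique trained-model identifier on the server.
    model = ExternalWebModel(
        api_url="http://localhost:8000",
        name="demo_web_model",
        timeout=1800,      # give up after 30 minutes
        poll_interval=10,  # check job status every 10 seconds
    )
    # train() blocks until the server-side training job completes;
    # predict() then submits a prediction job and returns a DataSet of Samples.
    model.train(train_dataset)
    return model.predict(historic, future)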