Skip to content

API Reference

Complete API documentation for all chapkit modules, classes, and functions.

Artifact Module

Hierarchical storage system for models, data, and experiment tracking.

Models

models

Artifact ORM model for hierarchical data storage.

Classes

Artifact

Bases: Entity

ORM model for hierarchical artifacts with parent-child relationships.

Source code in src/chapkit/artifact/models.py
class Artifact(Entity):
    """ORM model for hierarchical artifacts with parent-child relationships.

    Rows are stored in the ``artifacts`` table. A row may reference another row
    of the same table through ``parent_id``; that self-reference is what forms
    the tree exposed by ``parent`` and ``children``.
    """

    __tablename__ = "artifacts"

    # Self-referential foreign key to the parent row. It is nullable, so an
    # artifact may have no parent. ``ondelete="SET NULL"`` declares that deleting
    # the parent clears this reference instead of deleting this row. Indexed.
    parent_id: Mapped[ULID | None] = mapped_column(
        ULIDType,
        ForeignKey("artifacts.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

    # Parent side of the self-referential relationship. ``remote_side`` names
    # ``Artifact.id`` as the remote column; paired with ``children`` below.
    parent: Mapped[Artifact | None] = relationship(
        remote_side="Artifact.id",
        back_populates="children",
    )

    # Direct children only (one level down); paired with ``parent`` above.
    # No cascade option is configured on this relationship.
    children: Mapped[list[Artifact]] = relationship(
        back_populates="parent",
    )

    # Arbitrary payload, persisted through PickleType with pickle protocol 4.
    # Required (NOT NULL).
    # NOTE(review): unpickling is unsafe on untrusted bytes - confirm that only
    # trusted code can write to this column.
    data: Mapped[Any] = mapped_column(PickleType(protocol=4), nullable=False)
    # Integer level of the row; defaults to 0, required, indexed.
    level: Mapped[int] = mapped_column(default=0, nullable=False, index=True)

Schemas

schemas

Pydantic schemas for hierarchical artifacts with tree structures.

Classes

ArtifactIn

Bases: EntityIn

Input schema for creating or updating artifacts.

Source code in src/chapkit/artifact/schemas.py
class ArtifactIn(EntityIn):
    """Input schema for creating or updating artifacts."""

    # Required payload; typed as Any, so this schema puts no constraint on its shape.
    data: Any
    # Optional reference to the parent artifact; None when omitted.
    parent_id: ULID | None = None
    # Optional level; None when omitted.
    level: int | None = None

ArtifactOut

Bases: EntityOut

Output schema for artifact entities.

Source code in src/chapkit/artifact/schemas.py
class ArtifactOut(EntityOut):
    """Output schema for artifact entities."""

    # Payload, exposed through the JsonSafe type.
    # NOTE(review): JsonSafe is defined outside this listing - confirm how
    # non-JSON-serializable payloads are rendered.
    data: JsonSafe
    # Identifier of the parent artifact, or None when the artifact has no parent.
    parent_id: ULID | None = None
    # Level of the artifact; required on output.
    level: int

ArtifactTreeNode

Bases: ArtifactOut

Artifact node with tree structure metadata.

Source code in src/chapkit/artifact/schemas.py
class ArtifactTreeNode(ArtifactOut):
    """Artifact node with tree structure metadata."""

    # Label of the node's level; None until something assigns it.
    level_label: str | None = None
    # Name of the hierarchy the node belongs to; None until assigned.
    hierarchy: str | None = None
    # Child nodes; None by default.
    children: list["ArtifactTreeNode"] | None = None

    @classmethod
    def from_artifact(cls, artifact: ArtifactOut) -> Self:
        """Build a tree node by re-validating the dumped fields of an artifact output schema."""
        payload = artifact.model_dump()
        return cls.model_validate(payload)
Functions
from_artifact(artifact) classmethod

Create a tree node from an artifact output schema.

Source code in src/chapkit/artifact/schemas.py
@classmethod
def from_artifact(cls, artifact: ArtifactOut) -> Self:
    """Build a tree node by re-validating the dumped fields of an artifact output schema."""
    payload = artifact.model_dump()
    return cls.model_validate(payload)

ArtifactHierarchy

Bases: BaseModel

Configuration for artifact hierarchy with level labels.

Source code in src/chapkit/artifact/schemas.py
class ArtifactHierarchy(BaseModel):
    """Configuration for artifact hierarchy with level labels."""

    name: str = Field(..., description="Human readable name of this hierarchy")
    level_labels: Mapping[int, str] = Field(
        default_factory=dict,
        description="Mapping of numeric levels to labels (0 -> 'train', etc.)",
    )

    # Instances are immutable once constructed.
    model_config = {"frozen": True}

    # Dictionary keys used by describe(); class-level, not model fields.
    hierarchy_key: ClassVar[str] = "hierarchy"
    depth_key: ClassVar[str] = "level_depth"
    label_key: ClassVar[str] = "level_label"

    def label_for(self, level: int) -> str:
        """Return the configured label for ``level``, or ``level_<n>`` when none is set."""
        fallback = f"level_{level}"
        return self.level_labels.get(level, fallback)

    def describe(self, level: int) -> dict[str, Any]:
        """Return the hierarchy name, the depth and the level label keyed by the class-level keys."""
        metadata: dict[str, Any] = {}
        metadata[self.hierarchy_key] = self.name
        metadata[self.depth_key] = level
        metadata[self.label_key] = self.label_for(level)
        return metadata
Functions
label_for(level)

Get the label for a given level or return default.

Source code in src/chapkit/artifact/schemas.py
def label_for(self, level: int) -> str:
    """Return the configured label for ``level``, or ``level_<n>`` when none is set."""
    fallback = f"level_{level}"
    return self.level_labels.get(level, fallback)
describe(level)

Get hierarchy metadata dict for a given level.

Source code in src/chapkit/artifact/schemas.py
def describe(self, level: int) -> dict[str, Any]:
    """Return the hierarchy name, the depth and the level label keyed by the class-level keys."""
    metadata: dict[str, Any] = {}
    metadata[self.hierarchy_key] = self.name
    metadata[self.depth_key] = level
    metadata[self.label_key] = self.label_for(level)
    return metadata

PandasDataFrame

Bases: BaseModel

Pydantic schema for serializing pandas DataFrames.

Source code in src/chapkit/artifact/schemas.py
class PandasDataFrame(BaseModel):
    """Pydantic schema for serializing pandas DataFrames."""

    # Column labels, in frame order.
    columns: list[str]
    # Row-major cell values: one inner list per row.
    data: list[list[Any]]

    @classmethod
    def from_dataframe(cls, df: pd.DataFrame) -> Self:
        """Capture the column labels and row values of ``df``.

        Raises:
            TypeError: If ``df`` is not a pandas DataFrame.
        """
        if isinstance(df, pd.DataFrame):  # pyright: ignore[reportUnnecessaryIsInstance]
            column_names = df.columns.tolist()
            rows = df.values.tolist()
            return cls(columns=column_names, data=rows)
        raise TypeError(f"Expected a pandas DataFrame, but got {type(df)}")

    def to_dataframe(self) -> pd.DataFrame:
        """Rebuild a pandas DataFrame from the stored rows and column labels."""
        return pd.DataFrame(data=self.data, columns=self.columns)
Functions
from_dataframe(df) classmethod

Create schema from pandas DataFrame.

Source code in src/chapkit/artifact/schemas.py
@classmethod
def from_dataframe(cls, df: pd.DataFrame) -> Self:
    """Capture the column labels and row values of ``df``.

    Raises:
        TypeError: If ``df`` is not a pandas DataFrame.
    """
    if isinstance(df, pd.DataFrame):  # pyright: ignore[reportUnnecessaryIsInstance]
        column_names = df.columns.tolist()
        rows = df.values.tolist()
        return cls(columns=column_names, data=rows)
    raise TypeError(f"Expected a pandas DataFrame, but got {type(df)}")
to_dataframe()

Convert schema back to pandas DataFrame.

Source code in src/chapkit/artifact/schemas.py
def to_dataframe(self) -> pd.DataFrame:
    """Rebuild a pandas DataFrame from the stored rows and column labels."""
    return pd.DataFrame(data=self.data, columns=self.columns)

Repository

repository

Artifact repository for hierarchical data access with tree traversal.

Classes

ArtifactRepository

Bases: BaseRepository[Artifact, ULID]

Repository for Artifact entities with tree traversal operations.

Source code in src/chapkit/artifact/repository.py
class ArtifactRepository(BaseRepository[Artifact, ULID]):
    """Repository for Artifact entities with tree traversal operations."""

    def __init__(self, session: AsyncSession) -> None:
        """Initialize artifact repository with database session."""
        super().__init__(session, Artifact)

    async def find_by_id(self, id: ULID) -> Artifact | None:
        """Find an artifact by ID with children eagerly loaded.

        Returns None when no row has the given ID.
        """
        return await self.s.get(self.model, id, options=[selectinload(self.model.children)])

    async def find_subtree(self, start_id: ULID) -> Iterable[Artifact]:
        """Find all artifacts in the subtree rooted at the given ID using recursive CTE.

        The start artifact itself is part of the result. Two statements are
        issued: the recursive CTE collects the IDs, then the rows are loaded by ID.
        """
        # Anchor member: the start row. Recursive member: rows whose parent is already collected.
        cte = select(self.model.id).where(self.model.id == start_id).cte(name="descendants", recursive=True)
        cte = cte.union_all(select(self.model.id).where(self.model.parent_id == cte.c.id))

        subtree_ids = (await self.s.scalars(select(cte.c.id))).all()
        rows = (await self.s.scalars(select(self.model).where(self.model.id.in_(subtree_ids)))).all()
        return rows

    async def get_root_artifact(self, artifact_id: ULID) -> Artifact | None:
        """Find the root artifact by traversing up the parent chain.

        Returns None when ``artifact_id`` does not exist. The walk stops at the
        first artifact without a parent, at an artifact whose parent row cannot
        be loaded, or - if the parent chain loops back on itself - at the last
        artifact reached before an ID would be visited a second time.
        """
        artifact = await self.s.get(self.model, artifact_id)
        if artifact is None:
            return None

        # Track every ID already walked so a cyclic parent chain cannot spin forever.
        visited: set[ULID] = {artifact_id}
        while artifact.parent_id is not None and artifact.parent_id not in visited:
            visited.add(artifact.parent_id)
            parent = await self.s.get(self.model, artifact.parent_id)
            if parent is None:
                break
            artifact = parent

        return artifact
Functions
__init__(session)

Initialize artifact repository with database session.

Source code in src/chapkit/artifact/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Initialize artifact repository with database session.

    The session is forwarded to the base repository together with the
    ``Artifact`` model class.
    """
    super().__init__(session, Artifact)
find_by_id(id) async

Find an artifact by ID with children eagerly loaded.

Source code in src/chapkit/artifact/repository.py
async def find_by_id(self, id: ULID) -> Artifact | None:
    """Load one artifact by primary key, eagerly loading its ``children``; None when absent."""
    eager_children = selectinload(self.model.children)
    return await self.s.get(self.model, id, options=[eager_children])
find_subtree(start_id) async

Find all artifacts in the subtree rooted at the given ID using recursive CTE.

Source code in src/chapkit/artifact/repository.py
async def find_subtree(self, start_id: ULID) -> Iterable[Artifact]:
    """Return the start artifact and all of its descendants, collected with a recursive CTE."""
    model = self.model

    # Anchor member: the start row itself.
    anchor = select(model.id).where(model.id == start_id)
    descendants = anchor.cte(name="descendants", recursive=True)
    # Recursive member: rows whose parent has already been collected.
    step = select(model.id).where(model.parent_id == descendants.c.id)
    descendants = descendants.union_all(step)

    id_result = await self.s.scalars(select(descendants.c.id))
    subtree_ids = id_result.all()
    row_result = await self.s.scalars(select(model).where(model.id.in_(subtree_ids)))
    return row_result.all()
get_root_artifact(artifact_id) async

Find the root artifact by traversing up the parent chain.

Source code in src/chapkit/artifact/repository.py
async def get_root_artifact(self, artifact_id: ULID) -> Artifact | None:
    """Find the root artifact by traversing up the parent chain.

    Returns None when ``artifact_id`` does not exist. The walk stops at the
    first artifact without a parent, at an artifact whose parent row cannot
    be loaded, or - if the parent chain loops back on itself - at the last
    artifact reached before an ID would be visited a second time.
    """
    artifact = await self.s.get(self.model, artifact_id)
    if artifact is None:
        return None

    # Track every ID already walked so a cyclic parent chain cannot spin forever.
    visited: set[ULID] = {artifact_id}
    while artifact.parent_id is not None and artifact.parent_id not in visited:
        visited.add(artifact.parent_id)
        parent = await self.s.get(self.model, artifact.parent_id)
        if parent is None:
            break
        artifact = parent

    return artifact

Manager

manager

Artifact manager for hierarchical data with parent-child relationships.

Classes

ArtifactManager

Bases: BaseManager[Artifact, ArtifactIn, ArtifactOut, ULID]

Manager for Artifact entities with hierarchical tree operations.

Source code in src/chapkit/artifact/manager.py
class ArtifactManager(BaseManager[Artifact, ArtifactIn, ArtifactOut, ULID]):
    """Manage Artifact entities and expose tree-oriented operations on them."""

    def __init__(
        self,
        repo: ArtifactRepository,
        hierarchy: ArtifactHierarchy | None = None,
    ) -> None:
        """Wire up the repository and, optionally, a hierarchy used to label tree nodes."""
        super().__init__(repo, Artifact, ArtifactOut)
        self.repository: ArtifactRepository = repo
        self.hierarchy = hierarchy

    # Public API ------------------------------------------------------

    async def find_subtree(self, start_id: ULID) -> list[ArtifactTreeNode]:
        """Return every artifact of the subtree rooted at ``start_id`` as a flat list of tree nodes."""
        rows = await self.repository.find_subtree(start_id)
        return list(map(self._to_tree_node, rows))

    async def expand_artifact(self, artifact_id: ULID) -> ArtifactTreeNode | None:
        """Return one artifact as a tree node with hierarchy metadata and ``children`` unset."""
        found = await self.repository.find_by_id(artifact_id)
        if found is not None:
            tree_node = self._to_tree_node(found)
            tree_node.children = None
            return tree_node
        return None

    async def build_tree(self, start_id: ULID) -> ArtifactTreeNode | None:
        """Assemble the nested tree rooted at ``start_id``; None when the subtree is empty."""
        nodes = await self.find_subtree(start_id)
        if not nodes:
            return None

        # Pass 1: give every node an empty child list and index it by ID.
        by_id: dict[ULID, ArtifactTreeNode] = {}
        for item in nodes:
            item.children = []
            by_id[item.id] = item

        # Pass 2: attach each node to its parent when that parent is part of the subtree.
        for item in nodes:
            owner = by_id.get(item.parent_id) if item.parent_id is not None else None
            if owner is None:
                continue
            if owner.children is None:
                owner.children = []
            owner.children.append(item)

        # Leaf nodes keep children == [] (semantic: "loaded but empty").
        # Only expand_artifact sets children=None (semantic: "not loaded").
        return by_id.get(start_id)

    # Lifecycle overrides --------------------------------------------

    def _should_assign_field(self, field: str, value: object) -> bool:
        """Refuse to assign None to ``level``; defer to the base class for everything else."""
        clears_level = field == "level" and value is None
        return False if clears_level else super()._should_assign_field(field, value)

    async def pre_save(self, entity: Artifact, data: ArtifactIn) -> None:
        """Derive the artifact's level from its parent before it is saved."""
        computed = await self._compute_level(entity.parent_id)
        entity.level = computed

    async def pre_update(self, entity: Artifact, data: ArtifactIn, old_values: dict[str, object]) -> None:
        """Re-derive the level on update and cascade to descendants when parent or level changed."""
        # Capture the old level first: entity.level is overwritten on the next line.
        level_before = old_values.get("level", entity.level)
        entity.level = await self._compute_level(entity.parent_id)

        moved = old_values.get("parent_id") != entity.parent_id
        needs_cascade = moved or level_before != entity.level
        if needs_cascade:
            await self._recalculate_descendants(entity)

    # Helper utilities ------------------------------------------------

    async def _compute_level(self, parent_id: ULID | None) -> int:
        """Return 0 for a parentless artifact, otherwise the parent's level plus one."""
        if parent_id is not None:
            parent = await self.repository.find_by_id(parent_id)
            if parent is not None:
                return parent.level + 1
            return 0  # pragma: no cover
        return 0

    async def _recalculate_descendants(self, entity: Artifact) -> None:
        """Walk the subtree below ``entity`` breadth-first and reset each descendant's level."""
        subtree = await self.repository.find_subtree(entity.id)

        # Group the subtree's rows by the ID of their parent.
        children_of: dict[ULID | None, list[Artifact]] = {}
        for member in subtree:
            children_of.setdefault(member.parent_id, []).append(member)

        pending: deque[Artifact] = deque([entity])
        while pending:
            parent = pending.popleft()
            for child in children_of.get(parent.id, []):
                child.level = parent.level + 1
                pending.append(child)

    def _to_tree_node(self, entity: Artifact) -> ArtifactTreeNode:
        """Convert an entity to a tree node, adding hierarchy name and level label when configured."""
        output = super()._to_output_schema(entity)
        node = ArtifactTreeNode.from_artifact(output)

        hierarchy = self.hierarchy
        if hierarchy is None:
            return node

        meta = hierarchy.describe(node.level)
        name = meta.get(hierarchy.hierarchy_key)
        if name is not None:
            node.hierarchy = str(name)
        label = meta.get(hierarchy.label_key)
        if label is not None:
            node.level_label = str(label)
        return node
Functions
__init__(repo, hierarchy=None)

Initialize artifact manager with repository and optional hierarchy.

Source code in src/chapkit/artifact/manager.py
def __init__(
    self,
    repo: ArtifactRepository,
    hierarchy: ArtifactHierarchy | None = None,
) -> None:
    """Initialize artifact manager with repository and optional hierarchy.

    The repository is passed to the base manager together with the ``Artifact``
    and ``ArtifactOut`` classes, and is also kept on ``self.repository`` with
    its concrete type. ``hierarchy`` is stored as given and may be None.
    """
    super().__init__(repo, Artifact, ArtifactOut)
    self.repository: ArtifactRepository = repo
    self.hierarchy = hierarchy
find_subtree(start_id) async

Find all artifacts in the subtree rooted at the given ID.

Source code in src/chapkit/artifact/manager.py
async def find_subtree(self, start_id: ULID) -> list[ArtifactTreeNode]:
    """Return every artifact of the subtree rooted at ``start_id`` as a flat list of tree nodes."""
    rows = await self.repository.find_subtree(start_id)
    return list(map(self._to_tree_node, rows))
expand_artifact(artifact_id) async

Expand a single artifact with hierarchy metadata but without children.

Source code in src/chapkit/artifact/manager.py
async def expand_artifact(self, artifact_id: ULID) -> ArtifactTreeNode | None:
    """Return one artifact as a tree node with hierarchy metadata and ``children`` unset."""
    found = await self.repository.find_by_id(artifact_id)
    if found is not None:
        tree_node = self._to_tree_node(found)
        # children=None marks the node as "children not loaded".
        tree_node.children = None
        return tree_node
    return None
build_tree(start_id) async

Build a hierarchical tree structure rooted at the given artifact ID.

Source code in src/chapkit/artifact/manager.py
async def build_tree(self, start_id: ULID) -> ArtifactTreeNode | None:
    """Assemble the nested tree rooted at ``start_id``; None when the subtree is empty."""
    nodes = await self.find_subtree(start_id)
    if not nodes:
        return None

    # Pass 1: give every node an empty child list and index it by ID.
    by_id: dict[ULID, ArtifactTreeNode] = {}
    for item in nodes:
        item.children = []
        by_id[item.id] = item

    # Pass 2: attach each node to its parent when that parent is part of the subtree.
    for item in nodes:
        owner = by_id.get(item.parent_id) if item.parent_id is not None else None
        if owner is None:
            continue
        if owner.children is None:
            owner.children = []
        owner.children.append(item)

    # Leaf nodes keep children == [] (semantic: "loaded but empty").
    # Only expand_artifact sets children=None (semantic: "not loaded").
    return by_id.get(start_id)
pre_save(entity, data) async

Compute and set artifact level before saving.

Source code in src/chapkit/artifact/manager.py
async def pre_save(self, entity: Artifact, data: ArtifactIn) -> None:
    """Derive the artifact's level from its parent before it is saved."""
    computed = await self._compute_level(entity.parent_id)
    entity.level = computed
pre_update(entity, data, old_values) async

Recalculate artifact level and cascade updates to descendants if parent changed.

Source code in src/chapkit/artifact/manager.py
async def pre_update(self, entity: Artifact, data: ArtifactIn, old_values: dict[str, object]) -> None:
    """Re-derive the level on update and cascade to descendants when parent or level changed."""
    # Capture the old level first: entity.level is overwritten on the next line.
    level_before = old_values.get("level", entity.level)
    entity.level = await self._compute_level(entity.parent_id)

    moved = old_values.get("parent_id") != entity.parent_id
    needs_cascade = moved or level_before != entity.level
    if needs_cascade:
        await self._recalculate_descendants(entity)

Router

router

Artifact CRUD router with hierarchical tree operations.

Classes

ArtifactRouter

Bases: CrudRouter[ArtifactIn, ArtifactOut]

CRUD router for Artifact entities with tree operations.

Source code in src/chapkit/artifact/router.py
class ArtifactRouter(CrudRouter[ArtifactIn, ArtifactOut]):
    """CRUD router for Artifact entities with tree operations.

    On top of the routes registered by the base router, this router registers
    the entity operations ``expand`` and ``tree``, and - only when
    ``enable_config_access`` is true - ``config``.
    """

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[ArtifactIn],
        entity_out_type: type[ArtifactOut],
        permissions: CrudPermissions | None = None,
        enable_config_access: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize artifact router with entity types and manager factory.

        ``tags`` is copied into a list before being handed to the base router;
        any extra keyword arguments are forwarded to it unchanged.
        """
        # Store enable_config_access to conditionally register config endpoint
        # NOTE(review): set before super().__init__() - presumably the base
        # constructor calls _register_routes(), which reads this attribute; confirm.
        self.enable_config_access = enable_config_access

        super().__init__(
            prefix=prefix,
            tags=list(tags),
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_routes(self) -> None:
        """Register artifact CRUD routes and tree operations."""
        super()._register_routes()

        # Bound to a local so the endpoint closures below can use it in Depends().
        manager_factory = self.manager_factory

        # Endpoint: a single artifact as a tree node; 404 when the manager returns None.
        async def expand_artifact(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> ArtifactTreeNode:
            ulid_id = self._parse_ulid(entity_id)

            expanded = await manager.expand_artifact(ulid_id)
            if expanded is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )
            return expanded

        # Endpoint: the nested tree rooted at the artifact; 404 when the manager returns None.
        async def build_tree(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> ArtifactTreeNode:
            ulid_id = self._parse_ulid(entity_id)

            tree = await manager.build_tree(ulid_id)
            if tree is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )
            return tree

        self.register_entity_operation(
            "expand",
            expand_artifact,
            response_model=ArtifactTreeNode,
            summary="Expand artifact",
            description="Get artifact with hierarchy metadata but without children",
        )

        self.register_entity_operation(
            "tree",
            build_tree,
            response_model=ArtifactTreeNode,
            summary="Build artifact tree",
            description="Build hierarchical tree structure rooted at the given artifact",
        )

        # Conditionally register config access endpoint
        if self.enable_config_access:
            # Imported here, so these modules are only loaded when the endpoint is enabled.
            from ..api.dependencies import get_config_manager
            from ..config.manager import ConfigManager

            async def get_config(
                entity_id: str,
                artifact_manager: ArtifactManager = Depends(manager_factory),
                config_manager: ConfigManager[BaseConfig] = Depends(get_config_manager),
            ) -> ConfigOut[BaseConfig]:
                """Get the config linked to this artifact."""
                ulid_id = self._parse_ulid(entity_id)

                # Get config by traversing to root artifact
                config = await config_manager.get_config_for_artifact(
                    artifact_id=ulid_id, artifact_repo=artifact_manager.repository
                )

                # A None result is reported to the client as 404.
                if config is None:
                    raise HTTPException(
                        status_code=status.HTTP_404_NOT_FOUND,
                        detail=f"No config linked to artifact {entity_id}",
                    )

                return config

            self.register_entity_operation(
                "config",
                get_config,
                response_model=ConfigOut[BaseConfig],
                summary="Get artifact config",
                description="Get configuration linked to this artifact by traversing to root",
            )
Functions
__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, enable_config_access=False, **kwargs)

Initialize artifact router with entity types and manager factory.

Source code in src/chapkit/artifact/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[ArtifactIn],
    entity_out_type: type[ArtifactOut],
    permissions: CrudPermissions | None = None,
    enable_config_access: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize artifact router with entity types and manager factory.

    ``tags`` is copied into a list before being handed to the base router;
    any extra keyword arguments are forwarded to it unchanged.
    """
    # Store enable_config_access to conditionally register config endpoint
    # NOTE(review): set before super().__init__() - presumably the base
    # constructor triggers route registration, which reads this attribute; confirm.
    self.enable_config_access = enable_config_access

    super().__init__(
        prefix=prefix,
        tags=list(tags),
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        manager_factory=manager_factory,
        permissions=permissions,
        **kwargs,
    )

Task Module

Reusable command templates for shell and Python task execution.

Models

models

Task ORM model for reusable command templates.

Classes

Task

Bases: Entity

ORM model for reusable task templates containing commands to execute.

Source code in src/chapkit/task/models.py
class Task(Entity):
    """ORM model for reusable task templates containing commands to execute.

    Rows are stored in the ``tasks`` table.
    """

    __tablename__ = "tasks"

    # Command text; required.
    command: Mapped[str] = mapped_column(Text, nullable=False)
    # Task kind stored as free text; "shell" is both the Python-side and the server-side default.
    task_type: Mapped[str] = mapped_column(Text, nullable=False, default="shell", server_default="shell")
    # Optional JSON parameters; NULL when not provided.
    parameters: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    # Enabled flag; defaults to True in Python and to "1" on the server side.
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True, server_default="1")

Schemas

schemas

Task schemas for reusable command templates.

Classes

TaskIn

Bases: EntityIn

Input schema for creating or updating task templates.

Source code in src/chapkit/task/schemas.py
class TaskIn(EntityIn):
    """Input schema for creating or updating task templates."""

    # Required; no default is declared.
    command: str = Field(description="Shell command or Python function name to execute")
    # Restricted to the two literal values; "shell" when omitted.
    task_type: Literal["shell", "python"] = Field(default="shell", description="Type of task: 'shell' or 'python'")
    # Optional keyword-style parameters; None when omitted.
    parameters: dict[str, Any] | None = Field(
        default=None, description="Parameters to pass to Python function (ignored for shell tasks)"
    )
    # True when omitted.
    enabled: bool = Field(default=True, description="Whether task is enabled for execution")

TaskOut

Bases: EntityOut

Output schema for task template entities.

Source code in src/chapkit/task/schemas.py
class TaskOut(EntityOut):
    """Output schema for task template entities."""

    # Required on output.
    command: str = Field(description="Shell command or Python function name to execute")
    # Plain str here, unlike the Literal used on the input schema.
    task_type: str = Field(description="Type of task: 'shell' or 'python'")
    # None when the task has no parameters.
    parameters: dict[str, Any] | None = Field(default=None, description="Parameters to pass to Python function")
    # Required on output.
    enabled: bool = Field(description="Whether task is enabled for execution")

Repository

repository

Task repository for database access and querying.

Classes

TaskRepository

Bases: BaseRepository[Task, ULID]

Repository for Task template entities.

Source code in src/chapkit/task/repository.py
class TaskRepository(BaseRepository[Task, ULID]):
    """Data-access layer for Task template entities."""

    def __init__(self, session: AsyncSession) -> None:
        """Bind the repository to a database session and the Task model."""
        super().__init__(session, Task)

    async def find_by_enabled(self, enabled: bool) -> list[Task]:
        """Return the tasks whose ``enabled`` flag equals ``enabled``, newest first."""
        newest_first = Task.created_at.desc()
        query = select(Task).where(Task.enabled == enabled).order_by(newest_first)
        outcome = await self.s.execute(query)
        return list(outcome.scalars().all())

    async def find_all(self, *, enabled: bool | None = None) -> list[Task]:
        """Return all tasks, or only those matching ``enabled`` when it is not None."""
        if enabled is not None:
            return await self.find_by_enabled(enabled)
        everything = await super().find_all()
        return list(everything)
Functions
__init__(session)

Initialize task repository with database session.

Source code in src/chapkit/task/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Initialize task repository with database session.

    The session is forwarded to the base repository together with the
    ``Task`` model class.
    """
    super().__init__(session, Task)
find_by_enabled(enabled) async

Find all tasks by enabled status.

Source code in src/chapkit/task/repository.py
async def find_by_enabled(self, enabled: bool) -> list[Task]:
    """Return the tasks whose ``enabled`` flag equals ``enabled``, newest first."""
    newest_first = Task.created_at.desc()
    query = select(Task).where(Task.enabled == enabled).order_by(newest_first)
    outcome = await self.s.execute(query)
    return list(outcome.scalars().all())
find_all(*, enabled=None) async

Find all tasks, optionally filtered by enabled status.

Source code in src/chapkit/task/repository.py
async def find_all(self, *, enabled: bool | None = None) -> list[Task]:
    """Return all tasks, or only those matching ``enabled`` when it is not None."""
    if enabled is not None:
        return await self.find_by_enabled(enabled)
    everything = await super().find_all()
    return list(everything)

Manager

manager

Task manager for reusable command templates with artifact-based execution results.

Classes

TaskManager

Bases: BaseManager[Task, TaskIn, TaskOut, ULID]

Manager for Task template entities with artifact-based execution.

Source code in src/chapkit/task/manager.py
class TaskManager(BaseManager[Task, TaskIn, TaskOut, ULID]):
    """Manager for Task template entities with artifact-based execution."""

    def __init__(
        self,
        repo: TaskRepository,
        scheduler: ChapkitJobScheduler | None = None,
        database: Database | None = None,
        artifact_manager: ArtifactManager | None = None,
    ) -> None:
        """Initialize task manager with repository, scheduler, database, and artifact manager."""
        super().__init__(repo, Task, TaskOut)
        self.repository: TaskRepository = repo
        self.scheduler = scheduler
        self.database = database
        self.artifact_manager = artifact_manager

    async def find_all(self, *, enabled: bool | None = None) -> list[TaskOut]:
        """Find all tasks, optionally filtered by enabled status."""
        tasks = await self.repository.find_all(enabled=enabled)
        return [self._to_output_schema(task) for task in tasks]

    @staticmethod
    def _unwrap_optional(param_type: Any) -> Any:
        """Return the single non-None member of a union hint, or the hint unchanged."""
        origin = get_origin(param_type)
        if origin is types.UnionType or origin is Union:  # `X | None` and Optional[X]
            args = getattr(param_type, "__args__", ())
            non_none_types = [arg for arg in args if arg is not type(None)]
            # Only a plain Optional[X] is unwrapped; wider unions are left as they are
            if len(non_none_types) == 1:
                return non_none_types[0]
        return param_type

    def _is_injectable_type(self, param_type: type | None) -> bool:
        """Check if a parameter type should be injected by the framework."""
        if param_type is None:
            return False

        # Optional[AsyncSession] counts as injectable because it unwraps to AsyncSession
        return self._unwrap_optional(param_type) in INJECTABLE_TYPES

    def _build_injection_map(self, task_id: ULID, session: AsyncSession | None) -> dict[type, Any]:
        """Build map of injectable types to their instances."""
        # task_id is accepted but not used when building the map
        return {
            AsyncSession: session,
            Database: self.database,
            ArtifactManager: self.artifact_manager,
            ChapkitJobScheduler: self.scheduler,
        }

    def _inject_parameters(
        self, func: Any, user_params: dict[str, Any], task_id: ULID, session: AsyncSession | None
    ) -> dict[str, Any]:
        """Merge user parameters with framework injections based on function signature.

        Returns a new dict: ``user_params`` plus one entry per injectable parameter.
        Raises ValueError when a required, non-injectable parameter is not supplied.
        """
        sig = inspect.signature(func)
        type_hints = get_type_hints(func)
        injection_map = self._build_injection_map(task_id, session)

        # Work on a copy so the caller's dict is never mutated
        final_params = dict(user_params)

        for param_name, param in sig.parameters.items():
            # *args / **kwargs cannot be supplied by name, so they are not validated here
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue

            param_type = type_hints.get(param_name)

            if self._is_injectable_type(param_type):
                actual_type = self._unwrap_optional(param_type)
                if actual_type in injection_map:
                    injectable_value = injection_map[actual_type]
                    # Required parameters receive whatever we have (even None);
                    # parameters with a default keep it unless a real instance exists
                    if param.default is param.empty or injectable_value is not None:
                        final_params[param_name] = injectable_value
                continue

            # Not injectable: must be user-supplied or fall back to its default
            if param_name in final_params or param.default is not param.empty:
                continue

            raise ValueError(
                f"Missing required parameter '{param_name}' for task function. "
                f"Parameter is not injectable and not provided in task.parameters."
            )

        return final_params

    async def execute_task(self, task_id: ULID) -> ULID:
        """Execute a task by submitting it to the scheduler and return the job ID.

        Raises ValueError when the scheduler or artifact manager is missing, the task
        does not exist, or the task is disabled.
        """
        if self.scheduler is None:
            raise ValueError("Task execution requires a scheduler. Use ServiceBuilder.with_jobs() to enable.")

        if self.artifact_manager is None:
            raise ValueError(
                "Task execution requires artifacts. Use ServiceBuilder.with_artifacts() before with_tasks()."
            )

        task = await self.repository.find_by_id(task_id)
        if task is None:
            raise ValueError(f"Task {task_id} not found")

        if not task.enabled:
            raise ValueError(f"Cannot execute disabled task {task_id}")

        # Route based on task type: anything other than "python" is run as a shell command
        if task.task_type == "python":
            job_id = await self.scheduler.add_job(self._execute_python, task_id)
        else:  # shell
            job_id = await self.scheduler.add_job(self._execute_command, task_id)

        return job_id

    async def _execute_command(self, task_id: ULID) -> ULID:
        """Execute command and return artifact_id containing results.

        The artifact data holds the task snapshot, decoded stdout/stderr and the exit code.
        Raises RuntimeError when no database/artifact manager is configured and ValueError
        when the task cannot be found.
        """
        if self.database is None:
            raise RuntimeError("Database instance required for task execution")

        if self.artifact_manager is None:
            raise RuntimeError("ArtifactManager instance required for task execution")

        # Fetch task and serialize snapshot before execution
        async with self.database.session() as session:
            task_repo = TaskRepository(session)
            task = await task_repo.find_by_id(task_id)
            if task is None:
                raise ValueError(f"Task {task_id} not found")

            # Copy every needed value out of the ORM object while its session is still
            # open; the instance must not be read again once the session has closed
            command = task.command
            task_snapshot = {
                "id": str(task.id),
                "command": command,
                "created_at": task.created_at.isoformat(),
                "updated_at": task.updated_at.isoformat(),
            }

        # NOTE(security): the stored command string is handed to the shell unmodified
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        # Wait for completion and capture output
        stdout_bytes, stderr_bytes = await process.communicate()

        # Commands may emit arbitrary bytes; replace undecodable ones instead of failing
        # the whole job and losing the result artifact
        stdout_text = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else ""
        stderr_text = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else ""

        result_data: dict[str, Any] = {
            "task": task_snapshot,
            "stdout": stdout_text,
            "stderr": stderr_text,
            "exit_code": process.returncode,
        }

        # Persist the results through a fresh session
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_mgr = ArtifactManager(artifact_repo)

            artifact_out = await artifact_mgr.save(
                ArtifactIn(
                    data=result_data,
                    parent_id=None,
                )
            )

        return artifact_out.id

    async def _execute_python(self, task_id: ULID) -> ULID:
        """Execute Python function and return artifact_id containing results.

        Exceptions raised by the task function are captured into the artifact's ``error``
        entry rather than propagated. Raises RuntimeError when no database/artifact manager
        is configured and ValueError when the task or its registered function is missing.
        """
        if self.database is None:
            raise RuntimeError("Database instance required for task execution")

        if self.artifact_manager is None:
            raise RuntimeError("ArtifactManager instance required for task execution")

        # `async with` hands any in-flight exception to the session context manager,
        # which a manual __aexit__(None, None, None) call would hide from it
        async with self.database.session() as session:
            # Fetch task and serialize snapshot
            task_repo = TaskRepository(session)
            task = await task_repo.find_by_id(task_id)
            if task is None:
                raise ValueError(f"Task {task_id} not found")

            task_snapshot = {
                "id": str(task.id),
                "command": task.command,
                "task_type": task.task_type,
                "parameters": task.parameters,
                "created_at": task.created_at.isoformat(),
                "updated_at": task.updated_at.isoformat(),
            }

            # For python tasks, `command` is the name the function was registered under
            try:
                func = TaskRegistry.get(task.command)
            except KeyError as exc:
                raise ValueError(f"Python function '{task.command}' not found in registry") from exc

            # Execute function with type-based injection
            result_data: dict[str, Any]
            try:
                user_params = task.parameters or {}

                # Inject framework dependencies based on function signature
                final_params = self._inject_parameters(func, user_params, task_id, session)

                # Coroutine functions are awaited; plain functions run in a worker thread
                if inspect.iscoroutinefunction(func):
                    result = await func(**final_params)
                else:
                    result = await asyncio.to_thread(func, **final_params)

                result_data = {
                    "task": task_snapshot,
                    "result": result,
                    "error": None,
                }
            except Exception as e:
                # Deliberately broad: any task failure is recorded in the artifact
                result_data = {
                    "task": task_snapshot,
                    "result": None,
                    "error": {
                        "type": type(e).__name__,
                        "message": str(e),
                        "traceback": traceback.format_exc(),
                    },
                }

        # Create artifact (with a new session)
        async with self.database.session() as artifact_session:
            artifact_repo = ArtifactRepository(artifact_session)
            artifact_mgr = ArtifactManager(artifact_repo)
            artifact_out = await artifact_mgr.save(ArtifactIn(data=result_data, parent_id=None))

        return artifact_out.id
Functions
__init__(repo, scheduler=None, database=None, artifact_manager=None)

Initialize task manager with repository, scheduler, database, and artifact manager.

Source code in src/chapkit/task/manager.py
def __init__(
    self,
    repo: TaskRepository,
    scheduler: ChapkitJobScheduler | None = None,
    database: Database | None = None,
    artifact_manager: ArtifactManager | None = None,
) -> None:
    """Initialize task manager with repository, scheduler, database, and artifact manager."""
    super().__init__(repo, Task, TaskOut)

    # Optional collaborators; each may be None
    self.artifact_manager = artifact_manager
    self.database = database
    self.scheduler = scheduler

    # Re-declared with the concrete repository type
    self.repository: TaskRepository = repo
find_all(*, enabled=None) async

Find all tasks, optionally filtered by enabled status.

Source code in src/chapkit/task/manager.py
async def find_all(self, *, enabled: bool | None = None) -> list[TaskOut]:
    """Find all tasks, optionally filtered by enabled status.

    Each entity returned by the repository is converted to its output schema.
    """
    converted: list[TaskOut] = []
    for entity in await self.repository.find_all(enabled=enabled):
        converted.append(self._to_output_schema(entity))
    return converted
execute_task(task_id) async

Execute a task by submitting it to the scheduler and return the job ID.

Source code in src/chapkit/task/manager.py
async def execute_task(self, task_id: ULID) -> ULID:
    """Execute a task by submitting it to the scheduler and return the job ID.

    Raises ValueError when the scheduler or artifact manager is missing, the task
    does not exist, or the task is disabled.
    """
    scheduler = self.scheduler
    if scheduler is None:
        raise ValueError("Task execution requires a scheduler. Use ServiceBuilder.with_jobs() to enable.")

    if self.artifact_manager is None:
        raise ValueError(
            "Task execution requires artifacts. Use ServiceBuilder.with_artifacts() before with_tasks()."
        )

    found = await self.repository.find_by_id(task_id)
    if found is None:
        raise ValueError(f"Task {task_id} not found")

    if not found.enabled:
        raise ValueError(f"Cannot execute disabled task {task_id}")

    # Anything that is not a "python" task is run as a shell command
    runner = self._execute_python if found.task_type == "python" else self._execute_command
    return await scheduler.add_job(runner, task_id)

Router

router

Task CRUD router with execution operation.

Classes

TaskExecuteResponse

Bases: BaseModel

Response schema for task execution.

Source code in src/chapkit/task/router.py
class TaskExecuteResponse(BaseModel):
    """Response schema for task execution."""

    # Job identifier as a string; the router fills it with str(job_id) from the manager.
    job_id: str = Field(description="ID of the scheduler job")
    # Free-text confirmation; the router embeds the job ID in it.
    message: str = Field(description="Human-readable message")

TaskRouter

Bases: CrudRouter[TaskIn, TaskOut]

CRUD router for Task entities with execution operation.

Source code in src/chapkit/task/router.py
class TaskRouter(CrudRouter[TaskIn, TaskOut]):
    """CRUD router for Task entities with execution operation."""

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[TaskIn],
        entity_out_type: type[TaskOut],
        permissions: CrudPermissions | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize task router with entity types and manager factory."""
        super().__init__(
            prefix=prefix,
            tags=list(tags),  # the base router receives a concrete list
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_find_all_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        """Register find all route with enabled filtering support."""
        entity_out_annotation: Any = self.entity_out_type
        collection_response_model: Any = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

        # No docstring on the handler on purpose: adding one would change what the
        # framework can pick up as the route description
        @self.router.get("", response_model=collection_response_model)
        async def find_all(
            page: int | None = None,
            size: int | None = None,
            enabled: bool | None = Query(None, description="Filter by enabled status"),
            manager: Manager[TaskIn, TaskOut, ULID] = manager_dependency,
        ) -> list[TaskOut] | PaginatedResponse[TaskOut]:
            from servicekit.api.pagination import create_paginated_response

            # Pagination is opt-in: both page and size must be provided.
            # NOTE: `enabled` is not forwarded here, so paginated results are unfiltered.
            if page is not None and size is not None:
                items, total = await manager.find_paginated(page, size)
                return create_paginated_response(items, total, page, size)

            # The dependency is annotated with the generic Manager protocol, but the
            # instance is a TaskManager whose find_all accepts `enabled`
            task_manager = manager  # TaskManager with TaskRepository
            return await task_manager.find_all(enabled=enabled)  # type: ignore[call-arg]

        self._annotate_manager(find_all, manager_annotation)
        find_all.__annotations__["return"] = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

    def _register_routes(self) -> None:
        """Register task CRUD routes and execution operation."""
        super()._register_routes()

        manager_factory = self.manager_factory

        async def execute_task(
            entity_id: str,
            manager: TaskManager = Depends(manager_factory),
        ) -> TaskExecuteResponse:
            """Execute a task asynchronously via the job scheduler."""
            task_id = self._parse_ulid(entity_id)

            try:
                job_id = await manager.execute_task(task_id)
                return TaskExecuteResponse(
                    job_id=str(job_id),
                    message=f"Task submitted for execution. Job ID: {job_id}",
                )
            except ValueError as e:
                # ValueError from the manager -> 400; chain the cause for tracebacks
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e
            except RuntimeError as e:
                # RuntimeError from the manager -> 409; chain the cause for tracebacks
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                ) from e

        self.register_entity_operation(
            "execute",
            execute_task,
            http_method="POST",
            response_model=TaskExecuteResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Execute task",
            description="Submit the task to the scheduler for execution",
        )
Functions
__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, **kwargs)

Initialize task router with entity types and manager factory.

Source code in src/chapkit/task/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[TaskIn],
    entity_out_type: type[TaskOut],
    permissions: CrudPermissions | None = None,
    **kwargs: Any,
) -> None:
    """Initialize task router with entity types and manager factory."""
    # The base router is handed a concrete list, whatever sequence type was passed in
    tag_list = list(tags)
    super().__init__(
        prefix=prefix,
        tags=tag_list,
        manager_factory=manager_factory,
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        permissions=permissions,
        **kwargs,
    )

Registry

registry

Global registry for Python task functions.

Classes

TaskRegistry

Global registry for Python task functions.

Source code in src/chapkit/task/registry.py
class TaskRegistry:
    """Global registry for Python task functions."""

    # Shared by every caller on purpose: the mapping lives on the class, not on instances
    _registry: dict[str, Callable[..., Any]] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to register a task function with support for type-based dependency injection."""

        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Same duplicate check and storage as the imperative API
            cls.register_function(name, func)
            return func

        return decorator

    @classmethod
    def register_function(cls, name: str, func: Callable[..., Any]) -> None:
        """Imperatively register a task function; raises ValueError if the name is taken."""
        known = cls._registry
        if name in known:
            raise ValueError(f"Task '{name}' already registered")
        known[name] = func

    @classmethod
    def get(cls, name: str) -> Callable[..., Any]:
        """Retrieve a registered task function; raises KeyError for unknown names."""
        if name in cls._registry:
            return cls._registry[name]
        raise KeyError(f"Task '{name}' not found in registry")

    @classmethod
    def list_all(cls) -> list[str]:
        """List all registered task names in sorted order."""
        return sorted(cls._registry)

    @classmethod
    def clear(cls) -> None:
        """Clear all registered tasks (useful for testing)."""
        cls._registry.clear()
Functions
register(name) classmethod

Decorator to register a task function with support for type-based dependency injection.

Source code in src/chapkit/task/registry.py
@classmethod
def register(cls, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator to register a task function with support for type-based dependency injection."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        known = cls._registry
        # A name can be claimed only once; the function itself is returned unchanged
        if name in known:
            raise ValueError(f"Task '{name}' already registered")
        known[name] = func
        return func

    return decorator
register_function(name, func) classmethod

Imperatively register a task function.

Source code in src/chapkit/task/registry.py
@classmethod
def register_function(cls, name: str, func: Callable[..., Any]) -> None:
    """Imperatively register a task function; raises ValueError if the name is taken."""
    known = cls._registry
    if name in known:
        raise ValueError(f"Task '{name}' already registered")
    known[name] = func
get(name) classmethod

Retrieve a registered task function.

Source code in src/chapkit/task/registry.py
@classmethod
def get(cls, name: str) -> Callable[..., Any]:
    """Retrieve a registered task function; raises KeyError for unknown names."""
    if name in cls._registry:
        return cls._registry[name]
    raise KeyError(f"Task '{name}' not found in registry")
list_all() classmethod

List all registered task names.

Source code in src/chapkit/task/registry.py
@classmethod
def list_all(cls) -> list[str]:
    """List all registered task names in sorted order."""
    # Iterating the dict yields its keys, so no explicit .keys() view is needed
    return sorted(cls._registry)
clear() classmethod

Clear all registered tasks (useful for testing).

Source code in src/chapkit/task/registry.py
@classmethod
def clear(cls) -> None:
    """Clear all registered tasks (useful for testing)."""
    # Empties the shared dict in place, so existing references to it see the change
    registry = cls._registry
    registry.clear()

Validation

validation

Task validation utilities for detecting orphaned Python tasks.

Classes

Functions

validate_and_disable_orphaned_tasks(app) async

Validate Python tasks and disable orphaned ones that reference missing functions.

Source code in src/chapkit/task/validation.py
async def validate_and_disable_orphaned_tasks(app: FastAPI) -> int:
    """Validate Python tasks and disable orphaned ones that reference missing functions.

    A task is orphaned when its ``task_type`` is "python" and its ``command`` is not a
    name currently held by TaskRegistry. Returns the number of tasks disabled, which is
    0 when ``app.state`` has no ``database``.
    """
    database: Database | None = getattr(app.state, "database", None)
    if database is None:
        logger.debug("No database configured, skipping task validation")
        return 0

    disabled_count = 0

    async with database.session() as session:
        # Only CRUD access is needed here, so no scheduler/database/artifact manager is wired in
        task_manager = TaskManager(TaskRepository(session), scheduler=None, database=None, artifact_manager=None)

        all_tasks = await task_manager.find_all()

        # Set for O(1) membership tests in the filter below
        registered_functions = set(TaskRegistry.list_all())

        orphaned_tasks = [
            task for task in all_tasks if task.task_type == "python" and task.command not in registered_functions
        ]

        if orphaned_tasks:
            logger.warning(
                "Found orphaned Python tasks - disabling them",
                extra={
                    "count": len(orphaned_tasks),
                    "task_ids": [str(task.id) for task in orphaned_tasks],
                    "commands": [task.command for task in orphaned_tasks],
                },
            )

        for task in orphaned_tasks:
            logger.info(
                f"Disabling orphaned task {task.id}: function '{task.command}' not found in registry",
                extra={"task_id": str(task.id), "command": task.command, "task_type": task.task_type},
            )

            # Re-save the same task with enabled=False. Every orphaned task already has
            # task_type == "python" (see the filter above), so no fallback value is needed.
            task_in = TaskIn(
                id=task.id,
                command=task.command,
                task_type=task.task_type,  # type: ignore[arg-type]
                parameters=task.parameters,
                enabled=False,
            )
            await task_manager.save(task_in)
            disabled_count += 1

    if disabled_count > 0:
        logger.warning(f"Disabled {disabled_count} orphaned Python task(s)")
    else:
        logger.debug("No orphaned Python tasks found")

    return disabled_count

Config Module

Key-value configuration storage with Pydantic schema validation.

Models

models

Config ORM models for key-value configuration storage and artifact linking.

Classes

Config

Bases: Entity

ORM model for configuration with JSON data storage.

Source code in src/chapkit/config/models.py
class Config(Entity):
    """ORM model for configuration with JSON data storage."""

    __tablename__ = "configs"

    name: Mapped[str] = mapped_column(index=True)
    # Stored in the "data" column; exposed through the `data` property below
    _data_json: Mapped[dict[str, Any]] = mapped_column("data", JSON, nullable=False)

    @property
    def data(self) -> dict[str, Any]:
        """Return JSON data as dict."""
        return self._data_json

    @data.setter
    def data(self, value: BaseConfig | dict[str, Any]) -> None:
        """Serialize Pydantic model to JSON or store dict directly."""
        # Dicts are stored as given (same object, no copy)
        if isinstance(value, dict):
            self._data_json = value
            return

        # Anything else must provide a callable model_dump (BaseConfig or other Pydantic model)
        dump = getattr(value, "model_dump", None)
        if not callable(dump):
            raise TypeError(f"data must be a BaseConfig subclass or dict, got {type(value)}")

        self._data_json = dump(mode="json")
Attributes
data property writable

Return JSON data as dict.

ConfigArtifact

Bases: Base

Junction table linking Configs to root Artifacts.

Source code in src/chapkit/config/models.py
class ConfigArtifact(Base):
    """Junction table linking Configs to root Artifacts."""

    __tablename__ = "config_artifacts"

    # Part of the composite primary key; link rows are removed when the config row is deleted.
    config_id: Mapped[ULID] = mapped_column(
        ULIDType,
        ForeignKey("configs.id", ondelete="CASCADE"),
        primary_key=True,
    )

    # Part of the composite primary key; link rows are removed when the artifact row is deleted.
    # NOTE(review): uniqueness of artifact_id is declared twice (unique=True here and the
    # named UniqueConstraint below) - looks redundant; confirm against the migrations
    # before removing either, since that changes the generated schema.
    artifact_id: Mapped[ULID] = mapped_column(
        ULIDType,
        ForeignKey("artifacts.id", ondelete="CASCADE"),
        primary_key=True,
        unique=True,
    )

    # An artifact can appear in at most one link row, i.e. belong to at most one config.
    __table_args__ = (UniqueConstraint("artifact_id", name="uq_artifact_id"),)

Schemas

schemas

Config schemas for key-value configuration with JSON data.

Classes

BaseConfig

Bases: BaseModel

Base class for configuration schemas with arbitrary extra fields allowed.

Source code in src/chapkit/config/schemas.py
class BaseConfig(BaseModel):
    """Base class for configuration schemas with arbitrary extra fields allowed."""

    # "extra": "allow" keeps fields that are not declared on the model instead of rejecting them.
    model_config = {"extra": "allow"}

ConfigIn

Bases: EntityIn

Input schema for creating or updating configurations.

Source code in src/chapkit/config/schemas.py
class ConfigIn[DataT: BaseConfig](EntityIn):
    """Input schema for creating or updating configurations."""

    # Name of the configuration.
    name: str
    # Payload; DataT is a type parameter bounded by BaseConfig.
    data: DataT

ConfigOut

Bases: EntityOut

Output schema for configuration entities.

Source code in src/chapkit/config/schemas.py
class ConfigOut[DataT: BaseConfig](EntityOut):
    """Output schema for configuration entities."""

    name: str
    data: DataT

    model_config = {"ser_json_timedelta": "float", "ser_json_bytes": "base64"}

    @field_validator("data", mode="before")
    @classmethod
    def convert_dict_to_model(cls, v: Any, info: ValidationInfo) -> Any:
        """Convert dict to BaseConfig model if data_cls is provided in validation context."""
        if isinstance(v, BaseConfig):
            return v
        if isinstance(v, dict):
            if info.context and "data_cls" in info.context:
                data_cls = info.context["data_cls"]
                return data_cls.model_validate(v)
        return v

    @field_serializer("data", when_used="json")
    def serialize_data(self, value: DataT) -> dict[str, Any]:
        """Serialize BaseConfig data to JSON dict."""
        if isinstance(value, BaseConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            return value.model_dump(mode="json")
        return value
Functions
convert_dict_to_model(v, info) classmethod

Convert dict to BaseConfig model if data_cls is provided in validation context.

Source code in src/chapkit/config/schemas.py
@field_validator("data", mode="before")
@classmethod
def convert_dict_to_model(cls, v: Any, info: ValidationInfo) -> Any:
    """Convert dict to BaseConfig model if data_cls is provided in validation context."""
    # Already a model: nothing to convert
    if isinstance(v, BaseConfig):
        return v

    # A dict is upgraded only when the caller named the target class in the context
    context = info.context
    if isinstance(v, dict) and context and "data_cls" in context:
        return context["data_cls"].model_validate(v)

    # Everything else passes through untouched for regular validation
    return v
serialize_data(value)

Serialize BaseConfig data to JSON dict.

Source code in src/chapkit/config/schemas.py
@field_serializer("data", when_used="json")
def serialize_data(self, value: DataT) -> dict[str, Any]:
    """Serialize BaseConfig data to JSON dict."""
    is_model = isinstance(value, BaseConfig)  # pyright: ignore[reportUnnecessaryIsInstance]
    # Non-model values are returned as they are
    return value.model_dump(mode="json") if is_model else value

LinkArtifactRequest

Bases: BaseModel

Request schema for linking an artifact to a config.

Source code in src/chapkit/config/schemas.py
class LinkArtifactRequest(BaseModel):
    """Request schema for linking an artifact to a config."""

    # ID of the artifact to link.
    artifact_id: ULID

UnlinkArtifactRequest

Bases: BaseModel

Request schema for unlinking an artifact from a config.

Source code in src/chapkit/config/schemas.py
class UnlinkArtifactRequest(BaseModel):
    """Request schema for unlinking an artifact from a config."""

    # ID of the artifact to unlink.
    artifact_id: ULID

Repository

repository

Config repository for database access and artifact linking.

Classes

ConfigRepository

Bases: BaseRepository[Config, ULID]

Repository for Config entities with artifact linking operations.

Source code in src/chapkit/config/repository.py
class ConfigRepository(BaseRepository[Config, ULID]):
    """Repository for Config entities with artifact linking operations."""

    def __init__(self, session: AsyncSession) -> None:
        """Initialize config repository with database session."""
        super().__init__(session, Config)

    async def find_by_name(self, name: str) -> Config | None:
        """Find a config by its unique name."""
        query = select(self.model).where(self.model.name == name)
        matches = await self.s.scalars(query)
        return matches.one_or_none()

    async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
        """Link a config to a root artifact.

        Raises ValueError when the artifact does not exist or has a parent.
        """
        target = await self.s.get(Artifact, artifact_id)
        if target is None:
            raise ValueError(f"Artifact {artifact_id} not found")
        if target.parent_id is not None:
            raise ValueError(f"Artifact {artifact_id} is not a root artifact (parent_id={target.parent_id})")

        # The link row is only added to the session here
        self.s.add(ConfigArtifact(config_id=config_id, artifact_id=artifact_id))

    async def unlink_artifact(self, artifact_id: ULID) -> None:
        """Unlink an artifact from its config."""
        removal = sql_delete(ConfigArtifact).where(ConfigArtifact.artifact_id == artifact_id)
        await self.s.execute(removal)

    async def delete_by_id(self, id: ULID) -> None:
        """Delete a config and cascade delete all linked artifact trees."""
        from chapkit.artifact.repository import ArtifactRepository

        artifact_repo = ArtifactRepository(self.s)

        # Delete every artifact in each linked root's subtree, then the config itself
        for root_artifact in await self.find_artifacts_for_config(id):
            for member in await artifact_repo.find_subtree(root_artifact.id):
                await self.s.delete(member)

        await super().delete_by_id(id)

    async def find_by_root_artifact_id(self, artifact_id: ULID) -> Config | None:
        """Find the config linked to a root artifact."""
        query = select(Config).join(ConfigArtifact, Config.id == ConfigArtifact.config_id)
        query = query.where(ConfigArtifact.artifact_id == artifact_id)
        matches = await self.s.scalars(query)
        return matches.one_or_none()

    async def find_artifacts_for_config(self, config_id: ULID) -> list[Artifact]:
        """Find all root artifacts linked to a config."""
        query = select(Artifact).join(ConfigArtifact, Artifact.id == ConfigArtifact.artifact_id)
        query = query.where(ConfigArtifact.config_id == config_id)
        matches = await self.s.scalars(query)
        return list(matches.all())
Functions
__init__(session)

Initialize config repository with database session.

Source code in src/chapkit/config/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Initialize config repository with database session.

    Args:
        session: Async database session; forwarded to the base repository
            together with the ``Config`` model class.
    """
    super().__init__(session, Config)
find_by_name(name) async

Find a config by its unique name.

Source code in src/chapkit/config/repository.py
async def find_by_name(self, name: str) -> Config | None:
    """Look up the config whose ``name`` column equals ``name``.

    Returns the matching row, or ``None`` when nothing matches.
    """
    by_name = select(self.model).where(self.model.name == name)
    rows = await self.s.scalars(by_name)
    return rows.one_or_none()

link_artifact(config_id, artifact_id) async

Link a config to a root artifact.

Source code in src/chapkit/config/repository.py
async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
    """Attach a root artifact to a config by adding a link row to the session.

    The link is only added to the session; this method does not commit.

    Raises:
        ValueError: If the artifact does not exist, or if it has a parent
            (only artifacts with ``parent_id`` of ``None`` may be linked).
    """
    artifact = await self.s.get(Artifact, artifact_id)

    if artifact is None:
        raise ValueError(f"Artifact {artifact_id} not found")

    is_root = artifact.parent_id is None
    if not is_root:
        raise ValueError(f"Artifact {artifact_id} is not a root artifact (parent_id={artifact.parent_id})")

    self.s.add(ConfigArtifact(config_id=config_id, artifact_id=artifact_id))

unlink_artifact(artifact_id) async

Unlink an artifact from its config.

Source code in src/chapkit/config/repository.py
async def unlink_artifact(self, artifact_id: ULID) -> None:
    """Delete the config-artifact link rows that reference ``artifact_id``.

    Only the DELETE statement is executed on the session; this method does
    not commit.
    """
    targets_artifact = ConfigArtifact.artifact_id == artifact_id
    await self.s.execute(sql_delete(ConfigArtifact).where(targets_artifact))
delete_by_id(id) async

Delete a config and cascade delete all linked artifact trees.

Source code in src/chapkit/config/repository.py
async def delete_by_id(self, id: ULID) -> None:
    """Delete a config together with every artifact tree linked to it.

    Each root artifact linked to the config is expanded into its subtree and
    every artifact of that subtree is deleted through the session; the
    config itself is then removed by the parent implementation.
    """
    # Function-scope import (presumably to avoid a circular import - confirm).
    from chapkit.artifact.repository import ArtifactRepository

    roots = await self.find_artifacts_for_config(id)
    artifacts = ArtifactRepository(self.s)

    for root in roots:
        for node in await artifacts.find_subtree(root.id):
            await self.s.delete(node)

    await super().delete_by_id(id)
find_by_root_artifact_id(artifact_id) async

Find the config linked to a root artifact.

Source code in src/chapkit/config/repository.py
async def find_by_root_artifact_id(self, artifact_id: ULID) -> Config | None:
    """Return the config linked to the given artifact, or ``None`` when no link exists."""
    linked_to_artifact = ConfigArtifact.artifact_id == artifact_id
    query = select(Config).join(ConfigArtifact, Config.id == ConfigArtifact.config_id).where(linked_to_artifact)
    matches = await self.s.scalars(query)
    return matches.one_or_none()
find_artifacts_for_config(config_id) async

Find all root artifacts linked to a config.

Source code in src/chapkit/config/repository.py
async def find_artifacts_for_config(self, config_id: ULID) -> list[Artifact]:
    """Return every artifact that has a link row pointing at ``config_id``."""
    owned_by_config = ConfigArtifact.config_id == config_id
    query = select(Artifact).join(ConfigArtifact, Artifact.id == ConfigArtifact.artifact_id).where(owned_by_config)
    rows = await self.s.scalars(query)
    return list(rows.all())

Manager

manager

Config manager for CRUD operations and artifact linking.

Classes

ConfigManager

Bases: BaseManager[Config, ConfigIn[DataT], ConfigOut[DataT], ULID]

Manager for Config entities with artifact linking operations.

Source code in src/chapkit/config/manager.py
class ConfigManager[DataT: BaseConfig](BaseManager[Config, ConfigIn[DataT], ConfigOut[DataT], ULID]):
    """Manager for Config entities with artifact linking operations."""

    def __init__(self, repo: ConfigRepository, data_cls: type[DataT]) -> None:
        """Initialize config manager with repository and data class."""
        super().__init__(repo, Config, ConfigOut)
        self.repository: ConfigRepository = repo
        self.data_cls = data_cls

    async def find_by_name(self, name: str) -> ConfigOut[DataT] | None:
        """Find a config by its unique name."""
        config = await self.repository.find_by_name(name)
        if config:
            return self._to_output_schema(config)
        return None

    async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
        """Link a config to a root artifact."""
        await self.repository.link_artifact(config_id, artifact_id)
        await self.repository.commit()

    async def unlink_artifact(self, artifact_id: ULID) -> None:
        """Unlink an artifact from its config."""
        await self.repository.unlink_artifact(artifact_id)
        await self.repository.commit()

    async def get_config_for_artifact(
        self, artifact_id: ULID, artifact_repo: ArtifactRepository
    ) -> ConfigOut[DataT] | None:
        """Get the config for an artifact by traversing to its root."""
        root = await artifact_repo.get_root_artifact(artifact_id)
        if root is None:
            return None

        config = await self.repository.find_by_root_artifact_id(root.id)
        if config is None:
            return None

        return self._to_output_schema(config)

    async def get_linked_artifacts(self, config_id: ULID) -> list[ArtifactOut]:
        """Get all root artifacts linked to a config."""
        artifacts = await self.repository.find_artifacts_for_config(config_id)
        return [ArtifactOut.model_validate(artifact, from_attributes=True) for artifact in artifacts]

    def _to_output_schema(self, entity: Config) -> ConfigOut[DataT]:
        """Convert ORM entity to output schema with proper data class validation."""
        return ConfigOut[DataT].model_validate(entity, from_attributes=True, context={"data_cls": self.data_cls})
Functions
__init__(repo, data_cls)

Initialize config manager with repository and data class.

Source code in src/chapkit/config/manager.py
def __init__(self, repo: ConfigRepository, data_cls: type[DataT]) -> None:
    """Initialize config manager with repository and data class.

    Args:
        repo: Repository passed to the base manager and also kept on
            ``self.repository`` with its concrete ``ConfigRepository`` type.
        data_cls: Config data class stored on ``self.data_cls``.
    """
    super().__init__(repo, Config, ConfigOut)
    # Annotated with the concrete type so config-specific repository methods type-check.
    self.repository: ConfigRepository = repo
    self.data_cls = data_cls
find_by_name(name) async

Find a config by its unique name.

Source code in src/chapkit/config/manager.py
async def find_by_name(self, name: str) -> ConfigOut[DataT] | None:
    """Return the config with the given name as an output schema, or ``None``."""
    entity = await self.repository.find_by_name(name)
    if not entity:
        return None
    return self._to_output_schema(entity)

link_artifact(config_id, artifact_id) async

Link a config to a root artifact.

Source code in src/chapkit/config/manager.py
async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
    """Link a config to a root artifact.

    Delegates to the repository's ``link_artifact`` and then commits. No
    exception is caught here, so errors raised by the repository propagate to
    the caller and the commit is skipped.
    """
    await self.repository.link_artifact(config_id, artifact_id)
    await self.repository.commit()

unlink_artifact(artifact_id) async

Unlink an artifact from its config.

Source code in src/chapkit/config/manager.py
async def unlink_artifact(self, artifact_id: ULID) -> None:
    """Unlink an artifact from its config.

    Delegates to the repository's ``unlink_artifact`` and then commits. Only
    the artifact ID is needed; no config ID is passed.
    """
    await self.repository.unlink_artifact(artifact_id)
    await self.repository.commit()
get_config_for_artifact(artifact_id, artifact_repo) async

Get the config for an artifact by traversing to its root.

Source code in src/chapkit/config/manager.py
async def get_config_for_artifact(
    self, artifact_id: ULID, artifact_repo: ArtifactRepository
) -> ConfigOut[DataT] | None:
    """Resolve an artifact to its root and return the config linked to that root.

    Returns ``None`` when no root artifact is found or the root has no linked
    config.
    """
    root = await artifact_repo.get_root_artifact(artifact_id)
    config = None if root is None else await self.repository.find_by_root_artifact_id(root.id)
    return None if config is None else self._to_output_schema(config)
get_linked_artifacts(config_id) async

Get all root artifacts linked to a config.

Source code in src/chapkit/config/manager.py
async def get_linked_artifacts(self, config_id: ULID) -> list[ArtifactOut]:
    """Return the artifacts linked to a config, converted to ``ArtifactOut``."""
    roots = await self.repository.find_artifacts_for_config(config_id)
    return [ArtifactOut.model_validate(root, from_attributes=True) for root in roots]

Router

router

Config CRUD router with artifact linking operations.

Classes

ConfigRouter

Bases: CrudRouter[ConfigIn[BaseConfig], ConfigOut[BaseConfig]]

CRUD router for Config entities with artifact linking operations.

Source code in src/chapkit/config/router.py
class ConfigRouter(CrudRouter[ConfigIn[BaseConfig], ConfigOut[BaseConfig]]):
    """CRUD router for Config entities with optional artifact linking operations.

    When ``enable_artifact_operations`` is true, three extra entity operations
    are registered on top of the base CRUD routes: ``link-artifact`` (POST),
    ``unlink-artifact`` (POST) and ``artifacts`` (GET).
    """

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[ConfigIn[BaseConfig]],
        entity_out_type: type[ConfigOut[BaseConfig]],
        permissions: CrudPermissions | None = None,
        enable_artifact_operations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize config router with entity types and manager factory.

        Args:
            prefix: Route prefix forwarded to the base router.
            tags: Tags forwarded to the base router (copied into a list).
            manager_factory: Dependency callable that yields a ``ConfigManager``.
            entity_in_type: Input schema type for the CRUD routes.
            entity_out_type: Output schema type for the CRUD routes.
            permissions: Optional CRUD permissions forwarded to the base router.
            enable_artifact_operations: Register the artifact linking routes.
            **kwargs: Extra keyword arguments forwarded to the base router.
        """
        # Set before calling the base constructor; presumably the base
        # constructor triggers _register_routes, which reads this flag.
        self.enable_artifact_operations = enable_artifact_operations
        super().__init__(
            prefix=prefix,
            tags=list(tags),
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_routes(self) -> None:
        """Register config CRUD routes and artifact linking operations."""
        super()._register_routes()

        if not self.enable_artifact_operations:
            return

        manager_factory = self.manager_factory

        async def link_artifact(
            entity_id: str,
            request: LinkArtifactRequest,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> None:
            config_id = self._parse_ulid(entity_id)

            try:
                await manager.link_artifact(config_id, request.artifact_id)
            except ValueError as e:
                # Chain the cause so the original error stays in tracebacks/logs.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e

        async def unlink_artifact(
            entity_id: str,
            request: UnlinkArtifactRequest,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> None:
            # Validate the path ID the same way the sibling handlers do, so a
            # malformed config ID is rejected instead of silently ignored.
            self._parse_ulid(entity_id)

            try:
                await manager.unlink_artifact(request.artifact_id)
            except Exception as e:
                # NOTE(review): this broad catch reports every failure as 400,
                # including unexpected ones - consider narrowing it.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e

        async def get_linked_artifacts(
            entity_id: str,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> list[ArtifactOut]:
            config_id = self._parse_ulid(entity_id)
            return await manager.get_linked_artifacts(config_id)

        self.register_entity_operation(
            "link-artifact",
            link_artifact,
            http_method="POST",
            status_code=status.HTTP_204_NO_CONTENT,
            summary="Link artifact to config",
            description="Link a config to a root artifact (parent_id IS NULL)",
        )

        self.register_entity_operation(
            "unlink-artifact",
            unlink_artifact,
            http_method="POST",
            status_code=status.HTTP_204_NO_CONTENT,
            summary="Unlink artifact from config",
            description="Remove the link between a config and an artifact",
        )

        self.register_entity_operation(
            "artifacts",
            get_linked_artifacts,
            http_method="GET",
            response_model=list[ArtifactOut],
            summary="Get linked artifacts",
            description="Get all root artifacts linked to this config",
        )
Functions
__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, enable_artifact_operations=False, **kwargs)

Initialize config router with entity types and manager factory.

Source code in src/chapkit/config/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[ConfigIn[BaseConfig]],
    entity_out_type: type[ConfigOut[BaseConfig]],
    permissions: CrudPermissions | None = None,
    enable_artifact_operations: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize config router with entity types and manager factory.

    Args:
        prefix: Route prefix forwarded to the base router.
        tags: Tags forwarded to the base router (copied into a list).
        manager_factory: Forwarded to the base router as ``manager_factory``.
        entity_in_type: Input schema type for the CRUD routes.
        entity_out_type: Output schema type for the CRUD routes.
        permissions: Optional CRUD permissions forwarded to the base router.
        enable_artifact_operations: Stored on the instance; defaults to False.
        **kwargs: Extra keyword arguments forwarded to the base router.
    """
    # Set before calling the base constructor; presumably the base constructor
    # triggers route registration, which reads this flag - confirm.
    self.enable_artifact_operations = enable_artifact_operations
    super().__init__(
        prefix=prefix,
        tags=list(tags),
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        manager_factory=manager_factory,
        permissions=permissions,
        **kwargs,
    )

ML Module

Train/predict workflows with artifact-based model storage and timing metadata.

Schemas

schemas

Pydantic schemas for ML train/predict operations.

Classes

TrainRequest

Bases: BaseModel

Request schema for training a model.

Source code in src/chapkit/ml/schemas.py
class TrainRequest(BaseModel):
    """Request schema for training a model."""

    # `config_id` and `data` declare no default, so both must be supplied.
    config_id: ULID = Field(description="ID of the config to use for training")
    data: PandasDataFrame = Field(description="Training data as pandas DataFrame")
    # Optional: defaults to None when no geospatial data is sent.
    geo: FeatureCollection | None = Field(default=None, description="Optional geospatial data")

TrainResponse

Bases: BaseModel

Response schema for train operation submission.

Source code in src/chapkit/ml/schemas.py
class TrainResponse(BaseModel):
    """Response schema for train operation submission."""

    # Both identifiers are carried as plain strings in this response.
    job_id: str = Field(description="ID of the training job in the scheduler")
    # Per the description, this ID "will contain" the model: it is handed out at submission time.
    model_artifact_id: str = Field(description="ID that will contain the trained model artifact")
    message: str = Field(description="Human-readable message")

PredictRequest

Bases: BaseModel

Request schema for making predictions.

Source code in src/chapkit/ml/schemas.py
class PredictRequest(BaseModel):
    """Request schema for making predictions."""

    # `model_artifact_id`, `historic` and `future` declare no default, so all three must be supplied.
    model_artifact_id: ULID = Field(description="ID of the artifact containing the trained model")
    historic: PandasDataFrame = Field(description="Historic data as pandas DataFrame")
    future: PandasDataFrame = Field(description="Future/prediction data as pandas DataFrame")
    # Optional: defaults to None when no geospatial data is sent.
    geo: FeatureCollection | None = Field(default=None, description="Optional geospatial data")

PredictResponse

Bases: BaseModel

Response schema for predict operation submission.

Source code in src/chapkit/ml/schemas.py
class PredictResponse(BaseModel):
    """Response schema for predict operation submission."""

    # Both identifiers are carried as plain strings in this response.
    job_id: str = Field(description="ID of the prediction job in the scheduler")
    # Per the description, this ID "will contain" the predictions: it is handed out at submission time.
    prediction_artifact_id: str = Field(description="ID that will contain the prediction artifact")
    message: str = Field(description="Human-readable message")

TrainedModelArtifactData

Bases: BaseModel

Schema for trained model artifact data stored in the artifact system.

Source code in src/chapkit/ml/schemas.py
class TrainedModelArtifactData(BaseModel):
    """Schema for trained model artifact data stored in the artifact system."""

    # Discriminator: only the literal value "trained_model" validates.
    ml_type: Literal["trained_model"] = Field(description="Artifact type identifier")
    # IDs and timestamps are stored as strings, not as ULID/datetime objects.
    config_id: str = Field(description="ID of the config used for training")
    started_at: str = Field(description="ISO format timestamp when operation started")
    completed_at: str = Field(description="ISO format timestamp when operation completed")
    duration_seconds: float = Field(description="Operation duration in seconds (rounded to 2 decimals)")
    # Typed as Any: the schema places no constraint on the model object itself.
    model: Any = Field(description="The trained model object (must be pickleable)")
    # Optional metadata; both default to None.
    # NOTE(review): names starting with `model_` can collide with pydantic's protected
    # `model_` namespace on some pydantic versions - confirm no warning is emitted.
    model_type: str | None = Field(default=None, description="Fully qualified class name of the model")
    model_size_bytes: int | None = Field(default=None, description="Serialized pickle size of the model in bytes")

    # NOTE(review): presumably enabled so arbitrary model objects can be carried in
    # `model` - confirm it is still required given that field is typed Any.
    model_config = {"arbitrary_types_allowed": True}

PredictionArtifactData

Bases: BaseModel

Schema for prediction artifact data stored in the artifact system.

Source code in src/chapkit/ml/schemas.py
class PredictionArtifactData(BaseModel):
    """Schema for prediction artifact data stored in the artifact system."""

    # Discriminator: only the literal value "prediction" validates.
    ml_type: Literal["prediction"] = Field(description="Artifact type identifier")
    # IDs and timestamps are stored as strings, not as ULID/datetime objects.
    config_id: str = Field(description="ID of the config used for prediction")
    model_artifact_id: str = Field(description="ID of the trained model artifact used for prediction")
    started_at: str = Field(description="ISO format timestamp when operation started")
    completed_at: str = Field(description="ISO format timestamp when operation completed")
    duration_seconds: float = Field(description="Operation duration in seconds (rounded to 2 decimals)")
    # Predictions use the PandasDataFrame schema type rather than a raw pandas object.
    predictions: PandasDataFrame = Field(description="Prediction results as structured DataFrame")

ModelRunnerProtocol

Bases: Protocol

Protocol defining the interface for model runners.

Source code in src/chapkit/ml/schemas.py
class ModelRunnerProtocol(Protocol):
    """Protocol defining the interface for model runners.

    Any object providing these two coroutine methods satisfies the protocol.
    """

    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model and return the trained model object (must be pickleable).

        Args:
            config: Config to train with.
            data: Training data as a pandas DataFrame.
            geo: Optional geospatial data; defaults to None.

        Returns:
            The trained model object; its type is not constrained here.
        """
        ...

    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Make predictions using a trained model and return predictions as DataFrame.

        Args:
            config: Config to predict with.
            model: Trained model object.
            historic: Historic data as a pandas DataFrame.
            future: Future/prediction data as a pandas DataFrame.
            geo: Optional geospatial data; defaults to None.

        Returns:
            The predictions as a pandas DataFrame.
        """
        ...
Functions
on_train(config, data, geo=None) async

Train a model and return the trained model object (must be pickleable).

Source code in src/chapkit/ml/schemas.py
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model and return the trained model object (must be pickleable).

    Args:
        config: Config to train with.
        data: Training data as a pandas DataFrame.
        geo: Optional geospatial data; defaults to None.

    Returns:
        The trained model object; its type is not constrained here.
    """
    ...
on_predict(config, model, historic, future, geo=None) async

Make predictions using a trained model and return predictions as DataFrame.

Source code in src/chapkit/ml/schemas.py
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Make predictions using a trained model and return predictions as DataFrame.

    Args:
        config: Config to predict with.
        model: Trained model object.
        historic: Historic data as a pandas DataFrame.
        future: Future/prediction data as a pandas DataFrame.
        geo: Optional geospatial data; defaults to None.

    Returns:
        The predictions as a pandas DataFrame.
    """
    ...

Manager

manager

Manager for ML train/predict operations with artifact-based storage.

Classes

MLManager

Manager for ML train/predict operations with job scheduling and artifact storage.

Source code in src/chapkit/ml/manager.py
class MLManager:
    """Manager for ML train/predict operations with job scheduling and artifact storage.

    The public ``execute_*`` methods only submit work: they generate the
    result artifact's ID up front, hand the matching ``_*_task`` coroutine to
    the scheduler, and return the job and artifact IDs. The tasks themselves
    load inputs, call the runner, and persist the result as an artifact.
    """

    def __init__(
        self,
        runner: ModelRunnerProtocol,
        scheduler: ChapkitJobScheduler,
        database: Database,
        config_schema: type[BaseConfig],
    ) -> None:
        """Initialize ML manager with runner, scheduler, database, and config schema."""
        self.runner = runner
        self.scheduler = scheduler
        self.database = database
        self.config_schema = config_schema

    async def execute_train(self, request: TrainRequest) -> TrainResponse:
        """Submit a training job to the scheduler and return job/artifact IDs.

        The config referenced by the request is not looked up here; that
        happens inside the scheduled task.
        """
        # Pre-allocate artifact ID for the trained model
        model_artifact_id = ULID()

        # Submit job to scheduler
        job_id = await self.scheduler.add_job(
            self._train_task,
            request,
            model_artifact_id,
        )

        return TrainResponse(
            job_id=str(job_id),
            model_artifact_id=str(model_artifact_id),
            message=f"Training job submitted. Job ID: {job_id}",
        )

    async def execute_predict(self, request: PredictRequest) -> PredictResponse:
        """Submit a prediction job to the scheduler and return job/artifact IDs.

        The model artifact referenced by the request is not looked up here;
        that happens inside the scheduled task.
        """
        # Pre-allocate artifact ID for predictions
        prediction_artifact_id = ULID()

        # Submit job to scheduler
        job_id = await self.scheduler.add_job(
            self._predict_task,
            request,
            prediction_artifact_id,
        )

        return PredictResponse(
            job_id=str(job_id),
            prediction_artifact_id=str(prediction_artifact_id),
            message=f"Prediction job submitted. Job ID: {job_id}",
        )

    async def _train_task(self, request: TrainRequest, model_artifact_id: ULID) -> ULID:
        """Execute training task and store trained model in artifact.

        Args:
            request: Training request carrying the config ID, data and optional geo data.
            model_artifact_id: Pre-allocated ID under which the model artifact is saved.

        Returns:
            ``model_artifact_id``, unchanged.

        Raises:
            ValueError: If the config referenced by the request is not found.
        """
        # Load config
        async with self.database.session() as session:
            config_repo = ConfigRepository(session)
            config_manager: ConfigManager[BaseConfig] = ConfigManager(config_repo, self.config_schema)
            config = await config_manager.find_by_id(request.config_id)

            if config is None:
                raise ValueError(f"Config {request.config_id} not found")

        # Convert PandasDataFrame to pandas
        data_df = request.data.to_dataframe()

        # Train model with timing
        # Only the runner call is timed; config loading and artifact storage are excluded.
        training_started_at = datetime.datetime.now(datetime.UTC)
        trained_model = await self.runner.on_train(
            config=config.data,
            data=data_df,
            geo=request.geo,
        )
        training_completed_at = datetime.datetime.now(datetime.UTC)
        training_duration = (training_completed_at - training_started_at).total_seconds()

        # Calculate model metrics
        model_type = _extract_model_type(trained_model)
        model_size_bytes = _calculate_model_size(trained_model)

        # Store trained model in artifact with metadata
        # A second session is opened here; the one used to load the config is already closed.
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)
            config_repo = ConfigRepository(session)

            # Create and validate artifact data with Pydantic
            artifact_data_model = TrainedModelArtifactData(
                ml_type="trained_model",
                config_id=str(request.config_id),
                model=trained_model,
                started_at=training_started_at.isoformat(),
                completed_at=training_completed_at.isoformat(),
                duration_seconds=round(training_duration, 2),
                model_type=model_type,
                model_size_bytes=model_size_bytes,
            )

            # Saved as a root artifact: no parent, level 0.
            await artifact_manager.save(
                ArtifactIn(
                    id=model_artifact_id,
                    data=artifact_data_model.model_dump(),
                    parent_id=None,
                    level=0,
                )
            )

            # Link config to root artifact for tree traversal
            await config_repo.link_artifact(request.config_id, model_artifact_id)
            await config_repo.commit()

        return model_artifact_id

    async def _predict_task(self, request: PredictRequest, prediction_artifact_id: ULID) -> ULID:
        """Execute prediction task and store predictions in artifact.

        Args:
            request: Prediction request carrying the model artifact ID, historic
                and future data, and optional geo data.
            prediction_artifact_id: Pre-allocated ID under which the prediction
                artifact is saved.

        Returns:
            ``prediction_artifact_id``, unchanged.

        Raises:
            ValueError: If the model artifact is not found, if its data is not a
                dict with ``ml_type == "trained_model"``, or if the config
                recorded in the model artifact is not found.
        """
        # Load model artifact
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)
            model_artifact = await artifact_manager.find_by_id(request.model_artifact_id)

            if model_artifact is None:
                raise ValueError(f"Model artifact {request.model_artifact_id} not found")

        # Extract model and config_id from artifact
        model_data = model_artifact.data
        if not isinstance(model_data, dict) or model_data.get("ml_type") != "trained_model":
            raise ValueError(f"Artifact {request.model_artifact_id} is not a trained model")

        # The config ID comes from the stored model artifact, not from the request.
        trained_model = model_data["model"]
        config_id = ULID.from_str(model_data["config_id"])

        # Load config
        async with self.database.session() as session:
            config_repo = ConfigRepository(session)
            config_manager: ConfigManager[BaseConfig] = ConfigManager(config_repo, self.config_schema)
            config = await config_manager.find_by_id(config_id)

            if config is None:
                raise ValueError(f"Config {config_id} not found")

        # Convert PandasDataFrames to pandas
        historic_df = request.historic.to_dataframe()
        future_df = request.future.to_dataframe()

        # Make predictions with timing
        # Only the runner call is timed; loading and storage are excluded.
        prediction_started_at = datetime.datetime.now(datetime.UTC)
        predictions_df = await self.runner.on_predict(
            config=config.data,
            model=trained_model,
            historic=historic_df,
            future=future_df,
            geo=request.geo,
        )
        prediction_completed_at = datetime.datetime.now(datetime.UTC)
        prediction_duration = (prediction_completed_at - prediction_started_at).total_seconds()

        # Store predictions in artifact with parent linkage
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)

            # Function-scope import of the DataFrame schema type used to wrap the runner's output.
            from chapkit.artifact.schemas import PandasDataFrame

            # Create and validate artifact data with Pydantic
            artifact_data_model = PredictionArtifactData(
                ml_type="prediction",
                model_artifact_id=str(request.model_artifact_id),
                config_id=str(config_id),
                predictions=PandasDataFrame.from_dataframe(predictions_df),
                started_at=prediction_started_at.isoformat(),
                completed_at=prediction_completed_at.isoformat(),
                duration_seconds=round(prediction_duration, 2),
            )

            # Saved as a child of the model artifact, at level 1.
            await artifact_manager.save(
                ArtifactIn(
                    id=prediction_artifact_id,
                    data=artifact_data_model.model_dump(),
                    parent_id=request.model_artifact_id,
                    level=1,
                )
            )

        return prediction_artifact_id
Functions
__init__(runner, scheduler, database, config_schema)

Initialize ML manager with runner, scheduler, database, and config schema.

Source code in src/chapkit/ml/manager.py
def __init__(
    self,
    runner: ModelRunnerProtocol,
    scheduler: ChapkitJobScheduler,
    database: Database,
    config_schema: type[BaseConfig],
) -> None:
    """Initialize ML manager with runner, scheduler, database, and config schema.

    Each argument is stored unchanged on the instance under the same name;
    no other work is done here.

    Args:
        runner: Object implementing the train/predict hooks.
        scheduler: Job scheduler.
        database: Database object.
        config_schema: Config data class.
    """
    self.runner = runner
    self.scheduler = scheduler
    self.database = database
    self.config_schema = config_schema
execute_train(request) async

Submit a training job to the scheduler and return job/artifact IDs.

Source code in src/chapkit/ml/manager.py
async def execute_train(self, request: TrainRequest) -> TrainResponse:
    """Queue a training job and report the job ID plus the model's future artifact ID.

    The artifact ID is generated before the job is submitted, so the caller
    receives it immediately.
    """
    artifact_id = ULID()
    job_id = await self.scheduler.add_job(self._train_task, request, artifact_id)

    response_fields = {
        "job_id": str(job_id),
        "model_artifact_id": str(artifact_id),
        "message": f"Training job submitted. Job ID: {job_id}",
    }
    return TrainResponse(**response_fields)
execute_predict(request) async

Submit a prediction job to the scheduler and return job/artifact IDs.

Source code in src/chapkit/ml/manager.py
async def execute_predict(self, request: PredictRequest) -> PredictResponse:
    """Queue a prediction job and report the job ID plus the predictions' future artifact ID.

    The artifact ID is generated before the job is submitted, so the caller
    receives it immediately.
    """
    artifact_id = ULID()
    job_id = await self.scheduler.add_job(self._predict_task, request, artifact_id)

    response_fields = {
        "job_id": str(job_id),
        "prediction_artifact_id": str(artifact_id),
        "message": f"Prediction job submitted. Job ID: {job_id}",
    }
    return PredictResponse(**response_fields)

Router

router

REST API router for ML train/predict operations.

Classes

MLRouter

Bases: Router

Router with $train and $predict collection operations.

Source code in src/chapkit/ml/router.py
class MLRouter(Router):
    """Router with $train and $predict collection operations.

    Both routes respond with 202 Accepted and the submitted job's IDs.
    ``ValueError`` from the manager is reported as 400 and ``RuntimeError``
    as 409.
    """

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        manager_factory: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize ML router with manager factory.

        Args:
            prefix: Route prefix forwarded to the base router.
            tags: Tags forwarded to the base router.
            manager_factory: Dependency callable that yields an ``MLManager``.
            **kwargs: Extra keyword arguments forwarded to the base router.
        """
        # Set before calling the base constructor; presumably the base
        # constructor triggers _register_routes, which reads this attribute.
        self.manager_factory = manager_factory
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register ML train and predict routes."""
        from fastapi import HTTPException

        manager_factory = self.manager_factory

        @self.router.post(
            "/$train",
            response_model=TrainResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Train model",
            description="Submit a training job to the scheduler",
        )
        async def train(
            request: TrainRequest,
            manager: MLManager = Depends(manager_factory),
        ) -> TrainResponse:
            """Train a model asynchronously and return job/artifact IDs."""
            try:
                response = await manager.execute_train(request)
                # Counted only after the job was submitted successfully.
                train_counter, _ = _get_counters()
                train_counter.add(1)
                return response
            except ValueError as e:
                # Chain the cause so the original error stays in tracebacks/logs.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e
            except RuntimeError as e:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                ) from e

        @self.router.post(
            "/$predict",
            response_model=PredictResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Make predictions",
            description="Submit a prediction job to the scheduler",
        )
        async def predict(
            request: PredictRequest,
            manager: MLManager = Depends(manager_factory),
        ) -> PredictResponse:
            """Make predictions asynchronously and return job/artifact IDs."""
            try:
                response = await manager.execute_predict(request)
                # Counted only after the job was submitted successfully.
                _, predict_counter = _get_counters()
                predict_counter.add(1)
                return response
            except ValueError as e:
                # Chain the cause so the original error stays in tracebacks/logs.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e
            except RuntimeError as e:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                ) from e
Functions
__init__(prefix, tags, manager_factory, **kwargs)

Initialize ML router with manager factory.

Source code in src/chapkit/ml/router.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    manager_factory: Any,
    **kwargs: Any,
) -> None:
    """Initialize ML router with manager factory.

    Args:
        prefix: Route prefix forwarded to the base router.
        tags: Tags forwarded to the base router.
        manager_factory: Stored on the instance as ``manager_factory``.
        **kwargs: Extra keyword arguments forwarded to the base router.
    """
    # Set before calling the base constructor; presumably the base constructor
    # triggers route registration, which reads this attribute - confirm.
    self.manager_factory = manager_factory
    super().__init__(prefix=prefix, tags=tags, **kwargs)

Model Runners

Protocol and implementations for ML model training and prediction.

BaseModelRunner

BaseModelRunner

Bases: ABC

Abstract base class for model runners with lifecycle hooks.

Source code in src/chapkit/ml/runner.py
class BaseModelRunner(ABC):
    """Base interface for model runners: two optional hooks plus abstract train/predict."""

    async def on_init(self) -> None:
        """Hook called before training or prediction; a no-op unless overridden."""
        return None

    async def on_cleanup(self) -> None:
        """Hook called after training or prediction; a no-op unless overridden."""
        return None

    @abstractmethod
    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Fit a model on ``data`` and return the trained object (must be pickleable)."""

    @abstractmethod
    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Use a trained ``model`` to produce predictions, returned as a DataFrame."""

Functions

on_init() async

Optional initialization hook called before training or prediction.

Source code in src/chapkit/ml/runner.py
async def on_init(self) -> None:
    """Hook called before training or prediction; a no-op unless overridden."""
    return None

on_cleanup() async

Optional cleanup hook called after training or prediction.

Source code in src/chapkit/ml/runner.py
async def on_cleanup(self) -> None:
    """Hook called after training or prediction; a no-op unless overridden."""
    return None

on_train(config, data, geo=None) abstractmethod async

Train a model and return the trained model object (must be pickleable).

Source code in src/chapkit/ml/runner.py
@abstractmethod
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Fit a model on ``data`` and return the trained object (must be pickleable)."""

on_predict(config, model, historic, future, geo=None) abstractmethod async

Make predictions using a trained model and return predictions as DataFrame.

Source code in src/chapkit/ml/runner.py
@abstractmethod
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Use a trained ``model`` to produce predictions, returned as a DataFrame."""

FunctionalModelRunner

FunctionalModelRunner

Bases: BaseModelRunner, Generic[ConfigT]

Functional model runner wrapping train and predict functions.

Source code in src/chapkit/ml/runner.py
class FunctionalModelRunner(BaseModelRunner, Generic[ConfigT]):
    """Runner that delegates training and prediction to two supplied awaitable-returning callables."""

    def __init__(
        self,
        on_train: TrainFunction[ConfigT],
        on_predict: PredictFunction[ConfigT],
    ) -> None:
        """Keep references to the train and predict callables."""
        self._on_train, self._on_predict = on_train, on_predict

    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Await the wrapped train callable and hand back whatever it produces."""
        trained = await self._on_train(config, data, geo)  # type: ignore[arg-type]
        return trained

    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Await the wrapped predict callable and hand back its predictions."""
        predictions = await self._on_predict(config, model, historic, future, geo)  # type: ignore[arg-type]
        return predictions

Functions

__init__(on_train, on_predict)

Initialize functional runner with train and predict functions.

Source code in src/chapkit/ml/runner.py
def __init__(
    self,
    on_train: TrainFunction[ConfigT],
    on_predict: PredictFunction[ConfigT],
) -> None:
    """Keep references to the train and predict callables."""
    self._on_train, self._on_predict = on_train, on_predict

on_train(config, data, geo=None) async

Train a model and return the trained model object.

Source code in src/chapkit/ml/runner.py
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Await the wrapped train callable and hand back whatever it produces."""
    trained = await self._on_train(config, data, geo)  # type: ignore[arg-type]
    return trained

on_predict(config, model, historic, future, geo=None) async

Make predictions using a trained model.

Source code in src/chapkit/ml/runner.py
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Await the wrapped predict callable and hand back its predictions."""
    predictions = await self._on_predict(config, model, historic, future, geo)  # type: ignore[arg-type]
    return predictions

ShellModelRunner

ShellModelRunner

Bases: BaseModelRunner

Shell-based model runner that executes external scripts for train/predict operations.

Source code in src/chapkit/ml/runner.py
class ShellModelRunner(BaseModelRunner):
    """Shell-based model runner that executes external scripts for train/predict operations.

    Each operation writes its inputs into a fresh temporary directory, expands
    the configured command template with the resulting file paths, runs the
    command through the shell with that directory as the working directory,
    reads the file the script is expected to produce, and finally removes the
    directory.
    """

    def __init__(
        self,
        train_command: str,
        predict_command: str,
        model_format: str = "pickle",
    ) -> None:
        """Initialize shell runner with command templates for train/predict operations.

        The templates are expanded with ``str.format``. ``train_command`` is
        given ``config_file``, ``data_file``, ``model_file`` and ``geo_file``;
        ``predict_command`` is given ``config_file``, ``model_file``,
        ``historic_file``, ``future_file``, ``output_file`` and ``geo_file``.
        ``geo_file`` expands to an empty string when no geo data is written.
        ``model_format`` is used only as the model file's extension.
        """
        self.train_command = train_command
        self.predict_command = predict_command
        self.model_format = model_format

    @staticmethod
    def _write_geo(temp_dir: Path, geo: FeatureCollection | None) -> Path | None:
        """Write ``geo`` to ``geo.json`` in ``temp_dir``; return its path, or None when ``geo`` is falsy."""
        if not geo:
            return None
        geo_file = temp_dir / "geo.json"
        geo_file.write_text(geo.model_dump_json(indent=2))
        return geo_file

    async def _run_script(self, command: str, temp_dir: Path, *, event: str, label: str) -> None:
        """Run ``command`` in ``temp_dir`` via the shell; raise RuntimeError on a non-zero exit code."""
        logger.info(f"executing_{event}_script", command=command, temp_dir=str(temp_dir))

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(temp_dir),
        )
        stdout_bytes, stderr_bytes = await process.communicate()

        # Script output is arbitrary bytes: substitute undecodable sequences so a
        # stray non-UTF-8 byte cannot raise UnicodeDecodeError and hide the real outcome.
        stdout = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else ""
        stderr = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else ""

        if process.returncode != 0:
            logger.error(f"{event}_script_failed", exit_code=process.returncode, stderr=stderr)
            raise RuntimeError(f"{label} script failed with exit code {process.returncode}: {stderr}")

        logger.info(f"{event}_script_completed", stdout=stdout[:500], stderr=stderr[:500])

    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model by executing external training script.

        Returns:
            The object unpickled from the model file written by the script.

        Raises:
            RuntimeError: If the script exits with a non-zero code or does not
                create the model file.
        """
        import shutil

        temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_train_"))
        try:
            config_file = temp_dir / "config.json"
            config_file.write_text(json.dumps(config.model_dump(), indent=2))

            data_file = temp_dir / "data.csv"
            data.to_csv(data_file, index=False)

            geo_file = self._write_geo(temp_dir, geo)

            # The training script is expected to write its model to this path.
            model_file = temp_dir / f"model.{self.model_format}"

            command = self.train_command.format(
                config_file=str(config_file),
                data_file=str(data_file),
                model_file=str(model_file),
                geo_file=str(geo_file) if geo_file else "",
            )
            await self._run_script(command, temp_dir, event="train", label="Training")

            if not model_file.exists():
                raise RuntimeError(f"Training script did not create model file at {model_file}")

            # NOTE(security): unpickling can execute arbitrary code. The file is
            # produced by the configured training command, which must be trusted.
            with open(model_file, "rb") as f:
                return pickle.load(f)
        finally:
            # Best-effort removal of the working directory, also on failure.
            shutil.rmtree(temp_dir, ignore_errors=True)

    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Make predictions by executing external prediction script.

        Returns:
            The contents of ``predictions.csv`` written by the script, read
            with ``pandas.read_csv``.

        Raises:
            RuntimeError: If the script exits with a non-zero code or does not
                create the output file.
        """
        import shutil

        temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_predict_"))
        try:
            config_file = temp_dir / "config.json"
            config_file.write_text(json.dumps(config.model_dump(), indent=2))

            # The model is always pickled; model_format only affects the file name.
            model_file = temp_dir / f"model.{self.model_format}"
            with open(model_file, "wb") as f:
                pickle.dump(model, f)

            historic_file = temp_dir / "historic.csv"
            historic.to_csv(historic_file, index=False)

            future_file = temp_dir / "future.csv"
            future.to_csv(future_file, index=False)

            geo_file = self._write_geo(temp_dir, geo)

            # The prediction script is expected to write its results to this path.
            output_file = temp_dir / "predictions.csv"

            command = self.predict_command.format(
                config_file=str(config_file),
                model_file=str(model_file),
                historic_file=str(historic_file),
                future_file=str(future_file),
                output_file=str(output_file),
                geo_file=str(geo_file) if geo_file else "",
            )
            await self._run_script(command, temp_dir, event="predict", label="Prediction")

            if not output_file.exists():
                raise RuntimeError(f"Prediction script did not create output file at {output_file}")

            return pd.read_csv(output_file)
        finally:
            # Best-effort removal of the working directory, also on failure.
            shutil.rmtree(temp_dir, ignore_errors=True)

Functions

__init__(train_command, predict_command, model_format='pickle')

Initialize shell runner with command templates for train/predict operations.

Source code in src/chapkit/ml/runner.py
def __init__(
    self,
    train_command: str,
    predict_command: str,
    model_format: str = "pickle",
) -> None:
    """Record the train/predict command templates and the model file format."""
    self.train_command, self.predict_command = train_command, predict_command
    self.model_format = model_format

on_train(config, data, geo=None) async

Train a model by executing external training script.

Source code in src/chapkit/ml/runner.py
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model by executing external training script.

    Writes the config (JSON), training data (CSV) and optional geo data (JSON)
    into a temporary directory, expands ``self.train_command`` with the file
    paths, runs it through the shell in that directory, and unpickles the
    model file the script is expected to create. The directory is removed
    afterwards, whether or not training succeeded.

    Returns:
        The object unpickled from the model file written by the script.

    Raises:
        RuntimeError: If the script exits with a non-zero code or does not
            create the model file.
    """
    import shutil

    temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_train_"))

    try:
        # Write config to JSON file
        config_file = temp_dir / "config.json"
        config_file.write_text(json.dumps(config.model_dump(), indent=2))

        # Write training data to CSV
        data_file = temp_dir / "data.csv"
        data.to_csv(data_file, index=False)

        # Write geo data if provided; the template gets "" for geo_file otherwise
        geo_file: Path | None = None
        if geo:
            geo_file = temp_dir / "geo.json"
            geo_file.write_text(geo.model_dump_json(indent=2))

        # Path the training script is expected to write its model to
        model_file = temp_dir / f"model.{self.model_format}"

        # Substitute variables in command
        command = self.train_command.format(
            config_file=str(config_file),
            data_file=str(data_file),
            model_file=str(model_file),
            geo_file=str(geo_file) if geo_file else "",
        )

        logger.info("executing_train_script", command=command, temp_dir=str(temp_dir))

        # Execute subprocess
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(temp_dir),
        )

        stdout_bytes, stderr_bytes = await process.communicate()
        # Script output is arbitrary bytes: substitute undecodable sequences so a
        # stray non-UTF-8 byte cannot raise UnicodeDecodeError and hide the real outcome.
        stdout = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else ""
        stderr = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else ""

        if process.returncode != 0:
            logger.error("train_script_failed", exit_code=process.returncode, stderr=stderr)
            raise RuntimeError(f"Training script failed with exit code {process.returncode}: {stderr}")

        logger.info("train_script_completed", stdout=stdout[:500], stderr=stderr[:500])

        # Load trained model from file
        if not model_file.exists():
            raise RuntimeError(f"Training script did not create model file at {model_file}")

        # NOTE(security): unpickling can execute arbitrary code. The file is
        # produced by the configured training command, which must be trusted.
        with open(model_file, "rb") as f:
            return pickle.load(f)

    finally:
        # Best-effort removal of the working directory, also on failure.
        shutil.rmtree(temp_dir, ignore_errors=True)

on_predict(config, model, historic, future, geo=None) async

Make predictions by executing external prediction script.

Source code in src/chapkit/ml/runner.py
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Make predictions by executing external prediction script.

    Writes the config (JSON), pickled model, historic and future data (CSV)
    and optional geo data (JSON) into a temporary directory, expands
    ``self.predict_command`` with the file paths, runs it through the shell in
    that directory, and reads ``predictions.csv``. The directory is removed
    afterwards, whether or not prediction succeeded.

    Returns:
        The contents of the output file, read with ``pandas.read_csv``.

    Raises:
        RuntimeError: If the script exits with a non-zero code or does not
            create the output file.
    """
    import shutil

    temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_predict_"))

    try:
        # Write config to JSON file
        config_file = temp_dir / "config.json"
        config_file.write_text(json.dumps(config.model_dump(), indent=2))

        # Write model to file (always pickled; model_format only affects the file name)
        model_file = temp_dir / f"model.{self.model_format}"
        with open(model_file, "wb") as f:
            pickle.dump(model, f)

        # Write historic data
        historic_file = temp_dir / "historic.csv"
        historic.to_csv(historic_file, index=False)

        # Write future data to CSV
        future_file = temp_dir / "future.csv"
        future.to_csv(future_file, index=False)

        # Write geo data if provided; the template gets "" for geo_file otherwise
        geo_file: Path | None = None
        if geo:
            geo_file = temp_dir / "geo.json"
            geo_file.write_text(geo.model_dump_json(indent=2))

        # Path the prediction script is expected to write its results to
        output_file = temp_dir / "predictions.csv"

        # Substitute variables in command
        command = self.predict_command.format(
            config_file=str(config_file),
            model_file=str(model_file),
            historic_file=str(historic_file),
            future_file=str(future_file),
            output_file=str(output_file),
            geo_file=str(geo_file) if geo_file else "",
        )

        logger.info("executing_predict_script", command=command, temp_dir=str(temp_dir))

        # Execute subprocess
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(temp_dir),
        )

        stdout_bytes, stderr_bytes = await process.communicate()
        # Script output is arbitrary bytes: substitute undecodable sequences so a
        # stray non-UTF-8 byte cannot raise UnicodeDecodeError and hide the real outcome.
        stdout = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else ""
        stderr = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else ""

        if process.returncode != 0:
            logger.error("predict_script_failed", exit_code=process.returncode, stderr=stderr)
            raise RuntimeError(f"Prediction script failed with exit code {process.returncode}: {stderr}")

        logger.info("predict_script_completed", stdout=stdout[:500], stderr=stderr[:500])

        # Load predictions from file
        if not output_file.exists():
            raise RuntimeError(f"Prediction script did not create output file at {output_file}")

        return pd.read_csv(output_file)

    finally:
        # Best-effort removal of the working directory, also on failure.
        shutil.rmtree(temp_dir, ignore_errors=True)

Scheduler

Chapkit job scheduler with artifact tracking for ML/task workflows.

scheduler

Chapkit-specific job scheduler with artifact tracking.

Classes

ChapkitJobRecord

Bases: JobRecord

Job record extended with artifact_id tracking for ML/task workflows.

Source code in src/chapkit/scheduler.py
class ChapkitJobRecord(JobRecord):
    """JobRecord variant that additionally carries the ID of an artifact produced by the job."""

    # Optional; defaults to None when no artifact ID has been recorded.
    artifact_id: ULID | None = Field(
        default=None,
        description="ID of artifact created by job (if job returns a ULID)",
    )

ChapkitJobScheduler

Bases: AIOJobScheduler

Chapkit job scheduler with automatic artifact tracking for jobs that return ULIDs.

Source code in src/chapkit/scheduler.py
class ChapkitJobScheduler(AIOJobScheduler):
    """Chapkit job scheduler with automatic artifact tracking for jobs that return ULIDs.

    Jobs are tracked as ``ChapkitJobRecord`` instances. When a job's result is
    a ``ULID`` it is stored on the record as ``artifact_id``.
    """

    # Override with compatible type - dict is invariant, but we handle both types correctly
    _records: dict[ULID, JobRecord] = PrivateAttr(default_factory=dict)

    async def add_job(
        self,
        target: JobTarget,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> ULID:
        """Add a job to the scheduler and return its ID.

        ``target`` may be an awaitable object (no args/kwargs allowed), a
        coroutine function, or a plain callable, which is run in a worker
        thread via ``asyncio.to_thread``.

        Raises:
            RuntimeError: If the generated job ID is already scheduled.
        """
        now = datetime.now(timezone.utc)
        jid = ULID()

        record = ChapkitJobRecord(
            id=jid,
            status=JobStatus.pending,
            submitted_at=now,
        )

        async with self._lock:
            if jid in self._tasks:
                raise RuntimeError(f"Job {jid!r} already scheduled")
            self._records[jid] = record

        async def _execute_target() -> Any:
            if inspect.isawaitable(target):
                if args or kwargs:
                    # Close the coroutine to avoid "coroutine was never awaited" warning
                    if inspect.iscoroutine(target):
                        target.close()
                    raise TypeError("Args/kwargs not supported when target is an awaitable object.")
                return await target
            if inspect.iscoroutinefunction(target):
                return await target(*args, **kwargs)
            return await asyncio.to_thread(target, *args, **kwargs)

        async def _runner() -> Any:
            # The semaphore, when configured, is held for the duration of the job.
            if self._sema:
                async with self._sema:
                    return await self._run_with_state(jid, _execute_target)
            else:
                return await self._run_with_state(jid, _execute_target)

        task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

        def _drain(t: asyncio.Task[Any]) -> None:
            # Retrieve the task's exception so asyncio does not log
            # "Task exception was never retrieved"; failures are already stored
            # on the job record. A cancelled task raises CancelledError (a
            # BaseException) from result()/exception(), which would escape
            # this callback into the event loop's error handler, so cancelled
            # tasks are skipped explicitly.
            if not t.cancelled():
                t.exception()

        task.add_done_callback(_drain)

        async with self._lock:
            self._tasks[jid] = task

        return jid

    async def get_record(self, job_id: ULID) -> ChapkitJobRecord:
        """Get complete job record with artifact_id if available.

        Raises:
            KeyError: If no record exists for ``job_id``.
        """
        async with self._lock:
            if job_id not in self._records:
                raise KeyError(f"Job {job_id} not found")
            rec = self._records[job_id]
            # Records are created as ChapkitJobRecord, cast is safe
            return rec  # type: ignore[return-value]

    async def list_records(
        self, *, status_filter: JobStatus | None = None, reverse: bool = False
    ) -> list[ChapkitJobRecord]:
        """List all job records with optional status filtering.

        Records come back in the insertion order of the internal mapping, or
        in the opposite order when ``reverse`` is true.
        """
        async with self._lock:
            records = list(self._records.values())
            if status_filter:
                records = [r for r in records if r.status == status_filter]
            if reverse:
                records = list(reversed(records))
            # Records are created as ChapkitJobRecord, cast is safe
            return records  # type: ignore[return-value]

    async def _run_with_state(
        self,
        jid: ULID,
        exec_fn: JobExecutor,
    ) -> Any:
        """Execute job function and track artifact_id if result is a ULID.

        Moves the record through running -> completed / canceled / failed and
        re-raises cancellation and errors after recording them.
        """
        async with self._lock:
            rec = self._records[jid]
            rec.status = JobStatus.running
            rec.started_at = datetime.now(timezone.utc)

        try:
            result = await exec_fn()

            # Track artifact_id if job returns a ULID
            artifact_id: ULID | None = result if isinstance(result, ULID) else None

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.completed
                rec.finished_at = datetime.now(timezone.utc)
                rec.artifact_id = artifact_id
                self._results[jid] = result

            return result

        except asyncio.CancelledError:
            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.canceled
                rec.finished_at = datetime.now(timezone.utc)

            raise

        except Exception as e:
            tb = traceback.format_exc()
            # Extract clean error message (exception type and message only)
            error_lines = tb.strip().split("\n")
            clean_error = error_lines[-1] if error_lines else str(e)

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.failed
                rec.finished_at = datetime.now(timezone.utc)
                rec.error = clean_error
                rec.error_traceback = tb

            raise
Functions
add_job(target, /, *args, **kwargs) async

Add a job to the scheduler and return its ID.

Source code in src/chapkit/scheduler.py
async def add_job(
    self,
    target: JobTarget,
    /,
    *args: Any,
    **kwargs: Any,
) -> ULID:
    """Add a job to the scheduler and return its ID.

    ``target`` may be an awaitable object (no args/kwargs allowed), a
    coroutine function, or a plain callable, which is run in a worker thread
    via ``asyncio.to_thread``.

    Raises:
        RuntimeError: If the generated job ID is already scheduled.
    """
    now = datetime.now(timezone.utc)
    jid = ULID()

    record = ChapkitJobRecord(
        id=jid,
        status=JobStatus.pending,
        submitted_at=now,
    )

    async with self._lock:
        if jid in self._tasks:
            raise RuntimeError(f"Job {jid!r} already scheduled")
        self._records[jid] = record

    async def _execute_target() -> Any:
        if inspect.isawaitable(target):
            if args or kwargs:
                # Close the coroutine to avoid "coroutine was never awaited" warning
                if inspect.iscoroutine(target):
                    target.close()
                raise TypeError("Args/kwargs not supported when target is an awaitable object.")
            return await target
        if inspect.iscoroutinefunction(target):
            return await target(*args, **kwargs)
        return await asyncio.to_thread(target, *args, **kwargs)

    async def _runner() -> Any:
        # The semaphore, when configured, is held for the duration of the job.
        if self._sema:
            async with self._sema:
                return await self._run_with_state(jid, _execute_target)
        else:
            return await self._run_with_state(jid, _execute_target)

    task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

    def _drain(t: asyncio.Task[Any]) -> None:
        # Retrieve the task's exception so asyncio does not log
        # "Task exception was never retrieved". A cancelled task raises
        # CancelledError (a BaseException) from result()/exception(), which
        # would escape this callback into the event loop's error handler, so
        # cancelled tasks are skipped explicitly.
        if not t.cancelled():
            t.exception()

    task.add_done_callback(_drain)

    async with self._lock:
        self._tasks[jid] = task

    return jid
get_record(job_id) async

Get complete job record with artifact_id if available.

Source code in src/chapkit/scheduler.py
async def get_record(self, job_id: ULID) -> ChapkitJobRecord:
    """Look up the record stored for ``job_id``; raise KeyError when there is none."""
    async with self._lock:
        if job_id in self._records:
            # Records are created as ChapkitJobRecord, so the narrower return type holds.
            return self._records[job_id]  # type: ignore[return-value]
        raise KeyError(f"Job {job_id} not found")
list_records(*, status_filter=None, reverse=False) async

List all job records with optional status filtering.

Source code in src/chapkit/scheduler.py
async def list_records(
    self, *, status_filter: JobStatus | None = None, reverse: bool = False
) -> list[ChapkitJobRecord]:
    """Return the stored job records, optionally restricted to one status and/or reversed."""
    async with self._lock:
        # A falsy status_filter means "no filtering".
        selected = [
            rec
            for rec in self._records.values()
            if not status_filter or rec.status == status_filter
        ]
        if reverse:
            selected.reverse()
        # Records are created as ChapkitJobRecord, so the narrower return type holds.
        return selected  # type: ignore[return-value]

API Layer

FastAPI-specific components built on servicekit.

Dependencies

FastAPI dependency injection functions.

dependencies

Feature-specific FastAPI dependency injection for managers.

Classes

Functions

get_config_manager(session) async

Get a config manager instance for dependency injection.

Source code in src/chapkit/api/dependencies.py
async def get_config_manager(session: Annotated[AsyncSession, Depends(get_session)]) -> ConfigManager[BaseConfig]:
    """Build a BaseConfig ConfigManager on top of a repository bound to the injected session."""
    return ConfigManager[BaseConfig](ConfigRepository(session), BaseConfig)

get_artifact_manager(session) async

Get an artifact manager instance for dependency injection.

Source code in src/chapkit/api/dependencies.py
async def get_artifact_manager(session: Annotated[AsyncSession, Depends(get_session)]) -> ArtifactManager:
    """Build an ArtifactManager on top of a repository bound to the injected session."""
    return ArtifactManager(ArtifactRepository(session))

get_ml_manager() async

Get an ML manager instance for dependency injection.

Note: This is a placeholder. The actual dependency is built by ServiceBuilder with the runner in closure, then overridden via app.dependency_overrides.

Source code in src/chapkit/api/dependencies.py
async def get_ml_manager() -> MLManager:
    """Placeholder ML manager dependency; always raises RuntimeError.

    Note: ServiceBuilder builds the actual dependency with the runner in a
    closure and installs it via app.dependency_overrides, replacing this one.
    """
    message = "ML manager dependency not configured. Use ServiceBuilder.with_ml() to enable ML operations."
    raise RuntimeError(message)

Alembic Helpers

Reusable migration helpers for chapkit database tables.

alembic_helpers

Reusable Alembic migration helpers for chapkit tables.

Functions

create_artifacts_table(op)

Create artifacts table for hierarchical artifact storage.

Source code in src/chapkit/alembic_helpers.py
def create_artifacts_table(op: Any) -> None:
    """Create the self-referencing artifacts table plus its level and parent_id indexes."""
    table = "artifacts"
    columns = (
        sa.Column("parent_id", servicekit.types.ULIDType(length=26), nullable=True),
        sa.Column("data", sa.PickleType(), nullable=False),
        sa.Column("level", sa.Integer(), nullable=False),
        sa.Column("id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
    )
    constraints = (
        # parent_id points back at this table; deleting a parent nulls the reference.
        sa.ForeignKeyConstraint(["parent_id"], ["artifacts.id"], ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(table, *columns, *constraints)

    for index_name, column in (
        ("ix_artifacts_level", "level"),
        ("ix_artifacts_parent_id", "parent_id"),
    ):
        op.create_index(op.f(index_name), table, [column], unique=False)

drop_artifacts_table(op)

Drop artifacts table.

Source code in src/chapkit/alembic_helpers.py
def drop_artifacts_table(op: Any) -> None:
    """Drop both artifacts indexes, then the artifacts table itself."""
    table = "artifacts"
    for index_name in ("ix_artifacts_parent_id", "ix_artifacts_level"):
        op.drop_index(op.f(index_name), table_name=table)
    op.drop_table(table)

create_configs_table(op)

Create configs table for configuration storage.

Source code in src/chapkit/alembic_helpers.py
def create_configs_table(op: Any) -> None:
    """Create the configs table (name plus JSON data) and a non-unique index on name."""
    table = "configs"
    columns = (
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("data", sa.JSON(), nullable=False),
        sa.Column("id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
    )
    primary_key = sa.PrimaryKeyConstraint("id")
    op.create_table(table, *columns, primary_key)

    index_name = op.f("ix_configs_name")
    op.create_index(index_name, table, ["name"], unique=False)

drop_configs_table(op)

Drop configs table.

Source code in src/chapkit/alembic_helpers.py
def drop_configs_table(op: Any) -> None:
    """Drop the name index, then the configs table itself."""
    table = "configs"
    index_name = op.f("ix_configs_name")
    op.drop_index(index_name, table_name=table)
    op.drop_table(table)

create_config_artifacts_table(op)

Create config_artifacts junction table linking configs to artifacts.

Source code in src/chapkit/alembic_helpers.py
def create_config_artifacts_table(op: Any) -> None:
    """Create the config_artifacts junction table that ties artifacts to configs."""
    columns = (
        sa.Column("config_id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("artifact_id", servicekit.types.ULIDType(length=26), nullable=False),
    )
    constraints = (
        # Rows disappear together with the artifact or config they reference.
        sa.ForeignKeyConstraint(["artifact_id"], ["artifacts.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["config_id"], ["configs.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("config_id", "artifact_id"),
        # NOTE(review): artifact_id gets two unique constraints, one unnamed and one
        # named uq_artifact_id -- confirm both are intended before changing the schema.
        sa.UniqueConstraint("artifact_id"),
        sa.UniqueConstraint("artifact_id", name="uq_artifact_id"),
    )
    op.create_table("config_artifacts", *columns, *constraints)

drop_config_artifacts_table(op)

Drop config_artifacts junction table.

Source code in src/chapkit/alembic_helpers.py
def drop_config_artifacts_table(op: Any) -> None:
    """Remove the config_artifacts junction table."""
    table = "config_artifacts"
    op.drop_table(table)

create_tasks_table(op)

Create tasks table for task execution infrastructure.

Source code in src/chapkit/alembic_helpers.py
def create_tasks_table(op: Any) -> None:
    """Create the tasks table used by the task execution infrastructure."""
    columns = (
        sa.Column("command", sa.Text(), nullable=False),
        # task_type and enabled carry server-side defaults ("shell" and "1").
        sa.Column("task_type", sa.Text(), nullable=False, server_default="shell"),
        sa.Column("parameters", sa.JSON(), nullable=True),
        sa.Column("enabled", sa.Boolean(), nullable=False, server_default="1"),
        sa.Column("id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
    )
    primary_key = sa.PrimaryKeyConstraint("id")
    op.create_table("tasks", *columns, primary_key)

drop_tasks_table(op)

Drop tasks table.

Source code in src/chapkit/alembic_helpers.py
def drop_tasks_table(op: Any) -> None:
    """Remove the tasks table."""
    table = "tasks"
    op.drop_table(table)