API Reference

Complete API documentation for all chapkit modules, classes, and functions.

Artifact Module

Hierarchical storage system for models, data, and experiment tracking.

Models

models

Artifact ORM model for hierarchical data storage.

Classes

Artifact

Bases: Entity

ORM model for hierarchical artifacts with parent-child relationships.

Source code in src/chapkit/artifact/models.py
class Artifact(Entity):
    """ORM model for hierarchical artifacts with parent-child relationships."""

    __tablename__ = "artifacts"

    parent_id: Mapped[ULID | None] = mapped_column(
        ULIDType,
        ForeignKey("artifacts.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

    parent: Mapped[Artifact | None] = relationship(
        remote_side="Artifact.id",
        back_populates="children",
    )

    children: Mapped[list[Artifact]] = relationship(
        back_populates="parent",
    )

    data: Mapped[Any] = mapped_column(PickleType(protocol=4), nullable=False)
    level: Mapped[int] = mapped_column(default=0, nullable=False, index=True)
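
A minimal sketch of the parent-child relationship in direct ORM use (illustrative only; it assumes the Entity base supplies the id default, and in practice artifacts are created through ArtifactManager, which computes level from the parent, as shown in the Manager section below):

from chapkit.artifact.models import Artifact

# Root artifact at level 0 and a child at level 1; `data` holds an arbitrary picklable object.
root = Artifact(data={"note": "experiment run"}, level=0)
child = Artifact(data={"note": "fold 1"}, parent=root, level=1)
# After the session flushes, child.parent_id equals root.id and root.children contains child.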

Schemas

schemas

Pydantic schemas for hierarchical artifacts with tree structures.

Attributes

ArtifactData = Annotated[MLTrainingArtifactData | MLPredictionArtifactData | GenericArtifactData, Field(discriminator='type')] module-attribute

Discriminated union type for all artifact data types.

Classes

ArtifactIn

Bases: EntityIn

Input schema for creating or updating artifacts.

Source code in src/chapkit/artifact/schemas.py
class ArtifactIn(EntityIn):
    """Input schema for creating or updating artifacts."""

    data: Any
    parent_id: ULID | None = None
    level: int | None = None
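
For illustration, input payloads for a root and a child artifact (the ULID import assumes the python-ulid package; `parent_ulid` stands in for the id of an already-stored artifact):

from ulid import ULID
from chapkit.artifact.schemas import ArtifactIn

parent_ulid = ULID()  # placeholder; normally the id of an existing artifact

root_in = ArtifactIn(data={"type": "generic", "metadata": {}, "content": {"note": "experiment run"}})
child_in = ArtifactIn(
    data={"type": "generic", "metadata": {}, "content": {"note": "fold 1"}},
    parent_id=parent_ulid,
)
# `level` can be omitted: ArtifactManager recomputes it from parent_id before saving.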

ArtifactOut

Bases: EntityOut

Output schema for artifact entities.

Source code in src/chapkit/artifact/schemas.py
class ArtifactOut(EntityOut):
    """Output schema for artifact entities."""

    data: JsonSafe
    parent_id: ULID | None = None
    level: int

ArtifactTreeNode

Bases: ArtifactOut

Artifact node with tree structure metadata.

Source code in src/chapkit/artifact/schemas.py
class ArtifactTreeNode(ArtifactOut):
    """Artifact node with tree structure metadata."""

    level_label: str | None = None
    hierarchy: str | None = None
    children: list["ArtifactTreeNode"] | None = None

    @classmethod
    def from_artifact(cls, artifact: ArtifactOut) -> Self:
        """Create a tree node from an artifact output schema."""
        return cls.model_validate(artifact.model_dump())
Functions
from_artifact(artifact) classmethod

Create a tree node from an artifact output schema.

Source code in src/chapkit/artifact/schemas.py
@classmethod
def from_artifact(cls, artifact: ArtifactOut) -> Self:
    """Create a tree node from an artifact output schema."""
    return cls.model_validate(artifact.model_dump())

ArtifactHierarchy

Bases: BaseModel

Configuration for artifact hierarchy with level labels.

Source code in src/chapkit/artifact/schemas.py
class ArtifactHierarchy(BaseModel):
    """Configuration for artifact hierarchy with level labels."""

    name: str = Field(..., description="Human readable name of this hierarchy")
    level_labels: Mapping[int, str] = Field(
        default_factory=dict,
        description="Mapping of numeric levels to labels (0 -> 'train', etc.)",
    )

    model_config = {"frozen": True}

    hierarchy_key: ClassVar[str] = "hierarchy"
    depth_key: ClassVar[str] = "level_depth"
    label_key: ClassVar[str] = "level_label"

    def label_for(self, level: int) -> str:
        """Get the label for a given level or return default."""
        return self.level_labels.get(level, f"level_{level}")

    def describe(self, level: int) -> dict[str, Any]:
        """Get hierarchy metadata dict for a given level."""
        return {
            self.hierarchy_key: self.name,
            self.depth_key: level,
            self.label_key: self.label_for(level),
        }
Functions
label_for(level)

Get the label for a given level or return default.

Source code in src/chapkit/artifact/schemas.py
def label_for(self, level: int) -> str:
    """Get the label for a given level or return default."""
    return self.level_labels.get(level, f"level_{level}")
describe(level)

Get hierarchy metadata dict for a given level.

Source code in src/chapkit/artifact/schemas.py
def describe(self, level: int) -> dict[str, Any]:
    """Get hierarchy metadata dict for a given level."""
    return {
        self.hierarchy_key: self.name,
        self.depth_key: level,
        self.label_key: self.label_for(level),
    }
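
A short usage sketch of the hierarchy helpers:

from chapkit.artifact.schemas import ArtifactHierarchy

hierarchy = ArtifactHierarchy(name="ml_pipeline", level_labels={0: "train", 1: "predict"})

hierarchy.label_for(0)   # "train"
hierarchy.label_for(7)   # "level_7" (default label for unmapped levels)
hierarchy.describe(1)    # {"hierarchy": "ml_pipeline", "level_depth": 1, "level_label": "predict"}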

BaseArtifactData

Bases: BaseModel

Base class for all artifact data types with typed metadata.

Source code in src/chapkit/artifact/schemas.py
class BaseArtifactData[MetadataT: BaseModel](BaseModel):
    """Base class for all artifact data types with typed metadata."""

    type: str = Field(description="Discriminator field for artifact type")
    metadata: MetadataT = Field(description="Strongly-typed JSON-serializable metadata")
    content: JsonSafe = Field(description="Content as Python object (bytes, DataFrame, models, etc.)")
    content_type: str | None = Field(default=None, description="MIME type for download responses")
    content_size: int | None = Field(default=None, description="Size of content in bytes")

    model_config = {"extra": "forbid"}

MLMetadata

Bases: BaseModel

Metadata for ML artifacts (training and prediction).

Source code in src/chapkit/artifact/schemas.py
class MLMetadata(BaseModel):
    """Metadata for ML artifacts (training and prediction)."""

    status: Literal["success", "failed"] = Field(description="Job execution status")
    config_id: str = Field(description="ID of the config used for this operation")
    started_at: str = Field(description="ISO 8601 timestamp when operation started")
    completed_at: str = Field(description="ISO 8601 timestamp when operation completed")
    duration_seconds: float = Field(description="Operation duration in seconds")

GenericMetadata

Bases: BaseModel

Free-form metadata for generic artifacts.

Source code in src/chapkit/artifact/schemas.py
class GenericMetadata(BaseModel):
    """Free-form metadata for generic artifacts."""

    model_config = {"extra": "allow"}

MLTrainingArtifactData

Bases: BaseArtifactData[MLMetadata]

Schema for ML training artifact data with trained model.

Source code in src/chapkit/artifact/schemas.py
class MLTrainingArtifactData(BaseArtifactData[MLMetadata]):
    """Schema for ML training artifact data with trained model."""

    type: Literal["ml_training"] = Field(default="ml_training", frozen=True)  # pyright: ignore[reportIncompatibleVariableOverride]
    metadata: MLMetadata

MLPredictionArtifactData

Bases: BaseArtifactData[MLMetadata]

Schema for ML prediction artifact data with results.

Source code in src/chapkit/artifact/schemas.py
class MLPredictionArtifactData(BaseArtifactData[MLMetadata]):
    """Schema for ML prediction artifact data with results."""

    type: Literal["ml_prediction"] = Field(default="ml_prediction", frozen=True)  # pyright: ignore[reportIncompatibleVariableOverride]
    metadata: MLMetadata

GenericArtifactData

Bases: BaseArtifactData[GenericMetadata]

Schema for generic artifact data with free-form metadata.

Source code in src/chapkit/artifact/schemas.py
class GenericArtifactData(BaseArtifactData[GenericMetadata]):
    """Schema for generic artifact data with free-form metadata."""

    type: Literal["generic"] = Field(default="generic", frozen=True)  # pyright: ignore[reportIncompatibleVariableOverride]
    metadata: GenericMetadata

Functions

validate_artifact_data(data)

Validate artifact data against appropriate schema based on type field.

Source code in src/chapkit/artifact/schemas.py
def validate_artifact_data(data: dict[str, Any]) -> BaseArtifactData:
    """Validate artifact data against appropriate schema based on type field."""
    artifact_type = data.get("type", "generic")

    schema_map: dict[str, type[BaseArtifactData]] = {
        "ml_training": MLTrainingArtifactData,
        "ml_prediction": MLPredictionArtifactData,
        "generic": GenericArtifactData,
    }

    schema = schema_map.get(artifact_type, GenericArtifactData)
    return schema.model_validate(data)
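
A usage sketch with an illustrative ML training payload; unknown or missing type values fall back to GenericArtifactData:

from chapkit.artifact.schemas import MLTrainingArtifactData, validate_artifact_data

payload = {
    "type": "ml_training",
    "metadata": {
        "status": "success",
        "config_id": "example-config-id",          # illustrative value
        "started_at": "2024-01-01T00:00:00+00:00",
        "completed_at": "2024-01-01T00:05:00+00:00",
        "duration_seconds": 300.0,
    },
    "content": {"coefficients": [0.1, 0.2]},
}

artifact_data = validate_artifact_data(payload)
assert isinstance(artifact_data, MLTrainingArtifactData)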

Repository

repository

Artifact repository for hierarchical data access with tree traversal.

Classes

ArtifactRepository

Bases: BaseRepository[Artifact, ULID]

Repository for Artifact entities with tree traversal operations.

Source code in src/chapkit/artifact/repository.py
class ArtifactRepository(BaseRepository[Artifact, ULID]):
    """Repository for Artifact entities with tree traversal operations."""

    def __init__(self, session: AsyncSession) -> None:
        """Initialize artifact repository with database session."""
        super().__init__(session, Artifact)

    async def find_by_id(self, id: ULID) -> Artifact | None:
        """Find an artifact by ID with children eagerly loaded."""
        return await self.s.get(self.model, id, options=[selectinload(self.model.children)])

    async def find_subtree(self, start_id: ULID) -> Iterable[Artifact]:
        """Find all artifacts in the subtree rooted at the given ID using recursive CTE."""
        cte = select(self.model.id).where(self.model.id == start_id).cte(name="descendants", recursive=True)
        cte = cte.union_all(select(self.model.id).where(self.model.parent_id == cte.c.id))

        subtree_ids = (await self.s.scalars(select(cte.c.id))).all()
        rows = (await self.s.scalars(select(self.model).where(self.model.id.in_(subtree_ids)))).all()
        return rows

    async def get_root_artifact(self, artifact_id: ULID) -> Artifact | None:
        """Find the root artifact by traversing up the parent chain."""
        artifact = await self.s.get(self.model, artifact_id)
        if artifact is None:
            return None

        while artifact.parent_id is not None:
            parent = await self.s.get(self.model, artifact.parent_id)
            if parent is None:
                break
            artifact = parent

        return artifact
Functions
__init__(session)

Initialize artifact repository with database session.

Source code in src/chapkit/artifact/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Initialize artifact repository with database session."""
    super().__init__(session, Artifact)
find_by_id(id) async

Find an artifact by ID with children eagerly loaded.

Source code in src/chapkit/artifact/repository.py
async def find_by_id(self, id: ULID) -> Artifact | None:
    """Find an artifact by ID with children eagerly loaded."""
    return await self.s.get(self.model, id, options=[selectinload(self.model.children)])
find_subtree(start_id) async

Find all artifacts in the subtree rooted at the given ID using recursive CTE.

Source code in src/chapkit/artifact/repository.py
async def find_subtree(self, start_id: ULID) -> Iterable[Artifact]:
    """Find all artifacts in the subtree rooted at the given ID using recursive CTE."""
    cte = select(self.model.id).where(self.model.id == start_id).cte(name="descendants", recursive=True)
    cte = cte.union_all(select(self.model.id).where(self.model.parent_id == cte.c.id))

    subtree_ids = (await self.s.scalars(select(cte.c.id))).all()
    rows = (await self.s.scalars(select(self.model).where(self.model.id.in_(subtree_ids)))).all()
    return rows
get_root_artifact(artifact_id) async

Find the root artifact by traversing up the parent chain.

Source code in src/chapkit/artifact/repository.py
async def get_root_artifact(self, artifact_id: ULID) -> Artifact | None:
    """Find the root artifact by traversing up the parent chain."""
    artifact = await self.s.get(self.model, artifact_id)
    if artifact is None:
        return None

    while artifact.parent_id is not None:
        parent = await self.s.get(self.model, artifact.parent_id)
        if parent is None:
            break
        artifact = parent

    return artifact
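
A minimal sketch of subtree traversal; it assumes an open SQLAlchemy AsyncSession (obtained from the application's Database) and the id of an existing artifact:

from ulid import ULID
from sqlalchemy.ext.asyncio import AsyncSession
from chapkit.artifact.repository import ArtifactRepository

async def print_subtree(session: AsyncSession, root_id: ULID) -> None:
    repo = ArtifactRepository(session)
    # find_subtree resolves descendants with a recursive CTE, then loads the matching rows.
    for artifact in await repo.find_subtree(root_id):
        print(artifact.id, artifact.parent_id, artifact.level)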

Manager

manager

Artifact manager for hierarchical data with parent-child relationships.

Classes

ArtifactManager

Bases: BaseManager[Artifact, ArtifactIn, ArtifactOut, ULID]

Manager for Artifact entities with hierarchical tree operations.

Source code in src/chapkit/artifact/manager.py
class ArtifactManager(BaseManager[Artifact, ArtifactIn, ArtifactOut, ULID]):
    """Manager for Artifact entities with hierarchical tree operations."""

    def __init__(
        self,
        repo: ArtifactRepository,
        hierarchy: ArtifactHierarchy | None = None,
    ) -> None:
        """Initialize artifact manager with repository and optional hierarchy."""
        super().__init__(repo, Artifact, ArtifactOut)
        self.repository: ArtifactRepository = repo
        self.hierarchy = hierarchy

    # Public API ------------------------------------------------------

    async def find_subtree(self, start_id: ULID) -> list[ArtifactTreeNode]:
        """Find all artifacts in the subtree rooted at the given ID."""
        artifacts = await self.repository.find_subtree(start_id)
        return [self._to_tree_node(artifact) for artifact in artifacts]

    async def expand_artifact(self, artifact_id: ULID) -> ArtifactTreeNode | None:
        """Expand a single artifact with hierarchy metadata but without children."""
        artifact = await self.repository.find_by_id(artifact_id)
        if artifact is None:
            return None

        node = self._to_tree_node(artifact)
        node.children = None

        return node

    async def build_tree(self, start_id: ULID) -> ArtifactTreeNode | None:
        """Build a hierarchical tree structure rooted at the given artifact ID."""
        artifacts = await self.find_subtree(start_id)
        if not artifacts:
            return None

        node_map: dict[ULID, ArtifactTreeNode] = {}
        for node in artifacts:
            node.children = []
            node_map[node.id] = node

        for node in artifacts:
            if node.parent_id is None:
                continue
            parent = node_map.get(node.parent_id)
            if parent is None:
                continue
            if parent.children is None:
                parent.children = []
            parent.children.append(node)

        # Keep children as [] for leaf nodes (semantic: "loaded but empty")
        # Only expand_artifact sets children=None (semantic: "not loaded")

        root = node_map.get(start_id)

        return root

    # Lifecycle overrides --------------------------------------------

    def _should_assign_field(self, field: str, value: object) -> bool:
        """Prevent assigning None to level field during updates."""
        if field == "level" and value is None:
            return False
        return super()._should_assign_field(field, value)

    async def pre_save(self, entity: Artifact, data: ArtifactIn) -> None:
        """Compute and set artifact level before saving."""
        entity.level = await self._compute_level(entity.parent_id)

    async def pre_update(self, entity: Artifact, data: ArtifactIn, old_values: dict[str, object]) -> None:
        """Recalculate artifact level and cascade updates to descendants if parent changed."""
        previous_level = old_values.get("level", entity.level)
        entity.level = await self._compute_level(entity.parent_id)
        parent_changed = old_values.get("parent_id") != entity.parent_id
        if parent_changed or previous_level != entity.level:
            await self._recalculate_descendants(entity)

    # Helper utilities ------------------------------------------------

    async def _compute_level(self, parent_id: ULID | None) -> int:
        """Compute the level of an artifact based on its parent."""
        if parent_id is None:
            return 0
        parent = await self.repository.find_by_id(parent_id)
        if parent is None:
            return 0  # pragma: no cover
        return parent.level + 1

    async def _recalculate_descendants(self, entity: Artifact) -> None:
        """Recalculate levels for all descendants of an artifact."""
        subtree = await self.repository.find_subtree(entity.id)
        by_parent: dict[ULID | None, list[Artifact]] = {}
        for node in subtree:
            by_parent.setdefault(node.parent_id, []).append(node)

        queue: deque[Artifact] = deque([entity])
        while queue:
            current = queue.popleft()
            for child in by_parent.get(current.id, []):
                child.level = current.level + 1
                queue.append(child)

    def _to_tree_node(self, entity: Artifact) -> ArtifactTreeNode:
        """Convert artifact entity to tree node with hierarchy metadata."""
        base = super()._to_output_schema(entity)
        node = ArtifactTreeNode.from_artifact(base)
        if self.hierarchy is not None:
            meta = self.hierarchy.describe(node.level)
            hierarchy_value = meta.get(self.hierarchy.hierarchy_key)
            if hierarchy_value is not None:
                node.hierarchy = str(hierarchy_value)
            label_value = meta.get(self.hierarchy.label_key)
            if label_value is not None:
                node.level_label = str(label_value)

        return node
Functions
__init__(repo, hierarchy=None)

Initialize artifact manager with repository and optional hierarchy.

Source code in src/chapkit/artifact/manager.py
def __init__(
    self,
    repo: ArtifactRepository,
    hierarchy: ArtifactHierarchy | None = None,
) -> None:
    """Initialize artifact manager with repository and optional hierarchy."""
    super().__init__(repo, Artifact, ArtifactOut)
    self.repository: ArtifactRepository = repo
    self.hierarchy = hierarchy
find_subtree(start_id) async

Find all artifacts in the subtree rooted at the given ID.

Source code in src/chapkit/artifact/manager.py
async def find_subtree(self, start_id: ULID) -> list[ArtifactTreeNode]:
    """Find all artifacts in the subtree rooted at the given ID."""
    artifacts = await self.repository.find_subtree(start_id)
    return [self._to_tree_node(artifact) for artifact in artifacts]
expand_artifact(artifact_id) async

Expand a single artifact with hierarchy metadata but without children.

Source code in src/chapkit/artifact/manager.py
async def expand_artifact(self, artifact_id: ULID) -> ArtifactTreeNode | None:
    """Expand a single artifact with hierarchy metadata but without children."""
    artifact = await self.repository.find_by_id(artifact_id)
    if artifact is None:
        return None

    node = self._to_tree_node(artifact)
    node.children = None

    return node
build_tree(start_id) async

Build a hierarchical tree structure rooted at the given artifact ID.

Source code in src/chapkit/artifact/manager.py
async def build_tree(self, start_id: ULID) -> ArtifactTreeNode | None:
    """Build a hierarchical tree structure rooted at the given artifact ID."""
    artifacts = await self.find_subtree(start_id)
    if not artifacts:
        return None

    node_map: dict[ULID, ArtifactTreeNode] = {}
    for node in artifacts:
        node.children = []
        node_map[node.id] = node

    for node in artifacts:
        if node.parent_id is None:
            continue
        parent = node_map.get(node.parent_id)
        if parent is None:
            continue
        if parent.children is None:
            parent.children = []
        parent.children.append(node)

    # Keep children as [] for leaf nodes (semantic: "loaded but empty")
    # Only expand_artifact sets children=None (semantic: "not loaded")

    root = node_map.get(start_id)

    return root
pre_save(entity, data) async

Compute and set artifact level before saving.

Source code in src/chapkit/artifact/manager.py
async def pre_save(self, entity: Artifact, data: ArtifactIn) -> None:
    """Compute and set artifact level before saving."""
    entity.level = await self._compute_level(entity.parent_id)
pre_update(entity, data, old_values) async

Recalculate artifact level and cascade updates to descendants if parent changed.

Source code in src/chapkit/artifact/manager.py
async def pre_update(self, entity: Artifact, data: ArtifactIn, old_values: dict[str, object]) -> None:
    """Recalculate artifact level and cascade updates to descendants if parent changed."""
    previous_level = old_values.get("level", entity.level)
    entity.level = await self._compute_level(entity.parent_id)
    parent_changed = old_values.get("parent_id") != entity.parent_id
    if parent_changed or previous_level != entity.level:
        await self._recalculate_descendants(entity)
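
A sketch of building a labeled tree; as above, `session` is assumed to be an open AsyncSession and `root_id` an existing artifact id:

from ulid import ULID
from sqlalchemy.ext.asyncio import AsyncSession
from chapkit.artifact.manager import ArtifactManager
from chapkit.artifact.repository import ArtifactRepository
from chapkit.artifact.schemas import ArtifactHierarchy

async def load_tree(session: AsyncSession, root_id: ULID) -> None:
    hierarchy = ArtifactHierarchy(name="ml_pipeline", level_labels={0: "train", 1: "predict"})
    manager = ArtifactManager(ArtifactRepository(session), hierarchy=hierarchy)

    tree = await manager.build_tree(root_id)
    if tree is not None:
        # Leaf nodes carry children == []; only expand_artifact returns children=None.
        print(tree.level_label, [child.id for child in tree.children or []])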

Router

router

Artifact CRUD router with hierarchical tree operations.

Classes

ArtifactRouter

Bases: CrudRouter[ArtifactIn, ArtifactOut]

CRUD router for Artifact entities with tree operations.

Source code in src/chapkit/artifact/router.py
class ArtifactRouter(CrudRouter[ArtifactIn, ArtifactOut]):
    """CRUD router for Artifact entities with tree operations."""

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[ArtifactIn],
        entity_out_type: type[ArtifactOut],
        permissions: CrudPermissions | None = None,
        enable_config_access: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize artifact router with entity types and manager factory."""
        # Store enable_config_access to conditionally register config endpoint
        self.enable_config_access = enable_config_access

        super().__init__(
            prefix=prefix,
            tags=list(tags),
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_routes(self) -> None:
        """Register artifact CRUD routes and tree operations."""
        super()._register_routes()

        manager_factory = self.manager_factory

        async def expand_artifact(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> ArtifactTreeNode:
            ulid_id = self._parse_ulid(entity_id)

            expanded = await manager.expand_artifact(ulid_id)
            if expanded is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )
            return expanded

        async def build_tree(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> ArtifactTreeNode:
            ulid_id = self._parse_ulid(entity_id)

            tree = await manager.build_tree(ulid_id)
            if tree is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )
            return tree

        self.register_entity_operation(
            "expand",
            expand_artifact,
            response_model=ArtifactTreeNode,
            summary="Expand artifact",
            description="Get artifact with hierarchy metadata but without children",
        )

        self.register_entity_operation(
            "tree",
            build_tree,
            response_model=ArtifactTreeNode,
            summary="Build artifact tree",
            description="Build hierarchical tree structure rooted at the given artifact",
        )

        # Conditionally register config access endpoint
        if self.enable_config_access:
            from ..api.dependencies import get_config_manager
            from ..config.manager import ConfigManager

            async def get_config(
                entity_id: str,
                artifact_manager: ArtifactManager = Depends(manager_factory),
                config_manager: ConfigManager[BaseConfig] = Depends(get_config_manager),
            ) -> ConfigOut[BaseConfig]:
                """Get the config linked to this artifact."""
                ulid_id = self._parse_ulid(entity_id)

                # Get config by traversing to root artifact
                config = await config_manager.get_config_for_artifact(
                    artifact_id=ulid_id, artifact_repo=artifact_manager.repository
                )

                if config is None:
                    raise HTTPException(
                        status_code=status.HTTP_404_NOT_FOUND,
                        detail=f"No config linked to artifact {entity_id}",
                    )

                return config

            self.register_entity_operation(
                "config",
                get_config,
                response_model=ConfigOut[BaseConfig],
                summary="Get artifact config",
                description="Get configuration linked to this artifact by traversing to root",
            )

        # Download endpoint
        async def download_artifact(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> Response:
            """Download artifact content as binary file."""
            ulid_id = self._parse_ulid(entity_id)

            artifact = await manager.find_by_id(ulid_id)
            if artifact is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )

            if not isinstance(artifact.data, dict):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="Artifact has no downloadable content",
                )

            content = artifact.data.get("content")
            if content is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Artifact has no content",
                )

            content_type = artifact.data.get("content_type", "application/octet-stream")

            # Serialize content to bytes based on type
            if isinstance(content, bytes):
                # Most common case: ZIP files, PNG images, etc.
                binary = content
            elif isinstance(content, DataFrame):
                # Serialize DataFrame based on content_type
                if content_type == "text/csv":
                    csv_string = content.to_csv()
                    binary = csv_string.encode() if csv_string else b""
                else:
                    # Default to JSON for all other types
                    binary = content.to_json().encode()
            elif isinstance(content, dict):
                # DataFrame serialized to dict in database - reconstruct and serialize
                if content_type == "application/vnd.chapkit.dataframe+json":
                    df = DataFrame.model_validate(content)
                    binary = df.to_json().encode()
                else:
                    # Generic dict content - serialize to JSON
                    import json

                    binary = json.dumps(content).encode()
            else:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Cannot serialize content of type {type(content).__name__}",
                )

            # Determine filename extension
            extension_map = {
                "application/zip": "zip",
                "text/csv": "csv",
                "application/json": "json",
                "application/vnd.chapkit.dataframe+json": "json",
                "image/png": "png",
            }
            ext = extension_map.get(content_type, "bin")

            return Response(
                content=binary,
                media_type=content_type,
                headers={"Content-Disposition": f"attachment; filename=artifact_{entity_id}.{ext}"},
            )

        # Metadata endpoint
        async def get_artifact_metadata(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> dict[str, Any]:
            """Get only JSON-serializable metadata, excluding binary content."""
            ulid_id = self._parse_ulid(entity_id)

            artifact = await manager.find_by_id(ulid_id)
            if artifact is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )

            if not isinstance(artifact.data, dict):
                return {}

            return artifact.data.get("metadata", {})

        self.register_entity_operation(
            "download",
            download_artifact,
            response_model=None,  # Raw Response, don't serialize
            summary="Download artifact content",
            description="Download artifact content as binary file (ZIP, CSV, etc.)",
        )

        self.register_entity_operation(
            "metadata",
            get_artifact_metadata,
            response_model=dict[str, Any],
            summary="Get artifact metadata",
            description="Get JSON-serializable metadata without binary content",
        )
Functions
__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, enable_config_access=False, **kwargs)

Initialize artifact router with entity types and manager factory.

Source code in src/chapkit/artifact/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[ArtifactIn],
    entity_out_type: type[ArtifactOut],
    permissions: CrudPermissions | None = None,
    enable_config_access: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize artifact router with entity types and manager factory."""
    # Store enable_config_access to conditionally register config endpoint
    self.enable_config_access = enable_config_access

    super().__init__(
        prefix=prefix,
        tags=list(tags),
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        manager_factory=manager_factory,
        permissions=permissions,
        **kwargs,
    )
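
A wiring sketch. `get_artifact_manager` is a hypothetical FastAPI dependency that yields an ArtifactManager; attaching the resulting routes to the application depends on the CrudRouter base class, so only construction is shown:

from chapkit.artifact.manager import ArtifactManager
from chapkit.artifact.router import ArtifactRouter
from chapkit.artifact.schemas import ArtifactIn, ArtifactOut

async def get_artifact_manager() -> ArtifactManager:
    ...  # hypothetical dependency: build an ArtifactManager from a request-scoped session

artifact_router = ArtifactRouter(
    prefix="/api/v1/artifacts",
    tags=["artifacts"],
    manager_factory=get_artifact_manager,
    entity_in_type=ArtifactIn,
    entity_out_type=ArtifactOut,
    enable_config_access=True,  # also registers the per-artifact config endpoint
)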

Task Module

Registry-based task execution system for Python functions with dependency injection.

Registry

registry

Global registry for Python task functions with metadata support.

Classes

TaskMetadata

Bases: TypedDict

Metadata for a registered task.

Source code in src/chapkit/task/registry.py
class TaskMetadata(TypedDict):
    """Metadata for a registered task."""

    func: Callable[..., Any]
    tags: list[str]

TaskRegistry

Global registry for Python task functions with tags and metadata.

Source code in src/chapkit/task/registry.py
class TaskRegistry:
    """Global registry for Python task functions with tags and metadata."""

    _registry: dict[str, TaskMetadata] = {}

    @classmethod
    def register(
        cls,
        name: str,
        tags: list[str] | None = None,
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to register a task function with optional tags."""
        # Validate URL-safe name
        if not re.match(r"^[a-zA-Z0-9_-]+$", name):
            raise ValueError(f"Task name '{name}' must be URL-safe (alphanumeric, underscore, hyphen only)")

        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            if name in cls._registry:
                raise ValueError(f"Task '{name}' already registered")
            cls._registry[name] = {
                "func": func,
                "tags": tags or [],
            }
            return func

        return decorator

    @classmethod
    def register_function(
        cls,
        name: str,
        func: Callable[..., Any],
        tags: list[str] | None = None,
    ) -> None:
        """Imperatively register a task function with optional tags."""
        # Validate URL-safe name
        if not re.match(r"^[a-zA-Z0-9_-]+$", name):
            raise ValueError(f"Task name '{name}' must be URL-safe (alphanumeric, underscore, hyphen only)")

        if name in cls._registry:
            raise ValueError(f"Task '{name}' already registered")
        cls._registry[name] = {
            "func": func,
            "tags": tags or [],
        }

    @classmethod
    def get(cls, name: str) -> Callable[..., Any]:
        """Retrieve a registered task function."""
        if name not in cls._registry:
            raise KeyError(f"Task '{name}' not found in registry")
        return cls._registry[name]["func"]

    @classmethod
    def has(cls, name: str) -> bool:
        """Check if a task is registered."""
        return name in cls._registry

    @classmethod
    def get_tags(cls, name: str) -> list[str]:
        """Get tags for a registered task."""
        if name not in cls._registry:
            raise KeyError(f"Task '{name}' not found in registry")
        return cls._registry[name]["tags"]

    @classmethod
    def get_info(cls, name: str) -> "TaskInfo":
        """Get metadata for a registered task."""
        from .schemas import ParameterInfo, TaskInfo

        if name not in cls._registry:
            raise KeyError(f"Task '{name}' not found in registry")

        metadata = cls._registry[name]
        func = metadata["func"]
        sig = inspect.signature(func)

        # Extract parameter info
        parameters = []
        for param_name, param in sig.parameters.items():
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue
            parameters.append(
                ParameterInfo(
                    name=param_name,
                    annotation=str(param.annotation) if param.annotation != param.empty else None,
                    default=str(param.default) if param.default != param.empty else None,
                    required=param.default == param.empty,
                )
            )

        return TaskInfo(
            name=name,
            docstring=inspect.getdoc(func),
            signature=str(sig),
            parameters=parameters,
            tags=metadata["tags"],
        )

    @classmethod
    def list_all(cls) -> list[str]:
        """List all registered task names."""
        return sorted(cls._registry.keys())

    @classmethod
    def list_all_info(cls) -> list["TaskInfo"]:
        """List metadata for all registered tasks."""
        return [cls.get_info(name) for name in cls.list_all()]

    @classmethod
    def list_by_tags(cls, tags: list[str]) -> list[str]:
        """List task names that have ALL specified tags."""
        if not tags:
            return cls.list_all()

        matching_tasks = []
        for name, metadata in cls._registry.items():
            task_tags = set(metadata["tags"])
            if all(tag in task_tags for tag in tags):
                matching_tasks.append(name)

        return sorted(matching_tasks)

    @classmethod
    def clear(cls) -> None:
        """Clear all registered tasks (useful for testing and hot-reload)."""
        cls._registry.clear()
Functions
register(name, tags=None) classmethod

Decorator to register a task function with optional tags.

Source code in src/chapkit/task/registry.py
@classmethod
def register(
    cls,
    name: str,
    tags: list[str] | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator to register a task function with optional tags."""
    # Validate URL-safe name
    if not re.match(r"^[a-zA-Z0-9_-]+$", name):
        raise ValueError(f"Task name '{name}' must be URL-safe (alphanumeric, underscore, hyphen only)")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if name in cls._registry:
            raise ValueError(f"Task '{name}' already registered")
        cls._registry[name] = {
            "func": func,
            "tags": tags or [],
        }
        return func

    return decorator
register_function(name, func, tags=None) classmethod

Imperatively register a task function with optional tags.

Source code in src/chapkit/task/registry.py
@classmethod
def register_function(
    cls,
    name: str,
    func: Callable[..., Any],
    tags: list[str] | None = None,
) -> None:
    """Imperatively register a task function with optional tags."""
    # Validate URL-safe name
    if not re.match(r"^[a-zA-Z0-9_-]+$", name):
        raise ValueError(f"Task name '{name}' must be URL-safe (alphanumeric, underscore, hyphen only)")

    if name in cls._registry:
        raise ValueError(f"Task '{name}' already registered")
    cls._registry[name] = {
        "func": func,
        "tags": tags or [],
    }
get(name) classmethod

Retrieve a registered task function.

Source code in src/chapkit/task/registry.py
@classmethod
def get(cls, name: str) -> Callable[..., Any]:
    """Retrieve a registered task function."""
    if name not in cls._registry:
        raise KeyError(f"Task '{name}' not found in registry")
    return cls._registry[name]["func"]
has(name) classmethod

Check if a task is registered.

Source code in src/chapkit/task/registry.py
@classmethod
def has(cls, name: str) -> bool:
    """Check if a task is registered."""
    return name in cls._registry
get_tags(name) classmethod

Get tags for a registered task.

Source code in src/chapkit/task/registry.py
@classmethod
def get_tags(cls, name: str) -> list[str]:
    """Get tags for a registered task."""
    if name not in cls._registry:
        raise KeyError(f"Task '{name}' not found in registry")
    return cls._registry[name]["tags"]
get_info(name) classmethod

Get metadata for a registered task.

Source code in src/chapkit/task/registry.py
@classmethod
def get_info(cls, name: str) -> "TaskInfo":
    """Get metadata for a registered task."""
    from .schemas import ParameterInfo, TaskInfo

    if name not in cls._registry:
        raise KeyError(f"Task '{name}' not found in registry")

    metadata = cls._registry[name]
    func = metadata["func"]
    sig = inspect.signature(func)

    # Extract parameter info
    parameters = []
    for param_name, param in sig.parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        parameters.append(
            ParameterInfo(
                name=param_name,
                annotation=str(param.annotation) if param.annotation != param.empty else None,
                default=str(param.default) if param.default != param.empty else None,
                required=param.default == param.empty,
            )
        )

    return TaskInfo(
        name=name,
        docstring=inspect.getdoc(func),
        signature=str(sig),
        parameters=parameters,
        tags=metadata["tags"],
    )
list_all() classmethod

List all registered task names.

Source code in src/chapkit/task/registry.py
@classmethod
def list_all(cls) -> list[str]:
    """List all registered task names."""
    return sorted(cls._registry.keys())
list_all_info() classmethod

List metadata for all registered tasks.

Source code in src/chapkit/task/registry.py
@classmethod
def list_all_info(cls) -> list["TaskInfo"]:
    """List metadata for all registered tasks."""
    return [cls.get_info(name) for name in cls.list_all()]
list_by_tags(tags) classmethod

List task names that have ALL specified tags.

Source code in src/chapkit/task/registry.py
@classmethod
def list_by_tags(cls, tags: list[str]) -> list[str]:
    """List task names that have ALL specified tags."""
    if not tags:
        return cls.list_all()

    matching_tasks = []
    for name, metadata in cls._registry.items():
        task_tags = set(metadata["tags"])
        if all(tag in task_tags for tag in tags):
            matching_tasks.append(name)

    return sorted(matching_tasks)
clear() classmethod

Clear all registered tasks (useful for testing and hot-reload).

Source code in src/chapkit/task/registry.py
@classmethod
def clear(cls) -> None:
    """Clear all registered tasks (useful for testing and hot-reload)."""
    cls._registry.clear()
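
A registration sketch using the decorator form (task names must be URL-safe):

from chapkit.task.registry import TaskRegistry

@TaskRegistry.register("add-numbers", tags=["math"])
def add_numbers(a: int, b: int = 1) -> int:
    """Add two numbers."""
    return a + b

TaskRegistry.has("add-numbers")        # True
TaskRegistry.list_by_tags(["math"])    # ["add-numbers"]
TaskRegistry.get_info("add-numbers")   # TaskInfo with signature, parameters, and tags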

Executor

executor

Task executor for registry-based execution with dependency injection.

Classes

TaskExecutor

Executes registered task functions with dependency injection.

Source code in src/chapkit/task/executor.py
class TaskExecutor:
    """Executes registered task functions with dependency injection."""

    def __init__(
        self,
        database: Database,
        scheduler: ChapkitScheduler | None = None,
        artifact_manager: ArtifactManager | None = None,
    ) -> None:
        """Initialize task executor with framework dependencies."""
        self.database = database
        self.scheduler = scheduler
        self.artifact_manager = artifact_manager

    async def execute(self, name: str, params: dict[str, Any] | None = None) -> Any:
        """Execute registered function by name with runtime parameters and return result."""
        # Verify function exists
        if not TaskRegistry.has(name):
            raise ValueError(f"Task '{name}' not found in registry")

        # Get function from registry
        func = TaskRegistry.get(name)

        # Create a database session for potential injection
        async with self.database.session() as session:
            # Inject framework dependencies based on function signature
            final_params = self._inject_parameters(func, params or {}, session)

            # Handle sync/async functions
            if inspect.iscoroutinefunction(func):
                result = await func(**final_params)
            else:
                result = await asyncio.to_thread(func, **final_params)

        return result

    def _is_injectable_type(self, param_type: type | None) -> bool:
        """Check if a parameter type should be injected by the framework."""
        if param_type is None:
            return False

        # Handle Optional[Type] -> extract the non-None type
        origin = get_origin(param_type)
        if origin is types.UnionType or origin is Union:
            args = getattr(param_type, "__args__", ())
            non_none_types = [arg for arg in args if arg is not type(None)]
            if len(non_none_types) == 1:
                param_type = non_none_types[0]

        return param_type in INJECTABLE_TYPES

    def _build_injection_map(self, session: AsyncSession | None) -> dict[type, Any]:
        """Build map of injectable types to their instances."""
        injection_map: dict[type, Any] = {
            AsyncSession: session,
            Database: self.database,
        }
        # Add optional dependencies if available
        if self.scheduler is not None:
            injection_map[ChapkitScheduler] = self.scheduler
        if self.artifact_manager is not None:
            injection_map[ArtifactManager] = self.artifact_manager
        return injection_map

    def _inject_parameters(
        self,
        func: Any,
        user_params: dict[str, Any],
        session: AsyncSession | None,
    ) -> dict[str, Any]:
        """Merge user parameters with framework injections based on function signature."""
        sig = inspect.signature(func)
        type_hints = get_type_hints(func)

        # Build injection map
        injection_map = self._build_injection_map(session)

        # Start with user parameters
        final_params = dict(user_params)

        # Inspect each parameter in function signature
        for param_name, param in sig.parameters.items():
            # Skip self, *args, **kwargs
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue

            # Get type hint for this parameter
            param_type = type_hints.get(param_name)

            # Check if this type should be injected
            if self._is_injectable_type(param_type):
                # Get the actual type (handle Optional)
                actual_type = param_type
                origin = get_origin(param_type)
                if origin is types.UnionType or origin is Union:
                    args = getattr(param_type, "__args__", ())
                    non_none_types = [arg for arg in args if arg is not type(None)]
                    if non_none_types:
                        actual_type = non_none_types[0]

                # Inject if we have an instance of this type
                if actual_type in injection_map:
                    injectable_value = injection_map[actual_type]
                    # For required parameters, inject even if None
                    # For optional parameters, only inject if not None
                    if param.default is param.empty:
                        # Required parameter - inject whatever we have (even None)
                        final_params[param_name] = injectable_value
                    elif injectable_value is not None:
                        # Optional parameter - only inject if we have a value
                        final_params[param_name] = injectable_value
                continue

            # Not injectable - must come from user parameters
            if param_name not in final_params:
                # Check if parameter has a default value
                if param.default is not param.empty:
                    continue  # Will use default

                # Required parameter missing
                raise ValueError(
                    f"Missing required parameter '{param_name}' for task '{func.__name__}'. "
                    f"Parameter is not injectable and not provided in params."
                )

        return final_params
Functions
__init__(database, scheduler=None, artifact_manager=None)

Initialize task executor with framework dependencies.

Source code in src/chapkit/task/executor.py
def __init__(
    self,
    database: Database,
    scheduler: ChapkitScheduler | None = None,
    artifact_manager: ArtifactManager | None = None,
) -> None:
    """Initialize task executor with framework dependencies."""
    self.database = database
    self.scheduler = scheduler
    self.artifact_manager = artifact_manager
execute(name, params=None) async

Execute registered function by name with runtime parameters and return result.

Source code in src/chapkit/task/executor.py
async def execute(self, name: str, params: dict[str, Any] | None = None) -> Any:
    """Execute registered function by name with runtime parameters and return result."""
    # Verify function exists
    if not TaskRegistry.has(name):
        raise ValueError(f"Task '{name}' not found in registry")

    # Get function from registry
    func = TaskRegistry.get(name)

    # Create a database session for potential injection
    async with self.database.session() as session:
        # Inject framework dependencies based on function signature
        final_params = self._inject_parameters(func, params or {}, session)

        # Handle sync/async functions
        if inspect.iscoroutinefunction(func):
            result = await func(**final_params)
        else:
            result = await asyncio.to_thread(func, **final_params)

    return result
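
An execution sketch; `database` is assumed to be the application's chapkit Database instance, and "add-numbers" the task registered in the registry example above:

from chapkit.task.executor import TaskExecutor

async def run_add(database) -> None:
    executor = TaskExecutor(database)  # scheduler and artifact_manager are optional
    result = await executor.execute("add-numbers", {"a": 2, "b": 3})
    print(result)  # 5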

Router

router

Task router for registry-based execution.

Classes

TaskRouter

Router for task execution (registry-based, no CRUD).

Source code in src/chapkit/task/router.py
class TaskRouter:
    """Router for task execution (registry-based, no CRUD)."""

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        executor_factory: Any,
    ) -> None:
        """Initialize task router with executor factory."""
        self.prefix = prefix
        self.tags = tags
        self.executor_factory = executor_factory
        self.router = APIRouter(prefix=prefix, tags=list(tags))
        self._register_routes()

    @classmethod
    def create(
        cls,
        prefix: str,
        tags: Sequence[str],
        executor_factory: Any,
    ) -> TaskRouter:
        """Create a task router with executor factory."""
        return cls(prefix=prefix, tags=tags, executor_factory=executor_factory)

    def _register_routes(self) -> None:
        """Register task routes."""
        executor_factory = self.executor_factory

        @self.router.get("", response_model=list[TaskInfo])
        async def list_tasks() -> list[TaskInfo]:
            """List all registered tasks."""
            return TaskRegistry.list_all_info()

        @self.router.get("/{name}", response_model=TaskInfo)
        async def get_task(name: str) -> TaskInfo:
            """Get task metadata by name."""
            try:
                return TaskRegistry.get_info(name)
            except KeyError as e:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=str(e),
                ) from e

        @self.router.post("/{name}/$execute", response_model=TaskExecuteResponse)
        async def execute_task(
            name: str,
            request: TaskExecuteRequest = TaskExecuteRequest(),
            executor: TaskExecutor = Depends(executor_factory),
        ) -> TaskExecuteResponse:
            """Execute task by name with runtime parameters and return result."""
            import traceback

            # Check if task exists
            if not TaskRegistry.has(name):
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Task '{name}' not found in registry",
                )

            params = request.params or {}

            # Execute task and handle errors
            try:
                result = await executor.execute(name, params)
                return TaskExecuteResponse(
                    task_name=name,
                    params=params,
                    result=result,
                    error=None,
                )
            except Exception as e:
                # Return error in response (don't raise exception)
                return TaskExecuteResponse(
                    task_name=name,
                    params=params,
                    result=None,
                    error={
                        "type": type(e).__name__,
                        "message": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )
Functions
__init__(prefix, tags, executor_factory)

Initialize task router with executor factory.

Source code in src/chapkit/task/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    executor_factory: Any,
) -> None:
    """Initialize task router with executor factory."""
    self.prefix = prefix
    self.tags = tags
    self.executor_factory = executor_factory
    self.router = APIRouter(prefix=prefix, tags=list(tags))
    self._register_routes()
create(prefix, tags, executor_factory) classmethod

Create a task router with executor factory.

Source code in src/chapkit/task/router.py
@classmethod
def create(
    cls,
    prefix: str,
    tags: Sequence[str],
    executor_factory: Any,
) -> TaskRouter:
    """Create a task router with executor factory."""
    return cls(prefix=prefix, tags=tags, executor_factory=executor_factory)
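
A mounting sketch; `get_task_executor` is a hypothetical dependency that returns a TaskExecutor:

from fastapi import FastAPI
from chapkit.task.executor import TaskExecutor
from chapkit.task.router import TaskRouter

async def get_task_executor() -> TaskExecutor:
    ...  # hypothetical dependency: build a TaskExecutor from the application's Database

task_router = TaskRouter.create(prefix="/api/v1/tasks", tags=["tasks"], executor_factory=get_task_executor)

app = FastAPI()
app.include_router(task_router.router)  # TaskRouter keeps its APIRouter on the `router` attribute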

Schemas

schemas

Task schemas for registry-based execution.

Classes

ParameterInfo

Bases: BaseModel

Function parameter metadata.

Source code in src/chapkit/task/schemas.py
class ParameterInfo(BaseModel):
    """Function parameter metadata."""

    name: str = Field(description="Parameter name")
    annotation: str | None = Field(default=None, description="Type annotation as string")
    default: str | None = Field(default=None, description="Default value as string")
    required: bool = Field(description="Whether parameter is required")

TaskInfo

Bases: BaseModel

Task metadata from registry.

Source code in src/chapkit/task/schemas.py
class TaskInfo(BaseModel):
    """Task metadata from registry."""

    name: str = Field(description="Task name (URL-safe)")
    docstring: str | None = Field(default=None, description="Function docstring")
    signature: str = Field(description="Function signature")
    parameters: list[ParameterInfo] = Field(default_factory=list, description="Function parameters")
    tags: list[str] = Field(default_factory=list, description="Task tags for filtering")

TaskExecuteRequest

Bases: BaseModel

Request to execute a task.

Source code in src/chapkit/task/schemas.py
class TaskExecuteRequest(BaseModel):
    """Request to execute a task."""

    params: dict[str, Any] | None = Field(default=None, description="Runtime parameters for task execution")

TaskExecuteResponse

Bases: BaseModel

Response from task execution.

Source code in src/chapkit/task/schemas.py
class TaskExecuteResponse(BaseModel):
    """Response from task execution."""

    task_name: str = Field(description="Name of the executed task")
    params: dict[str, Any] = Field(default_factory=dict, description="Parameters used for execution")
    result: Any = Field(description="Task execution result")
    error: dict[str, str] | None = Field(default=None, description="Error information if execution failed")
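
Request and response shapes as they appear on the $execute endpoint (values illustrative):

from chapkit.task.schemas import TaskExecuteRequest, TaskExecuteResponse

request = TaskExecuteRequest(params={"a": 2, "b": 3})

response = TaskExecuteResponse(task_name="add-numbers", params={"a": 2, "b": 3}, result=5)
# On failure the router returns result=None and an error dict with "type", "message", and "traceback".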

Data Module

Universal DataFrame interchange format for tabular data across pandas, polars, xarray, and other libraries.

DataFrame

DataFrame

Bases: BaseModel

Universal interchange format for tabular data from pandas, polars, xarray, and other libraries.

Source code in src/chapkit/data/dataframe.py
class DataFrame(BaseModel):
    """Universal interchange format for tabular data from pandas, polars, xarray, and other libraries."""

    columns: list[str]
    data: list[list[Any]]

    @classmethod
    def from_pandas(cls, df: Any) -> Self:
        """Create schema from pandas DataFrame."""
        try:
            import pandas as pd
        except ImportError:
            raise ImportError("pandas is required for from_pandas(). Install with: uv add pandas") from None

        if not isinstance(df, pd.DataFrame):
            raise TypeError(f"Expected pandas DataFrame, got {type(df)}")

        return cls(
            columns=df.columns.tolist(),
            data=df.values.tolist(),
        )

    @classmethod
    def from_polars(cls, df: Any) -> Self:
        """Create schema from Polars DataFrame."""
        try:
            import polars as pl
        except ImportError:
            raise ImportError("polars is required for from_polars(). Install with: uv add polars") from None

        if not isinstance(df, pl.DataFrame):
            raise TypeError(f"Expected Polars DataFrame, got {type(df)}")

        return cls(
            columns=df.columns,
            data=[list(row) for row in df.rows()],
        )

    @classmethod
    def from_xarray(cls, da: Any) -> Self:
        """Create schema from xarray DataArray (2D only)."""
        try:
            import xarray as xr
        except ImportError:
            raise ImportError("xarray is required for from_xarray(). Install with: uv add xarray") from None

        if not isinstance(da, xr.DataArray):
            raise TypeError(f"Expected xarray DataArray, got {type(da)}")

        if len(da.dims) != 2:
            raise ValueError(f"Only 2D DataArrays supported, got {len(da.dims)} dimensions")

        # Convert to pandas then use from_pandas
        pdf = da.to_pandas()
        return cls.from_pandas(pdf)

    @classmethod
    def from_dict(cls, data: dict[str, list[Any]]) -> Self:
        """Create schema from dictionary of columns."""
        if not data:
            return cls(columns=[], data=[])

        columns = list(data.keys())
        num_rows = len(next(iter(data.values())))

        if not all(len(vals) == num_rows for vals in data.values()):
            raise ValueError("All columns must have the same length")

        rows = [[data[col][i] for col in columns] for i in range(num_rows)]

        return cls(columns=columns, data=rows)

    @classmethod
    def from_records(cls, records: list[dict[str, Any]]) -> Self:
        """Create schema from list of records (row-oriented)."""
        if not records:
            return cls(columns=[], data=[])

        columns = list(records[0].keys())
        data = [[record[col] for col in columns] for record in records]

        return cls(columns=columns, data=data)

    @classmethod
    def from_csv(
        cls,
        path: str | Path | None = None,
        *,
        csv_string: str | None = None,
        delimiter: str = ",",
        has_header: bool = True,
        encoding: str = "utf-8",
    ) -> Self:
        """Create DataFrame from CSV file or string."""
        # Validate mutually exclusive parameters
        if path is None and csv_string is None:
            raise ValueError("Either path or csv_string must be provided")
        if path is not None and csv_string is not None:
            raise ValueError("path and csv_string are mutually exclusive")

        # Read CSV data
        if path is not None:
            path_obj = Path(path)
            if not path_obj.exists():
                raise FileNotFoundError(f"File not found: {path}")
            with path_obj.open("r", encoding=encoding, newline="") as f:
                reader = csv.reader(f, delimiter=delimiter)
                rows = list(reader)
        else:
            # csv_string is not None
            string_io = io.StringIO(csv_string)
            reader = csv.reader(string_io, delimiter=delimiter)
            rows = list(reader)

        # Handle empty CSV
        if not rows:
            return cls(columns=[], data=[])

        # Extract columns and data
        if has_header:
            columns = rows[0]
            data = rows[1:]
        else:
            # Generate column names
            num_cols = len(rows[0]) if rows else 0
            columns = [f"col_{i}" for i in range(num_cols)]
            data = rows

        return cls(columns=columns, data=data)

    def to_pandas(self) -> Any:
        """Convert schema to pandas DataFrame."""
        try:
            import pandas as pd
        except ImportError:
            raise ImportError("pandas is required for to_pandas(). Install with: uv add pandas") from None

        return pd.DataFrame(self.data, columns=self.columns)

    def to_polars(self) -> Any:
        """Convert schema to Polars DataFrame."""
        try:
            import polars as pl
        except ImportError:
            raise ImportError("polars is required for to_polars(). Install with: uv add polars") from None

        return pl.DataFrame(self.data, schema=self.columns, orient="row")

    def to_dict(self, orient: Literal["dict", "list", "records"] = "dict") -> Any:
        """Convert schema to dictionary with specified orient (dict, list, or records)."""
        if orient == "dict":
            return {col: {i: self.data[i][j] for i in range(len(self.data))} for j, col in enumerate(self.columns)}
        elif orient == "list":
            return {col: [row[j] for row in self.data] for j, col in enumerate(self.columns)}
        elif orient == "records":
            return [{col: row[j] for j, col in enumerate(self.columns)} for row in self.data]
        else:
            raise ValueError(f"Invalid orient: {orient}")

    def to_csv(
        self,
        path: str | Path | None = None,
        *,
        delimiter: str = ",",
        include_header: bool = True,
        encoding: str = "utf-8",
    ) -> str | None:
        """Export DataFrame to CSV file or string."""
        # Write to string buffer or file
        if path is None:
            # Return as string
            output = io.StringIO()
            writer = csv.writer(output, delimiter=delimiter)

            if include_header:
                writer.writerow(self.columns)

            writer.writerows(self.data)

            return output.getvalue()
        else:
            # Write to file
            path_obj = Path(path)
            with path_obj.open("w", encoding=encoding, newline="") as f:
                writer = csv.writer(f, delimiter=delimiter)

                if include_header:
                    writer.writerow(self.columns)

                writer.writerows(self.data)

            return None

    # Convenience aliases
    from_dataframe = from_pandas
    to_dataframe = to_pandas

    @property
    def shape(self) -> tuple[int, int]:
        """Return tuple representing dimensionality of the DataFrame."""
        return (len(self.data), len(self.columns))

    @property
    def empty(self) -> bool:
        """Indicator whether DataFrame is empty."""
        return len(self.data) == 0 or len(self.columns) == 0

    @property
    def size(self) -> int:
        """Return int representing number of elements in this object."""
        return len(self.data) * len(self.columns)

    @property
    def ndim(self) -> int:
        """Return int representing number of axes/array dimensions."""
        return 2

    def head(self, n: int = 5) -> Self:
        """Return first n rows."""
        if n >= 0:
            selected_data = self.data[:n]
        else:
            selected_data = self.data[:n] if n != 0 else self.data
        return self.__class__(columns=self.columns, data=selected_data)

    def tail(self, n: int = 5) -> Self:
        """Return last n rows."""
        if n >= 0:
            selected_data = self.data[-n:] if n > 0 else []
        else:
            selected_data = self.data[abs(n) :]
        return self.__class__(columns=self.columns, data=selected_data)

    def sample(
        self,
        n: int | None = None,
        frac: float | None = None,
        *,
        random_state: int | None = None,
    ) -> Self:
        """Return random sample of rows."""
        # Validate parameters
        if n is None and frac is None:
            raise ValueError("Either n or frac must be provided")
        if n is not None and frac is not None:
            raise ValueError("n and frac are mutually exclusive")

        # Set random seed if provided
        if random_state is not None:
            random.seed(random_state)

        # Calculate sample size
        total_rows = len(self.data)
        if frac is not None:
            if frac > 1.0:
                raise ValueError("frac must be <= 1.0")
            sample_size = int(total_rows * frac)
        else:
            sample_size = min(n, total_rows) if n is not None else 0

        # Sample indices
        if sample_size >= total_rows:
            sampled_indices = list(range(total_rows))
            random.shuffle(sampled_indices)
        else:
            sampled_indices = random.sample(range(total_rows), sample_size)

        # Extract sampled rows
        sampled_data = [self.data[i] for i in sampled_indices]

        return self.__class__(columns=self.columns, data=sampled_data)

    def select(self, columns: list[str]) -> Self:
        """Return DataFrame with only specified columns."""
        # Validate all columns exist
        for col in columns:
            if col not in self.columns:
                raise KeyError(f"Column '{col}' not found in DataFrame")

        # Get column indices
        indices = [self.columns.index(col) for col in columns]

        # Extract data for selected columns
        new_data = [[row[i] for i in indices] for row in self.data]

        return self.__class__(columns=columns, data=new_data)

    def drop(self, columns: list[str]) -> Self:
        """Return DataFrame without specified columns."""
        # Validate all columns exist
        for col in columns:
            if col not in self.columns:
                raise KeyError(f"Column '{col}' not found in DataFrame")

        # Get columns to keep
        keep_cols = [c for c in self.columns if c not in columns]

        # Get indices for columns to keep
        indices = [self.columns.index(col) for col in keep_cols]

        # Extract data for kept columns
        new_data = [[row[i] for i in indices] for row in self.data]

        return self.__class__(columns=keep_cols, data=new_data)

    def rename(self, mapper: dict[str, str]) -> Self:
        """Return DataFrame with renamed columns."""
        # Validate all old column names exist
        for old_name in mapper:
            if old_name not in self.columns:
                raise KeyError(f"Column '{old_name}' not found in DataFrame")

        # Create new column list
        new_cols = [mapper.get(col, col) for col in self.columns]

        # Check for duplicates
        if len(new_cols) != len(set(new_cols)):
            raise ValueError("Renaming would create duplicate column names")

        return self.__class__(columns=new_cols, data=self.data)

    def rename_columns(self, mapper: dict[str, str]) -> Self:
        """Return DataFrame with renamed columns (alias for rename)."""
        return self.rename(mapper)

    def validate_structure(self) -> None:
        """Validate DataFrame structure."""
        # Check for empty column names
        for i, col in enumerate(self.columns):
            if col == "":
                raise ValueError(f"Column at index {i} is empty")

        # Check for duplicate column names
        if len(self.columns) != len(set(self.columns)):
            duplicates = [col for col in self.columns if self.columns.count(col) > 1]
            raise ValueError(f"Duplicate column names found: {set(duplicates)}")

        # Check all rows have same length as columns
        num_cols = len(self.columns)
        for i, row in enumerate(self.data):
            if len(row) != num_cols:
                raise ValueError(f"Row {i} has {len(row)} values, expected {num_cols}")

    def infer_types(self) -> dict[str, str]:
        """Infer column data types."""
        result: dict[str, str] = {}

        for col_idx, col_name in enumerate(self.columns):
            # Extract all values for this column
            values = [row[col_idx] for row in self.data]

            # Filter out None values for type checking
            non_null_values = [v for v in values if v is not None]

            if not non_null_values:
                result[col_name] = "null"
                continue

            # Check types
            types_found = set()
            for val in non_null_values:
                if isinstance(val, bool):
                    types_found.add("bool")
                elif isinstance(val, int):
                    types_found.add("int")
                elif isinstance(val, float):
                    types_found.add("float")
                elif isinstance(val, str):
                    types_found.add("str")
                else:
                    types_found.add("other")

            # Determine final type
            if len(types_found) > 1:
                # Special case: int and float can be treated as float
                if types_found == {"int", "float"}:
                    result[col_name] = "float"
                else:
                    result[col_name] = "mixed"
            elif "bool" in types_found:
                result[col_name] = "bool"
            elif "int" in types_found:
                result[col_name] = "int"
            elif "float" in types_found:
                result[col_name] = "float"
            elif "str" in types_found:
                result[col_name] = "str"
            else:
                result[col_name] = "mixed"

        return result

    def has_nulls(self) -> dict[str, bool]:
        """Check for null values in each column."""
        result: dict[str, bool] = {}

        for col_idx, col_name in enumerate(self.columns):
            # Check if any value in this column is None
            has_null = any(row[col_idx] is None for row in self.data)
            result[col_name] = has_null

        return result

    # Iteration and length

    def __len__(self) -> int:
        """Return number of rows."""
        return len(self.data)

    def __iter__(self) -> Any:
        """Iterate over rows as dictionaries."""
        for row in self.data:
            yield dict(zip(self.columns, row))

    # JSON support

    @classmethod
    def from_json(cls, json_string: str) -> Self:
        """Create DataFrame from JSON string (array of objects)."""
        records = json.loads(json_string)
        if not isinstance(records, list):
            raise ValueError("JSON must be an array of objects")
        return cls.from_records(records)

    def to_json(self, orient: Literal["records", "columns"] = "records") -> str:
        """Export DataFrame as JSON string."""
        # Map "columns" to "list" for to_dict()
        dict_orient: Literal["dict", "list", "records"] = "list" if orient == "columns" else orient
        return json.dumps(self.to_dict(orient=dict_orient))

    # Column access

    def get_column(self, column: str) -> list[Any]:
        """Get all values for a column."""
        if column not in self.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")
        idx = self.columns.index(column)
        return [row[idx] for row in self.data]

    def __getitem__(self, key: str | list[str]) -> list[Any] | Self:
        """Support df['col'] and df[['col1', 'col2']]."""
        if isinstance(key, str):
            return self.get_column(key)
        return self.select(key)

    # Analytics methods

    def unique(self, column: str) -> list[Any]:
        """Get unique values from a column (preserves order)."""
        if column not in self.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        col_idx = self.columns.index(column)
        seen = set()
        result = []
        for row in self.data:
            val = row[col_idx]
            if val not in seen:
                seen.add(val)
                result.append(val)
        return result

    def value_counts(self, column: str) -> dict[Any, int]:
        """Count occurrences of each unique value in column."""
        if column not in self.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        col_idx = self.columns.index(column)
        counts: dict[Any, int] = {}
        for row in self.data:
            val = row[col_idx]
            counts[val] = counts.get(val, 0) + 1
        return counts

    def sort(self, by: str, ascending: bool = True) -> Self:
        """Sort DataFrame by column."""
        if by not in self.columns:
            raise KeyError(f"Column '{by}' not found in DataFrame")

        col_idx = self.columns.index(by)

        # Sort with None values at the end
        def sort_key(row: list[Any]) -> tuple[int, Any]:
            val = row[col_idx]
            if val is None:
                # Use a tuple to ensure None sorts last
                return (1, None) if ascending else (0, None)
            return (0, val) if ascending else (1, val)

        sorted_data = sorted(self.data, key=sort_key, reverse=not ascending)
        return self.__class__(columns=self.columns, data=sorted_data)

    # Row filtering and transformation

    def filter(self, predicate: Any) -> Self:
        """Filter rows using a predicate function."""
        filtered_data = []
        for row in self.data:
            row_dict = dict(zip(self.columns, row))
            if predicate(row_dict):
                filtered_data.append(row)
        return self.__class__(columns=self.columns, data=filtered_data)

    def apply(self, func: Any, column: str) -> Self:
        """Apply function to column values."""
        if column not in self.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        col_idx = self.columns.index(column)
        new_data = []
        for row in self.data:
            new_row = row.copy()
            new_row[col_idx] = func(row[col_idx])
            new_data.append(new_row)

        return self.__class__(columns=self.columns, data=new_data)

    def add_column(self, name: str, values: list[Any]) -> Self:
        """Add new column to DataFrame."""
        if name in self.columns:
            raise ValueError(f"Column '{name}' already exists")

        if len(values) != len(self.data):
            raise ValueError(f"Values length ({len(values)}) must match row count ({len(self.data)})")

        new_columns = self.columns + [name]
        new_data = [row + [values[i]] for i, row in enumerate(self.data)]

        return self.__class__(columns=new_columns, data=new_data)

    def drop_rows(self, indices: list[int]) -> Self:
        """Drop rows by index."""
        indices_set = set(indices)
        new_data = [row for i, row in enumerate(self.data) if i not in indices_set]
        return self.__class__(columns=self.columns, data=new_data)

    def drop_duplicates(self, subset: list[str] | None = None) -> Self:
        """Remove duplicate rows."""
        # Validate subset columns
        if subset is not None:
            for col in subset:
                if col not in self.columns:
                    raise KeyError(f"Column '{col}' not found in DataFrame")
            col_indices = [self.columns.index(col) for col in subset]
        else:
            col_indices = list(range(len(self.columns)))

        # Track seen values
        seen = set()
        new_data = []

        for row in self.data:
            # Create tuple of relevant column values
            key = tuple(row[i] for i in col_indices)

            if key not in seen:
                seen.add(key)
                new_data.append(row)

        return self.__class__(columns=self.columns, data=new_data)

    def fillna(self, value: Any | dict[str, Any]) -> Self:
        """Replace None values."""
        if isinstance(value, dict):
            # Validate column names
            for col in value:
                if col not in self.columns:
                    raise KeyError(f"Column '{col}' not found in DataFrame")

            # Create mapping of column index to fill value
            fill_map = {self.columns.index(col): val for col, val in value.items()}

            # Fill values
            new_data = []
            for row in self.data:
                new_row = [fill_map[i] if i in fill_map and val is None else val for i, val in enumerate(row)]
                new_data.append(new_row)
        else:
            # Single fill value for all None
            new_data = [[value if val is None else val for val in row] for row in self.data]

        return self.__class__(columns=self.columns, data=new_data)

    def concat(self, other: Self) -> Self:
        """Concatenate DataFrames vertically (stack rows)."""
        if self.columns != other.columns:
            raise ValueError(f"Column mismatch: {self.columns} != {other.columns}")

        combined_data = self.data + other.data
        return self.__class__(columns=self.columns, data=combined_data)

    def melt(
        self,
        id_vars: list[str] | None = None,
        value_vars: list[str] | None = None,
        var_name: str = "variable",
        value_name: str = "value",
    ) -> Self:
        """Unpivot DataFrame from wide to long format."""
        # Handle empty DataFrame
        if not self.columns or not self.data:
            return self.__class__(columns=[var_name, value_name], data=[])

        # Default id_vars to empty list if not specified
        if id_vars is None:
            id_vars = []

        # Validate id_vars exist
        for col in id_vars:
            if col not in self.columns:
                raise KeyError(f"Column '{col}' not found in DataFrame")

        # Default value_vars to all non-id columns
        if value_vars is None:
            value_vars = [col for col in self.columns if col not in id_vars]
        else:
            # Validate value_vars exist
            for col in value_vars:
                if col not in self.columns:
                    raise KeyError(f"Column '{col}' not found in DataFrame")

        # If no value_vars to melt, return empty result
        if not value_vars:
            # Return just id columns if all columns are id_vars
            if id_vars:
                return self.select(id_vars)
            return self.__class__(columns=[var_name, value_name], data=[])

        # Check for column name conflicts
        new_columns = id_vars + [var_name, value_name]
        if len(new_columns) != len(set(new_columns)):
            raise ValueError(
                f"Duplicate column names in result: {new_columns}. "
                f"Choose different var_name or value_name to avoid conflicts."
            )

        # Get indices for id and value columns
        id_indices = [self.columns.index(col) for col in id_vars]
        value_indices = [(self.columns.index(col), col) for col in value_vars]

        # Build melted data
        melted_data: list[list[Any]] = []

        for row in self.data:
            # Extract id values for this row
            id_values = [row[idx] for idx in id_indices]

            # Create one new row for each value_var
            for val_idx, var_col_name in value_indices:
                new_row = id_values + [var_col_name, row[val_idx]]
                melted_data.append(new_row)

        return self.__class__(columns=new_columns, data=melted_data)

    def pivot(self, index: str, columns: str, values: str) -> Self:
        """Pivot DataFrame from long to wide format."""
        # Validate columns exist
        for col_name, param in [(index, "index"), (columns, "columns"), (values, "values")]:
            if col_name not in self.columns:
                raise KeyError(f"Column '{col_name}' not found in DataFrame (parameter: {param})")

        # Get column indices
        index_idx = self.columns.index(index)
        columns_idx = self.columns.index(columns)
        values_idx = self.columns.index(values)

        # Build pivot structure: dict[index_value, dict[column_value, value]]
        pivot_dict: dict[Any, dict[Any, Any]] = {}
        column_values_set: set[Any] = set()

        for row in self.data:
            idx_val = row[index_idx]
            col_val = row[columns_idx]
            val = row[values_idx]

            # Track column values for final column list
            column_values_set.add(col_val)

            # Initialize nested dict if needed
            if idx_val not in pivot_dict:
                pivot_dict[idx_val] = {}

            # Check for duplicates
            if col_val in pivot_dict[idx_val]:
                raise ValueError(
                    f"Duplicate entries found for index='{idx_val}' and columns='{col_val}'. "
                    f"Cannot reshape with duplicate index/column combinations. "
                    f"Consider using aggregation or removing duplicates first."
                )

            pivot_dict[idx_val] = {**pivot_dict[idx_val], col_val: val}

        # Sort column values for consistent ordering
        column_values = sorted(column_values_set, key=lambda x: (x is None, x))

        # Build result columns: [index_column, col1, col2, ...]
        result_columns = [index] + column_values

        # Build result data
        result_data: list[list[Any]] = []
        for idx_val in sorted(pivot_dict.keys(), key=lambda x: (x is None, x)):
            row_dict = pivot_dict[idx_val]
            # Build row: [index_value, value_for_col1, value_for_col2, ...]
            row = [idx_val] + [row_dict.get(col_val, None) for col_val in column_values]
            result_data.append(row)

        return self.__class__(columns=result_columns, data=result_data)

    def merge(
        self,
        other: Self,
        on: str | list[str] | None = None,
        how: Literal["inner", "left", "right", "outer"] = "inner",
        left_on: str | list[str] | None = None,
        right_on: str | list[str] | None = None,
        suffixes: tuple[str, str] = ("_x", "_y"),
    ) -> Self:
        """Merge DataFrames using database-style join."""
        # Determine join keys
        if on is not None:
            if left_on is not None or right_on is not None:
                raise ValueError("Cannot specify both 'on' and 'left_on'/'right_on'")
            left_keys = [on] if isinstance(on, str) else on
            right_keys = left_keys
        elif left_on is not None and right_on is not None:
            left_keys = [left_on] if isinstance(left_on, str) else left_on
            right_keys = [right_on] if isinstance(right_on, str) else right_on
            if len(left_keys) != len(right_keys):
                raise ValueError("left_on and right_on must have same length")
        else:
            raise ValueError("Must specify either 'on' or both 'left_on' and 'right_on'")

        # Validate join keys exist
        for key in left_keys:
            if key not in self.columns:
                raise KeyError(f"Join key '{key}' not found in left DataFrame")
        for key in right_keys:
            if key not in other.columns:
                raise KeyError(f"Join key '{key}' not found in right DataFrame")

        # Get indices for join keys
        left_key_indices = [self.columns.index(k) for k in left_keys]
        right_key_indices = [other.columns.index(k) for k in right_keys]

        # Build lookup dict for right DataFrame: key_tuple -> list[row_indices]
        right_lookup: dict[tuple[Any, ...], list[int]] = {}
        for row_idx, row in enumerate(other.data):
            key_tuple = tuple(row[idx] for idx in right_key_indices)
            if key_tuple not in right_lookup:
                right_lookup[key_tuple] = []
            right_lookup[key_tuple].append(row_idx)

        # Determine result columns
        left_suffix, right_suffix = suffixes

        # Start with left DataFrame columns
        result_columns = self.columns.copy()

        # Add right DataFrame columns (excluding join keys if using 'on')
        for col in other.columns:
            if on is not None and col in left_keys:
                # Skip join key columns from right when using 'on'
                continue

            if col in result_columns:
                # Handle collision with suffix
                result_columns.append(f"{col}{right_suffix}")
                # Also need to rename left column
                left_col_idx = result_columns.index(col)
                result_columns[left_col_idx] = f"{col}{left_suffix}"
            else:
                result_columns.append(col)

        # Get indices of right columns to include
        right_col_indices = []
        for col in other.columns:
            if on is not None and col in right_keys:
                continue
            right_col_indices.append(other.columns.index(col))

        # Perform join
        result_data: list[list[Any]] = []
        matched_right_indices: set[int] = set()

        for left_row in self.data:
            # Extract key from left row
            left_key_tuple = tuple(left_row[idx] for idx in left_key_indices)

            # Find matching rows in right DataFrame
            right_matches = right_lookup.get(left_key_tuple, [])

            if right_matches:
                # Join matched rows
                for right_idx in right_matches:
                    matched_right_indices.add(right_idx)
                    right_row = other.data[right_idx]

                    # Build result row: left columns + right columns (excluding join keys)
                    result_row = left_row.copy()
                    for col_idx in right_col_indices:
                        result_row.append(right_row[col_idx])

                    result_data.append(result_row)
            else:
                # No match
                if how in ("left", "outer"):
                    # Include left row with None for right columns
                    result_row = left_row.copy()
                    result_row.extend([None] * len(right_col_indices))
                    result_data.append(result_row)

        # Handle right/outer joins - add unmatched right rows
        if how in ("right", "outer"):
            for right_idx, right_row in enumerate(other.data):
                if right_idx not in matched_right_indices:
                    # Build row with None for left columns
                    result_row = [None] * len(self.columns)

                    # Fill in join key values if using 'on'
                    if on is not None:
                        for left_idx, right_idx_key in zip(left_key_indices, right_key_indices):
                            result_row[left_idx] = right_row[right_idx_key]

                    # Add right columns
                    for col_idx in right_col_indices:
                        result_row.append(right_row[col_idx])

                    result_data.append(result_row)

        return self.__class__(columns=result_columns, data=result_data)

    def transpose(self) -> Self:
        """Transpose DataFrame by swapping rows and columns."""
        if not self.data:
            # Empty DataFrame - return with swapped structure
            return self.__class__(columns=[], data=[])

        # First column becomes the new column names
        # Remaining columns become data rows
        if not self.columns:
            return self.__class__(columns=[], data=[])

        # Extract first column values as new column names
        # Convert to strings to ensure valid column names
        new_columns = [str(row[0]) for row in self.data]

        # Transpose the remaining columns
        num_original_cols = len(self.columns)
        if num_original_cols == 1:
            # Only one column (the index) - result is just column names as rows
            single_col_data = [[col] for col in self.columns]
            return self.__class__(columns=new_columns if new_columns else ["0"], data=single_col_data)

        # Build transposed data
        # Each original column (except first) becomes a row
        # Each original row becomes a column
        result_data: list[list[Any]] = []

        for col_idx in range(1, num_original_cols):
            # Original column name becomes first value in new row
            row = [self.columns[col_idx]]
            # Add values from each original row for this column
            for orig_row in self.data:
                row.append(orig_row[col_idx])
            result_data.append(row)

        # New columns: first is placeholder for original column names, rest are from first column
        result_columns = ["index"] + new_columns

        return self.__class__(columns=result_columns, data=result_data)

    # Statistical methods

    def describe(self) -> Self:
        """Generate statistical summary for numeric columns."""
        import statistics

        stats_rows: list[list[Any]] = []
        stat_names = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]

        for col_idx in range(len(self.columns)):
            # Extract numeric values (filter None and non-numeric)
            values = []
            for row in self.data:
                val = row[col_idx]
                if val is not None and isinstance(val, (int, float)) and not isinstance(val, bool):
                    values.append(float(val))

            if not values:
                # Non-numeric column - fill with None
                stats_rows.append([None] * len(stat_names))
                continue

            # Calculate statistics
            count = len(values)
            mean = statistics.mean(values)
            std = statistics.stdev(values) if count > 1 else 0.0
            min_val = min(values)
            max_val = max(values)

            # Quantiles
            sorted_vals = sorted(values)
            try:
                q25 = statistics.quantiles(sorted_vals, n=4)[0] if count > 1 else sorted_vals[0]
                q50 = statistics.median(sorted_vals)
                q75 = statistics.quantiles(sorted_vals, n=4)[2] if count > 1 else sorted_vals[0]
            except statistics.StatisticsError:
                q25 = q50 = q75 = sorted_vals[0] if sorted_vals else 0.0

            stats_rows.append([count, mean, std, min_val, q25, q50, q75, max_val])

        # Transpose to make stats the rows and columns the columns
        transposed_data = [
            [stats_rows[col_idx][stat_idx] for col_idx in range(len(self.columns))]
            for stat_idx in range(len(stat_names))
        ]

        return self.__class__(columns=self.columns, data=transposed_data).add_column("stat", stat_names)

    def groupby(self, by: str) -> "GroupBy":
        """Group DataFrame by column values."""
        if by not in self.columns:
            raise KeyError(f"Column '{by}' not found in DataFrame")

        return GroupBy(self, by)

    # Utility methods

    def equals(self, other: Any) -> bool:
        """Check if two DataFrames are identical."""
        if not isinstance(other, DataFrame):
            return False
        return self.columns == other.columns and self.data == other.data

    def deepcopy(self) -> Self:
        """Create a deep copy of the DataFrame."""
        import copy

        return self.__class__(columns=self.columns.copy(), data=copy.deepcopy(self.data))

    def isna(self) -> Self:
        """Return DataFrame of booleans showing None locations."""
        null_data = [[val is None for val in row] for row in self.data]
        return self.__class__(columns=self.columns, data=null_data)

    def notna(self) -> Self:
        """Return DataFrame of booleans showing non-None locations."""
        not_null_data = [[val is not None for val in row] for row in self.data]
        return self.__class__(columns=self.columns, data=not_null_data)

    def dropna(self, axis: Literal[0, 1] = 0, how: Literal["any", "all"] = "any") -> Self:
        """Drop rows or columns with None values."""
        if axis == 0:
            # Drop rows
            if how == "any":
                # Drop rows with any None
                new_data = [row for row in self.data if not any(val is None for val in row)]
            else:
                # Drop rows with all None
                new_data = [row for row in self.data if not all(val is None for val in row)]
            return self.__class__(columns=self.columns, data=new_data)
        else:
            # Drop columns (axis=1)
            cols_to_keep = []
            indices_to_keep = []

            for col_idx, col_name in enumerate(self.columns):
                col_values = [row[col_idx] for row in self.data]

                if how == "any":
                    # Keep column if no None values
                    if not any(val is None for val in col_values):
                        cols_to_keep.append(col_name)
                        indices_to_keep.append(col_idx)
                else:
                    # Keep column if not all None
                    if not all(val is None for val in col_values):
                        cols_to_keep.append(col_name)
                        indices_to_keep.append(col_idx)

            # Extract data for kept columns
            new_data = [[row[i] for i in indices_to_keep] for row in self.data]
            return self.__class__(columns=cols_to_keep, data=new_data)

    def nunique(self, column: str) -> int:
        """Count number of unique values in column."""
        if column not in self.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        col_idx = self.columns.index(column)
        unique_values = set()
        for row in self.data:
            val = row[col_idx]
            # Count None as a unique value
            unique_values.add(val)
        return len(unique_values)

Attributes

shape property

Return tuple representing dimensionality of the DataFrame.

empty property

Indicator whether DataFrame is empty.

size property

Return int representing number of elements in this object.

ndim property

Return int representing number of axes/array dimensions.
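
A short sketch of the shape-related properties on an illustrative two-row frame (the example values are assumptions, not part of the library):

df = DataFrame(columns=["region", "cases"], data=[["north", 12], ["south", 7]])
df.shape  # (2, 2)
df.empty  # False
df.size   # 4
df.ndim   # 2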

Functions

from_pandas(df) classmethod

Create schema from pandas DataFrame.

Source code in src/chapkit/data/dataframe.py
@classmethod
def from_pandas(cls, df: Any) -> Self:
    """Create schema from pandas DataFrame."""
    try:
        import pandas as pd
    except ImportError:
        raise ImportError("pandas is required for from_pandas(). Install with: uv add pandas") from None

    if not isinstance(df, pd.DataFrame):
        raise TypeError(f"Expected pandas DataFrame, got {type(df)}")

    return cls(
        columns=df.columns.tolist(),
        data=df.values.tolist(),
    )
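
A minimal usage sketch, assuming pandas is installed (the example frame is illustrative):

import pandas as pd

pdf = pd.DataFrame({"region": ["north", "south"], "cases": [12, 7]})
df = DataFrame.from_pandas(pdf)
df.columns  # ["region", "cases"]
df.data     # [["north", 12], ["south", 7]]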

from_polars(df) classmethod

Create schema from Polars DataFrame.

Source code in src/chapkit/data/dataframe.py
@classmethod
def from_polars(cls, df: Any) -> Self:
    """Create schema from Polars DataFrame."""
    try:
        import polars as pl
    except ImportError:
        raise ImportError("polars is required for from_polars(). Install with: uv add polars") from None

    if not isinstance(df, pl.DataFrame):
        raise TypeError(f"Expected Polars DataFrame, got {type(df)}")

    return cls(
        columns=df.columns,
        data=[list(row) for row in df.rows()],
    )

from_xarray(da) classmethod

Create schema from xarray DataArray (2D only).

Source code in src/chapkit/data/dataframe.py
@classmethod
def from_xarray(cls, da: Any) -> Self:
    """Create schema from xarray DataArray (2D only)."""
    try:
        import xarray as xr
    except ImportError:
        raise ImportError("xarray is required for from_xarray(). Install with: uv add xarray") from None

    if not isinstance(da, xr.DataArray):
        raise TypeError(f"Expected xarray DataArray, got {type(da)}")

    if len(da.dims) != 2:
        raise ValueError(f"Only 2D DataArrays supported, got {len(da.dims)} dimensions")

    # Convert to pandas then use from_pandas
    pdf = da.to_pandas()
    return cls.from_pandas(pdf)

from_dict(data) classmethod

Create schema from dictionary of columns.

Source code in src/chapkit/data/dataframe.py
@classmethod
def from_dict(cls, data: dict[str, list[Any]]) -> Self:
    """Create schema from dictionary of columns."""
    if not data:
        return cls(columns=[], data=[])

    columns = list(data.keys())
    num_rows = len(next(iter(data.values())))

    if not all(len(vals) == num_rows for vals in data.values()):
        raise ValueError("All columns must have the same length")

    rows = [[data[col][i] for col in columns] for i in range(num_rows)]

    return cls(columns=columns, data=rows)
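
A column-oriented construction sketch (example values are illustrative); all columns must have the same length:

df = DataFrame.from_dict({"region": ["north", "south"], "cases": [12, 7]})
df.data  # [["north", 12], ["south", 7]]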

from_records(records) classmethod

Create schema from list of records (row-oriented).

Source code in src/chapkit/data/dataframe.py
@classmethod
def from_records(cls, records: list[dict[str, Any]]) -> Self:
    """Create schema from list of records (row-oriented)."""
    if not records:
        return cls(columns=[], data=[])

    columns = list(records[0].keys())
    data = [[record[col] for col in columns] for record in records]

    return cls(columns=columns, data=data)
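
A row-oriented construction sketch; column order follows the keys of the first record (example data is illustrative):

records = [{"region": "north", "cases": 12}, {"region": "south", "cases": 7}]
df = DataFrame.from_records(records)
df.columns  # ["region", "cases"]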

from_csv(path=None, *, csv_string=None, delimiter=',', has_header=True, encoding='utf-8') classmethod

Create DataFrame from CSV file or string.

Source code in src/chapkit/data/dataframe.py
@classmethod
def from_csv(
    cls,
    path: str | Path | None = None,
    *,
    csv_string: str | None = None,
    delimiter: str = ",",
    has_header: bool = True,
    encoding: str = "utf-8",
) -> Self:
    """Create DataFrame from CSV file or string."""
    # Validate mutually exclusive parameters
    if path is None and csv_string is None:
        raise ValueError("Either path or csv_string must be provided")
    if path is not None and csv_string is not None:
        raise ValueError("path and csv_string are mutually exclusive")

    # Read CSV data
    if path is not None:
        path_obj = Path(path)
        if not path_obj.exists():
            raise FileNotFoundError(f"File not found: {path}")
        with path_obj.open("r", encoding=encoding, newline="") as f:
            reader = csv.reader(f, delimiter=delimiter)
            rows = list(reader)
    else:
        # csv_string is not None
        string_io = io.StringIO(csv_string)
        reader = csv.reader(string_io, delimiter=delimiter)
        rows = list(reader)

    # Handle empty CSV
    if not rows:
        return cls(columns=[], data=[])

    # Extract columns and data
    if has_header:
        columns = rows[0]
        data = rows[1:]
    else:
        # Generate column names
        num_cols = len(rows[0]) if rows else 0
        columns = [f"col_{i}" for i in range(num_cols)]
        data = rows

    return cls(columns=columns, data=data)
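
A sketch using the csv_string form (the payload is illustrative); note that CSV values remain strings, since no type conversion is applied:

df = DataFrame.from_csv(csv_string="region,cases\nnorth,12\nsouth,7")
df.columns  # ["region", "cases"]
df.data     # [["north", "12"], ["south", "7"]]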

to_pandas()

Convert schema to pandas DataFrame.

Source code in src/chapkit/data/dataframe.py
def to_pandas(self) -> Any:
    """Convert schema to pandas DataFrame."""
    try:
        import pandas as pd
    except ImportError:
        raise ImportError("pandas is required for to_pandas(). Install with: uv add pandas") from None

    return pd.DataFrame(self.data, columns=self.columns)

to_polars()

Convert schema to Polars DataFrame.

Source code in src/chapkit/data/dataframe.py
def to_polars(self) -> Any:
    """Convert schema to Polars DataFrame."""
    try:
        import polars as pl
    except ImportError:
        raise ImportError("polars is required for to_polars(). Install with: uv add polars") from None

    return pl.DataFrame(self.data, schema=self.columns, orient="row")
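
A conversion sketch, assuming the optional pandas and polars dependencies are installed (the frame is illustrative):

df = DataFrame(columns=["region", "cases"], data=[["north", 12], ["south", 7]])
pdf = df.to_pandas()   # pandas.DataFrame with the same columns and rows
pldf = df.to_polars()  # polars.DataFrame built row-wise from the same data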

to_dict(orient='dict')

Convert schema to dictionary with specified orient (dict, list, or records).

Source code in src/chapkit/data/dataframe.py
def to_dict(self, orient: Literal["dict", "list", "records"] = "dict") -> Any:
    """Convert schema to dictionary with specified orient (dict, list, or records)."""
    if orient == "dict":
        return {col: {i: self.data[i][j] for i in range(len(self.data))} for j, col in enumerate(self.columns)}
    elif orient == "list":
        return {col: [row[j] for row in self.data] for j, col in enumerate(self.columns)}
    elif orient == "records":
        return [{col: row[j] for j, col in enumerate(self.columns)} for row in self.data]
    else:
        raise ValueError(f"Invalid orient: {orient}")

to_csv(path=None, *, delimiter=',', include_header=True, encoding='utf-8')

Export DataFrame to CSV file or string.

Source code in src/chapkit/data/dataframe.py
def to_csv(
    self,
    path: str | Path | None = None,
    *,
    delimiter: str = ",",
    include_header: bool = True,
    encoding: str = "utf-8",
) -> str | None:
    """Export DataFrame to CSV file or string."""
    # Write to string buffer or file
    if path is None:
        # Return as string
        output = io.StringIO()
        writer = csv.writer(output, delimiter=delimiter)

        if include_header:
            writer.writerow(self.columns)

        writer.writerows(self.data)

        return output.getvalue()
    else:
        # Write to file
        path_obj = Path(path)
        with path_obj.open("w", encoding=encoding, newline="") as f:
            writer = csv.writer(f, delimiter=delimiter)

            if include_header:
                writer.writerow(self.columns)

            writer.writerows(self.data)

        return None
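
A sketch of both modes; with no path the CSV text is returned, with a path the file is written and None is returned (the file name is an assumption for the example):

csv_text = df.to_csv()  # CSV text as a string (header row plus data rows)
df.to_csv("cases.csv")  # writes the file, returns None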

head(n=5)

Return first n rows.

Source code in src/chapkit/data/dataframe.py
def head(self, n: int = 5) -> Self:
    """Return first n rows."""
    if n >= 0:
        selected_data = self.data[:n]
    else:
        selected_data = self.data[:n] if n != 0 else self.data
    return self.__class__(columns=self.columns, data=selected_data)

tail(n=5)

Return last n rows.

Source code in src/chapkit/data/dataframe.py
def tail(self, n: int = 5) -> Self:
    """Return last n rows."""
    if n >= 0:
        selected_data = self.data[-n:] if n > 0 else []
    else:
        selected_data = self.data[abs(n) :]
    return self.__class__(columns=self.columns, data=selected_data)
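
A sketch of row slicing with head and tail (the frame is illustrative):

df = DataFrame(columns=["cases"], data=[[1], [2], [3], [4]])
df.head(2).data  # [[1], [2]]
df.tail(2).data  # [[3], [4]]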

sample(n=None, frac=None, *, random_state=None)

Return random sample of rows.

Source code in src/chapkit/data/dataframe.py
def sample(
    self,
    n: int | None = None,
    frac: float | None = None,
    *,
    random_state: int | None = None,
) -> Self:
    """Return random sample of rows."""
    # Validate parameters
    if n is None and frac is None:
        raise ValueError("Either n or frac must be provided")
    if n is not None and frac is not None:
        raise ValueError("n and frac are mutually exclusive")

    # Set random seed if provided
    if random_state is not None:
        random.seed(random_state)

    # Calculate sample size
    total_rows = len(self.data)
    if frac is not None:
        if frac > 1.0:
            raise ValueError("frac must be <= 1.0")
        sample_size = int(total_rows * frac)
    else:
        sample_size = min(n, total_rows) if n is not None else 0

    # Sample indices
    if sample_size >= total_rows:
        sampled_indices = list(range(total_rows))
        random.shuffle(sampled_indices)
    else:
        sampled_indices = random.sample(range(total_rows), sample_size)

    # Extract sampled rows
    sampled_data = [self.data[i] for i in sampled_indices]

    return self.__class__(columns=self.columns, data=sampled_data)
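
A sampling sketch with a fixed seed; random_state seeds Python's global random module, so repeated calls in one process are reproducible (the frame is illustrative):

df = DataFrame(columns=["cases"], data=[[1], [2], [3], [4]])
df.sample(n=2, random_state=42)       # two randomly chosen rows
df.sample(frac=0.5, random_state=42)  # half of the rows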

select(columns)

Return DataFrame with only specified columns.

Source code in src/chapkit/data/dataframe.py
def select(self, columns: list[str]) -> Self:
    """Return DataFrame with only specified columns."""
    # Validate all columns exist
    for col in columns:
        if col not in self.columns:
            raise KeyError(f"Column '{col}' not found in DataFrame")

    # Get column indices
    indices = [self.columns.index(col) for col in columns]

    # Extract data for selected columns
    new_data = [[row[i] for i in indices] for row in self.data]

    return self.__class__(columns=columns, data=new_data)

drop(columns)

Return DataFrame without specified columns.

Source code in src/chapkit/data/dataframe.py
def drop(self, columns: list[str]) -> Self:
    """Return DataFrame without specified columns."""
    # Validate all columns exist
    for col in columns:
        if col not in self.columns:
            raise KeyError(f"Column '{col}' not found in DataFrame")

    # Get columns to keep
    keep_cols = [c for c in self.columns if c not in columns]

    # Get indices for columns to keep
    indices = [self.columns.index(col) for col in keep_cols]

    # Extract data for kept columns
    new_data = [[row[i] for i in indices] for row in self.data]

    return self.__class__(columns=keep_cols, data=new_data)

rename(mapper)

Return DataFrame with renamed columns.

Source code in src/chapkit/data/dataframe.py
def rename(self, mapper: dict[str, str]) -> Self:
    """Return DataFrame with renamed columns."""
    # Validate all old column names exist
    for old_name in mapper:
        if old_name not in self.columns:
            raise KeyError(f"Column '{old_name}' not found in DataFrame")

    # Create new column list
    new_cols = [mapper.get(col, col) for col in self.columns]

    # Check for duplicates
    if len(new_cols) != len(set(new_cols)):
        raise ValueError("Renaming would create duplicate column names")

    return self.__class__(columns=new_cols, data=self.data)
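
A sketch of the column operations select, drop, and rename on an illustrative frame:

df = DataFrame(columns=["region", "cases"], data=[["north", 12], ["south", 7]])
df.select(["cases"]).columns                # ["cases"]
df.drop(["region"]).columns                 # ["cases"]
df.rename({"cases": "case_count"}).columns  # ["region", "case_count"]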

rename_columns(mapper)

Return DataFrame with renamed columns (alias for rename).

Source code in src/chapkit/data/dataframe.py
def rename_columns(self, mapper: dict[str, str]) -> Self:
    """Return DataFrame with renamed columns (alias for rename)."""
    return self.rename(mapper)

validate_structure()

Validate DataFrame structure.

Source code in src/chapkit/data/dataframe.py
def validate_structure(self) -> None:
    """Validate DataFrame structure."""
    # Check for empty column names
    for i, col in enumerate(self.columns):
        if col == "":
            raise ValueError(f"Column at index {i} is empty")

    # Check for duplicate column names
    if len(self.columns) != len(set(self.columns)):
        duplicates = [col for col in self.columns if self.columns.count(col) > 1]
        raise ValueError(f"Duplicate column names found: {set(duplicates)}")

    # Check all rows have same length as columns
    num_cols = len(self.columns)
    for i, row in enumerate(self.data):
        if len(row) != num_cols:
            raise ValueError(f"Row {i} has {len(row)} values, expected {num_cols}")

infer_types()

Infer column data types.

Source code in src/chapkit/data/dataframe.py
def infer_types(self) -> dict[str, str]:
    """Infer column data types."""
    result: dict[str, str] = {}

    for col_idx, col_name in enumerate(self.columns):
        # Extract all values for this column
        values = [row[col_idx] for row in self.data]

        # Filter out None values for type checking
        non_null_values = [v for v in values if v is not None]

        if not non_null_values:
            result[col_name] = "null"
            continue

        # Check types
        types_found = set()
        for val in non_null_values:
            if isinstance(val, bool):
                types_found.add("bool")
            elif isinstance(val, int):
                types_found.add("int")
            elif isinstance(val, float):
                types_found.add("float")
            elif isinstance(val, str):
                types_found.add("str")
            else:
                types_found.add("other")

        # Determine final type
        if len(types_found) > 1:
            # Special case: int and float can be treated as float
            if types_found == {"int", "float"}:
                result[col_name] = "float"
            else:
                result[col_name] = "mixed"
        elif "bool" in types_found:
            result[col_name] = "bool"
        elif "int" in types_found:
            result[col_name] = "int"
        elif "float" in types_found:
            result[col_name] = "float"
        elif "str" in types_found:
            result[col_name] = "str"
        else:
            result[col_name] = "mixed"

    return result

has_nulls()

Check for null values in each column.

Source code in src/chapkit/data/dataframe.py
def has_nulls(self) -> dict[str, bool]:
    """Check for null values in each column."""
    result: dict[str, bool] = {}

    for col_idx, col_name in enumerate(self.columns):
        # Check if any value in this column is None
        has_null = any(row[col_idx] is None for row in self.data)
        result[col_name] = has_null

    return result
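
A sketch of the two introspection helpers on a frame with a missing value (values are illustrative):

df = DataFrame(columns=["region", "cases"], data=[["north", 12], ["south", None]])
df.infer_types()  # {"region": "str", "cases": "int"}
df.has_nulls()    # {"region": False, "cases": True}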

__len__()

Return number of rows.

Source code in src/chapkit/data/dataframe.py
def __len__(self) -> int:
    """Return number of rows."""
    return len(self.data)

__iter__()

Iterate over rows as dictionaries.

Source code in src/chapkit/data/dataframe.py
def __iter__(self) -> Any:
    """Iterate over rows as dictionaries."""
    for row in self.data:
        yield dict(zip(self.columns, row))

from_json(json_string) classmethod

Create DataFrame from JSON string (array of objects).

Source code in src/chapkit/data/dataframe.py
@classmethod
def from_json(cls, json_string: str) -> Self:
    """Create DataFrame from JSON string (array of objects)."""
    records = json.loads(json_string)
    if not isinstance(records, list):
        raise ValueError("JSON must be an array of objects")
    return cls.from_records(records)

to_json(orient='records')

Export DataFrame as JSON string.

Source code in src/chapkit/data/dataframe.py
def to_json(self, orient: Literal["records", "columns"] = "records") -> str:
    """Export DataFrame as JSON string."""
    # Map "columns" to "list" for to_dict()
    dict_orient: Literal["dict", "list", "records"] = "list" if orient == "columns" else orient
    return json.dumps(self.to_dict(orient=dict_orient))
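
A JSON round-trip sketch (the payload is illustrative):

df = DataFrame.from_json('[{"region": "north", "cases": 12}]')
df.to_json()                  # '[{"region": "north", "cases": 12}]'
df.to_json(orient="columns")  # '{"region": ["north"], "cases": [12]}'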

get_column(column)

Get all values for a column.

Source code in src/chapkit/data/dataframe.py
def get_column(self, column: str) -> list[Any]:
    """Get all values for a column."""
    if column not in self.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")
    idx = self.columns.index(column)
    return [row[idx] for row in self.data]

__getitem__(key)

Support df['col'] and df[['col1', 'col2']].

Source code in src/chapkit/data/dataframe.py
def __getitem__(self, key: str | list[str]) -> list[Any] | Self:
    """Support df['col'] and df[['col1', 'col2']]."""
    if isinstance(key, str):
        return self.get_column(key)
    return self.select(key)

unique(column)

Get unique values from a column (preserves order).

Source code in src/chapkit/data/dataframe.py
def unique(self, column: str) -> list[Any]:
    """Get unique values from a column (preserves order)."""
    if column not in self.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    col_idx = self.columns.index(column)
    seen = set()
    result = []
    for row in self.data:
        val = row[col_idx]
        if val not in seen:
            seen.add(val)
            result.append(val)
    return result

value_counts(column)

Count occurrences of each unique value in column.

Source code in src/chapkit/data/dataframe.py
def value_counts(self, column: str) -> dict[Any, int]:
    """Count occurrences of each unique value in column."""
    if column not in self.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    col_idx = self.columns.index(column)
    counts: dict[Any, int] = {}
    for row in self.data:
        val = row[col_idx]
        counts[val] = counts.get(val, 0) + 1
    return counts
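
A brief sketch of unique and value_counts under the assumed constructor.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region"], data=[["north"], ["south"], ["north"]])

print(df.unique("region"))        # ['north', 'south'] (first-seen order preserved)
print(df.value_counts("region"))  # {'north': 2, 'south': 1}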

sort(by, ascending=True)

Sort DataFrame by column.

Source code in src/chapkit/data/dataframe.py
def sort(self, by: str, ascending: bool = True) -> Self:
    """Sort DataFrame by column."""
    if by not in self.columns:
        raise KeyError(f"Column '{by}' not found in DataFrame")

    col_idx = self.columns.index(by)

    # Sort with None values at the end
    def sort_key(row: list[Any]) -> tuple[int, Any]:
        val = row[col_idx]
        if val is None:
            # Use a tuple to ensure None sorts last
            return (1, None) if ascending else (0, None)
        return (0, val) if ascending else (1, val)

    sorted_data = sorted(self.data, key=sort_key, reverse=not ascending)
    return self.__class__(columns=self.columns, data=sorted_data)
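
A sketch of sort, showing that None values land at the end in both directions.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region", "cases"], data=[["north", 12], ["south", None], ["east", 3]])

print(df.sort("cases").get_column("cases"))                   # [3, 12, None]
print(df.sort("cases", ascending=False).get_column("cases"))  # [12, 3, None]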

filter(predicate)

Filter rows using a predicate function.

Source code in src/chapkit/data/dataframe.py
def filter(self, predicate: Any) -> Self:
    """Filter rows using a predicate function."""
    filtered_data = []
    for row in self.data:
        row_dict = dict(zip(self.columns, row))
        if predicate(row_dict):
            filtered_data.append(row)
    return self.__class__(columns=self.columns, data=filtered_data)

apply(func, column)

Apply function to column values.

Source code in src/chapkit/data/dataframe.py
def apply(self, func: Any, column: str) -> Self:
    """Apply function to column values."""
    if column not in self.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    col_idx = self.columns.index(column)
    new_data = []
    for row in self.data:
        new_row = row.copy()
        new_row[col_idx] = func(row[col_idx])
        new_data.append(new_row)

    return self.__class__(columns=self.columns, data=new_data)
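
A sketch combining filter and apply; the predicate receives each row as a dict, and apply rewrites a single column.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region", "cases"], data=[["north", 12], ["south", 3]])

high = df.filter(lambda row: row["cases"] > 5)  # predicate sees each row as a dict
print(high.get_column("region"))                # ['north']

doubled = df.apply(lambda v: v * 2, "cases")    # transforms one column, other columns untouched
print(doubled.get_column("cases"))              # [24, 6]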

add_column(name, values)

Add new column to DataFrame.

Source code in src/chapkit/data/dataframe.py
def add_column(self, name: str, values: list[Any]) -> Self:
    """Add new column to DataFrame."""
    if name in self.columns:
        raise ValueError(f"Column '{name}' already exists")

    if len(values) != len(self.data):
        raise ValueError(f"Values length ({len(values)}) must match row count ({len(self.data)})")

    new_columns = self.columns + [name]
    new_data = [row + [values[i]] for i, row in enumerate(self.data)]

    return self.__class__(columns=new_columns, data=new_data)

drop_rows(indices)

Drop rows by index.

Source code in src/chapkit/data/dataframe.py
def drop_rows(self, indices: list[int]) -> Self:
    """Drop rows by index."""
    indices_set = set(indices)
    new_data = [row for i, row in enumerate(self.data) if i not in indices_set]
    return self.__class__(columns=self.columns, data=new_data)

drop_duplicates(subset=None)

Remove duplicate rows.

Source code in src/chapkit/data/dataframe.py
def drop_duplicates(self, subset: list[str] | None = None) -> Self:
    """Remove duplicate rows."""
    # Validate subset columns
    if subset is not None:
        for col in subset:
            if col not in self.columns:
                raise KeyError(f"Column '{col}' not found in DataFrame")
        col_indices = [self.columns.index(col) for col in subset]
    else:
        col_indices = list(range(len(self.columns)))

    # Track seen values
    seen = set()
    new_data = []

    for row in self.data:
        # Create tuple of relevant column values
        key = tuple(row[i] for i in col_indices)

        if key not in seen:
            seen.add(key)
            new_data.append(row)

    return self.__class__(columns=self.columns, data=new_data)
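
A sketch chaining add_column, drop_rows, and drop_duplicates under the assumed constructor.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region", "cases"], data=[["north", 12], ["north", 12], ["south", 7]])

df = df.add_column("year", [2023, 2023, 2023])  # values length must match the row count
df = df.drop_rows([2])                          # drop the "south" row by index
df = df.drop_duplicates(subset=["region"])      # omit subset to compare all columns
print(len(df))                                  # 1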

fillna(value)

Replace None values.

Source code in src/chapkit/data/dataframe.py
def fillna(self, value: Any | dict[str, Any]) -> Self:
    """Replace None values."""
    if isinstance(value, dict):
        # Validate column names
        for col in value:
            if col not in self.columns:
                raise KeyError(f"Column '{col}' not found in DataFrame")

        # Create mapping of column index to fill value
        fill_map = {self.columns.index(col): val for col, val in value.items()}

        # Fill values
        new_data = []
        for row in self.data:
            new_row = [fill_map[i] if i in fill_map and val is None else val for i, val in enumerate(row)]
            new_data.append(new_row)
    else:
        # Single fill value for all None
        new_data = [[value if val is None else val for val in row] for row in self.data]

    return self.__class__(columns=self.columns, data=new_data)

concat(other)

Concatenate DataFrames vertically (stack rows).

Source code in src/chapkit/data/dataframe.py
def concat(self, other: Self) -> Self:
    """Concatenate DataFrames vertically (stack rows)."""
    if self.columns != other.columns:
        raise ValueError(f"Column mismatch: {self.columns} != {other.columns}")

    combined_data = self.data + other.data
    return self.__class__(columns=self.columns, data=combined_data)
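
A sketch of fillna (scalar and per-column forms) and concat; concat requires identical column lists.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region", "cases"], data=[["north", None], [None, 7]])

print(df.fillna(0).data)             # [['north', 0], [0, 7]]   (single fill value)
print(df.fillna({"cases": 0}).data)  # [['north', 0], [None, 7]] (per-column fill)

more = DataFrame(columns=["region", "cases"], data=[["south", 3]])
print(len(df.concat(more)))          # 3, columns must match exactly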

melt(id_vars=None, value_vars=None, var_name='variable', value_name='value')

Unpivot DataFrame from wide to long format.

Source code in src/chapkit/data/dataframe.py
def melt(
    self,
    id_vars: list[str] | None = None,
    value_vars: list[str] | None = None,
    var_name: str = "variable",
    value_name: str = "value",
) -> Self:
    """Unpivot DataFrame from wide to long format."""
    # Handle empty DataFrame
    if not self.columns or not self.data:
        return self.__class__(columns=[var_name, value_name], data=[])

    # Default id_vars to empty list if not specified
    if id_vars is None:
        id_vars = []

    # Validate id_vars exist
    for col in id_vars:
        if col not in self.columns:
            raise KeyError(f"Column '{col}' not found in DataFrame")

    # Default value_vars to all non-id columns
    if value_vars is None:
        value_vars = [col for col in self.columns if col not in id_vars]
    else:
        # Validate value_vars exist
        for col in value_vars:
            if col not in self.columns:
                raise KeyError(f"Column '{col}' not found in DataFrame")

    # If no value_vars to melt, return empty result
    if not value_vars:
        # Return just id columns if all columns are id_vars
        if id_vars:
            return self.select(id_vars)
        return self.__class__(columns=[var_name, value_name], data=[])

    # Check for column name conflicts
    new_columns = id_vars + [var_name, value_name]
    if len(new_columns) != len(set(new_columns)):
        raise ValueError(
            f"Duplicate column names in result: {new_columns}. "
            f"Choose different var_name or value_name to avoid conflicts."
        )

    # Get indices for id and value columns
    id_indices = [self.columns.index(col) for col in id_vars]
    value_indices = [(self.columns.index(col), col) for col in value_vars]

    # Build melted data
    melted_data: list[list[Any]] = []

    for row in self.data:
        # Extract id values for this row
        id_values = [row[idx] for idx in id_indices]

        # Create one new row for each value_var
        for val_idx, var_col_name in value_indices:
            new_row = id_values + [var_col_name, row[val_idx]]
            melted_data.append(new_row)

    return self.__class__(columns=new_columns, data=melted_data)
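
A wide-to-long sketch for melt; the column names here are illustrative.

from chapkit.data.dataframe import DataFrame

wide = DataFrame(
    columns=["region", "2022", "2023"],
    data=[["north", 10, 12], ["south", 5, 7]],
)

long = wide.melt(id_vars=["region"], var_name="year", value_name="cases")
print(long.columns)  # ['region', 'year', 'cases']
print(len(long))     # 4, one row per (region, melted column) pair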

pivot(index, columns, values)

Pivot DataFrame from long to wide format.

Source code in src/chapkit/data/dataframe.py
def pivot(self, index: str, columns: str, values: str) -> Self:
    """Pivot DataFrame from long to wide format."""
    # Validate columns exist
    for col_name, param in [(index, "index"), (columns, "columns"), (values, "values")]:
        if col_name not in self.columns:
            raise KeyError(f"Column '{col_name}' not found in DataFrame (parameter: {param})")

    # Get column indices
    index_idx = self.columns.index(index)
    columns_idx = self.columns.index(columns)
    values_idx = self.columns.index(values)

    # Build pivot structure: dict[index_value, dict[column_value, value]]
    pivot_dict: dict[Any, dict[Any, Any]] = {}
    column_values_set: set[Any] = set()

    for row in self.data:
        idx_val = row[index_idx]
        col_val = row[columns_idx]
        val = row[values_idx]

        # Track column values for final column list
        column_values_set.add(col_val)

        # Initialize nested dict if needed
        if idx_val not in pivot_dict:
            pivot_dict[idx_val] = {}

        # Check for duplicates
        if col_val in pivot_dict[idx_val]:
            raise ValueError(
                f"Duplicate entries found for index='{idx_val}' and columns='{col_val}'. "
                f"Cannot reshape with duplicate index/column combinations. "
                f"Consider using aggregation or removing duplicates first."
            )

        pivot_dict[idx_val] = {**pivot_dict[idx_val], col_val: val}

    # Sort column values for consistent ordering
    column_values = sorted(column_values_set, key=lambda x: (x is None, x))

    # Build result columns: [index_column, col1, col2, ...]
    result_columns = [index] + column_values

    # Build result data
    result_data: list[list[Any]] = []
    for idx_val in sorted(pivot_dict.keys(), key=lambda x: (x is None, x)):
        row_dict = pivot_dict[idx_val]
        # Build row: [index_value, value_for_col1, value_for_col2, ...]
        row = [idx_val] + [row_dict.get(col_val, None) for col_val in column_values]
        result_data.append(row)

    return self.__class__(columns=result_columns, data=result_data)
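
A long-to-wide sketch for pivot; missing index/column combinations come back as None, and duplicate combinations would raise ValueError.

from chapkit.data.dataframe import DataFrame

long = DataFrame(
    columns=["region", "year", "cases"],
    data=[["north", 2022, 10], ["north", 2023, 12], ["south", 2022, 5]],
)

wide = long.pivot(index="region", columns="year", values="cases")
print(wide.columns)  # ['region', 2022, 2023]
print(wide.data)     # [['north', 10, 12], ['south', 5, None]]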

merge(other, on=None, how='inner', left_on=None, right_on=None, suffixes=('_x', '_y'))

Merge DataFrames using database-style join.

Source code in src/chapkit/data/dataframe.py
def merge(
    self,
    other: Self,
    on: str | list[str] | None = None,
    how: Literal["inner", "left", "right", "outer"] = "inner",
    left_on: str | list[str] | None = None,
    right_on: str | list[str] | None = None,
    suffixes: tuple[str, str] = ("_x", "_y"),
) -> Self:
    """Merge DataFrames using database-style join."""
    # Determine join keys
    if on is not None:
        if left_on is not None or right_on is not None:
            raise ValueError("Cannot specify both 'on' and 'left_on'/'right_on'")
        left_keys = [on] if isinstance(on, str) else on
        right_keys = left_keys
    elif left_on is not None and right_on is not None:
        left_keys = [left_on] if isinstance(left_on, str) else left_on
        right_keys = [right_on] if isinstance(right_on, str) else right_on
        if len(left_keys) != len(right_keys):
            raise ValueError("left_on and right_on must have same length")
    else:
        raise ValueError("Must specify either 'on' or both 'left_on' and 'right_on'")

    # Validate join keys exist
    for key in left_keys:
        if key not in self.columns:
            raise KeyError(f"Join key '{key}' not found in left DataFrame")
    for key in right_keys:
        if key not in other.columns:
            raise KeyError(f"Join key '{key}' not found in right DataFrame")

    # Get indices for join keys
    left_key_indices = [self.columns.index(k) for k in left_keys]
    right_key_indices = [other.columns.index(k) for k in right_keys]

    # Build lookup dict for right DataFrame: key_tuple -> list[row_indices]
    right_lookup: dict[tuple[Any, ...], list[int]] = {}
    for row_idx, row in enumerate(other.data):
        key_tuple = tuple(row[idx] for idx in right_key_indices)
        if key_tuple not in right_lookup:
            right_lookup[key_tuple] = []
        right_lookup[key_tuple].append(row_idx)

    # Determine result columns
    left_suffix, right_suffix = suffixes

    # Start with left DataFrame columns
    result_columns = self.columns.copy()

    # Add right DataFrame columns (excluding join keys if using 'on')
    for col in other.columns:
        if on is not None and col in left_keys:
            # Skip join key columns from right when using 'on'
            continue

        if col in result_columns:
            # Handle collision with suffix
            result_columns.append(f"{col}{right_suffix}")
            # Also need to rename left column
            left_col_idx = result_columns.index(col)
            result_columns[left_col_idx] = f"{col}{left_suffix}"
        else:
            result_columns.append(col)

    # Get indices of right columns to include
    right_col_indices = []
    for col in other.columns:
        if on is not None and col in right_keys:
            continue
        right_col_indices.append(other.columns.index(col))

    # Perform join
    result_data: list[list[Any]] = []
    matched_right_indices: set[int] = set()

    for left_row in self.data:
        # Extract key from left row
        left_key_tuple = tuple(left_row[idx] for idx in left_key_indices)

        # Find matching rows in right DataFrame
        right_matches = right_lookup.get(left_key_tuple, [])

        if right_matches:
            # Join matched rows
            for right_idx in right_matches:
                matched_right_indices.add(right_idx)
                right_row = other.data[right_idx]

                # Build result row: left columns + right columns (excluding join keys)
                result_row = left_row.copy()
                for col_idx in right_col_indices:
                    result_row.append(right_row[col_idx])

                result_data.append(result_row)
        else:
            # No match
            if how in ("left", "outer"):
                # Include left row with None for right columns
                result_row = left_row.copy()
                result_row.extend([None] * len(right_col_indices))
                result_data.append(result_row)

    # Handle right/outer joins - add unmatched right rows
    if how in ("right", "outer"):
        for right_idx, right_row in enumerate(other.data):
            if right_idx not in matched_right_indices:
                # Build row with None for left columns
                result_row = [None] * len(self.columns)

                # Fill in join key values if using 'on'
                if on is not None:
                    for left_idx, right_idx_key in zip(left_key_indices, right_key_indices):
                        result_row[left_idx] = right_row[right_idx_key]

                # Add right columns
                for col_idx in right_col_indices:
                    result_row.append(right_row[col_idx])

                result_data.append(result_row)

    return self.__class__(columns=result_columns, data=result_data)
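
A join sketch for merge using the on= form; unmatched left rows are kept only for how="left" or how="outer".

from chapkit.data.dataframe import DataFrame

left = DataFrame(columns=["region", "cases"], data=[["north", 12], ["south", 7]])
right = DataFrame(columns=["region", "population"], data=[["north", 1000]])

inner = left.merge(right, on="region")                  # only matching keys
print(inner.data)                                       # [['north', 12, 1000]]

left_join = left.merge(right, on="region", how="left")  # keep unmatched left rows
print(left_join.data)                                   # [['north', 12, 1000], ['south', 7, None]]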

transpose()

Transpose DataFrame by swapping rows and columns.

Source code in src/chapkit/data/dataframe.py
def transpose(self) -> Self:
    """Transpose DataFrame by swapping rows and columns."""
    if not self.data:
        # Empty DataFrame - return with swapped structure
        return self.__class__(columns=[], data=[])

    # First column becomes the new column names
    # Remaining columns become data rows
    if not self.columns:
        return self.__class__(columns=[], data=[])

    # Extract first column values as new column names
    # Convert to strings to ensure valid column names
    new_columns = [str(row[0]) for row in self.data]

    # Transpose the remaining columns
    num_original_cols = len(self.columns)
    if num_original_cols == 1:
        # Only one column (the index) - result is just column names as rows
        single_col_data = [[col] for col in self.columns]
        return self.__class__(columns=new_columns if new_columns else ["0"], data=single_col_data)

    # Build transposed data
    # Each original column (except first) becomes a row
    # Each original row becomes a column
    result_data: list[list[Any]] = []

    for col_idx in range(1, num_original_cols):
        # Original column name becomes first value in new row
        row = [self.columns[col_idx]]
        # Add values from each original row for this column
        for orig_row in self.data:
            row.append(orig_row[col_idx])
        result_data.append(row)

    # New columns: first is placeholder for original column names, rest are from first column
    result_columns = ["index"] + new_columns

    return self.__class__(columns=result_columns, data=result_data)
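
A sketch of transpose on a multi-column frame; the first column becomes the new column names, and an "index" column holds the original column names.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region", "cases", "pop"], data=[["north", 12, 100], ["south", 7, 50]])

transposed = df.transpose()
print(transposed.columns)  # ['index', 'north', 'south']
print(transposed.data)     # [['cases', 12, 7], ['pop', 100, 50]]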

describe()

Generate statistical summary for numeric columns.

Source code in src/chapkit/data/dataframe.py
def describe(self) -> Self:
    """Generate statistical summary for numeric columns."""
    import statistics

    stats_rows: list[list[Any]] = []
    stat_names = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]

    for col_idx in range(len(self.columns)):
        # Extract numeric values (filter None and non-numeric)
        values = []
        for row in self.data:
            val = row[col_idx]
            if val is not None and isinstance(val, (int, float)) and not isinstance(val, bool):
                values.append(float(val))

        if not values:
            # Non-numeric column - fill with None
            stats_rows.append([None] * len(stat_names))
            continue

        # Calculate statistics
        count = len(values)
        mean = statistics.mean(values)
        std = statistics.stdev(values) if count > 1 else 0.0
        min_val = min(values)
        max_val = max(values)

        # Quantiles
        sorted_vals = sorted(values)
        try:
            q25 = statistics.quantiles(sorted_vals, n=4)[0] if count > 1 else sorted_vals[0]
            q50 = statistics.median(sorted_vals)
            q75 = statistics.quantiles(sorted_vals, n=4)[2] if count > 1 else sorted_vals[0]
        except statistics.StatisticsError:
            q25 = q50 = q75 = sorted_vals[0] if sorted_vals else 0.0

        stats_rows.append([count, mean, std, min_val, q25, q50, q75, max_val])

    # Transpose to make stats the rows and columns the columns
    transposed_data = [
        [stats_rows[col_idx][stat_idx] for col_idx in range(len(self.columns))]
        for stat_idx in range(len(stat_names))
    ]

    return self.__class__(columns=self.columns, data=transposed_data).add_column("stat", stat_names)
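
A sketch of describe; non-numeric columns are reported as None for every statistic, and a trailing "stat" column labels the rows.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region", "cases"], data=[["north", 10], ["south", 20], ["east", 30]])

summary = df.describe()
print(summary.columns)             # ['region', 'cases', 'stat']
print(summary.get_column("stat"))  # ['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']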

groupby(by)

Group DataFrame by column values.

Source code in src/chapkit/data/dataframe.py
def groupby(self, by: str) -> "GroupBy":
    """Group DataFrame by column values."""
    if by not in self.columns:
        raise KeyError(f"Column '{by}' not found in DataFrame")

    return GroupBy(self, by)
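
A sketch of groupby together with the aggregations provided by the GroupBy helper documented below.

from chapkit.data.dataframe import DataFrame

df = DataFrame(
    columns=["region", "cases"],
    data=[["north", 10], ["north", 14], ["south", 7]],
)

grouped = df.groupby("region")
print(grouped.count().data)           # [['north', 2], ['south', 1]]
print(grouped.sum("cases").data)      # [['north', 24], ['south', 7]]
print(grouped.mean("cases").columns)  # ['region', 'cases_mean']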

equals(other)

Check if two DataFrames are identical.

Source code in src/chapkit/data/dataframe.py
def equals(self, other: Any) -> bool:
    """Check if two DataFrames are identical."""
    if not isinstance(other, DataFrame):
        return False
    return self.columns == other.columns and self.data == other.data

deepcopy()

Create a deep copy of the DataFrame.

Source code in src/chapkit/data/dataframe.py
def deepcopy(self) -> Self:
    """Create a deep copy of the DataFrame."""
    import copy

    return self.__class__(columns=self.columns.copy(), data=copy.deepcopy(self.data))

isna()

Return DataFrame of booleans showing None locations.

Source code in src/chapkit/data/dataframe.py
def isna(self) -> Self:
    """Return DataFrame of booleans showing None locations."""
    null_data = [[val is None for val in row] for row in self.data]
    return self.__class__(columns=self.columns, data=null_data)

notna()

Return DataFrame of booleans showing non-None locations.

Source code in src/chapkit/data/dataframe.py
def notna(self) -> Self:
    """Return DataFrame of booleans showing non-None locations."""
    not_null_data = [[val is not None for val in row] for row in self.data]
    return self.__class__(columns=self.columns, data=not_null_data)

dropna(axis=0, how='any')

Drop rows or columns with None values.

Source code in src/chapkit/data/dataframe.py
def dropna(self, axis: Literal[0, 1] = 0, how: Literal["any", "all"] = "any") -> Self:
    """Drop rows or columns with None values."""
    if axis == 0:
        # Drop rows
        if how == "any":
            # Drop rows with any None
            new_data = [row for row in self.data if not any(val is None for val in row)]
        else:
            # Drop rows with all None
            new_data = [row for row in self.data if not all(val is None for val in row)]
        return self.__class__(columns=self.columns, data=new_data)
    else:
        # Drop columns (axis=1)
        cols_to_keep = []
        indices_to_keep = []

        for col_idx, col_name in enumerate(self.columns):
            col_values = [row[col_idx] for row in self.data]

            if how == "any":
                # Keep column if no None values
                if not any(val is None for val in col_values):
                    cols_to_keep.append(col_name)
                    indices_to_keep.append(col_idx)
            else:
                # Keep column if not all None
                if not all(val is None for val in col_values):
                    cols_to_keep.append(col_name)
                    indices_to_keep.append(col_idx)

        # Extract data for kept columns
        new_data = [[row[i] for i in indices_to_keep] for row in self.data]
        return self.__class__(columns=cols_to_keep, data=new_data)

nunique(column)

Count number of unique values in column.

Source code in src/chapkit/data/dataframe.py
def nunique(self, column: str) -> int:
    """Count number of unique values in column."""
    if column not in self.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    col_idx = self.columns.index(column)
    unique_values = set()
    for row in self.data:
        val = row[col_idx]
        # Count None as a unique value
        unique_values.add(val)
    return len(unique_values)
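
A sketch of the null-handling and uniqueness helpers (isna, dropna, nunique) under the assumed constructor.

from chapkit.data.dataframe import DataFrame

df = DataFrame(columns=["region", "cases"], data=[["north", None], ["south", 7], ["south", 7]])

print(df.isna().data)             # [[False, True], [False, False], [False, False]]
print(len(df.dropna()))           # 2, drops rows containing any None
print(df.dropna(axis=1).columns)  # ['region'], drops columns containing any None
print(df.nunique("region"))       # 2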

GroupBy

GroupBy

GroupBy helper for aggregations.

Source code in src/chapkit/data/dataframe.py
class GroupBy:
    """GroupBy helper for aggregations."""

    def __init__(self, dataframe: DataFrame, by: str):
        """Initialize GroupBy helper."""
        self.dataframe = dataframe
        self.by = by
        self.by_idx = dataframe.columns.index(by)

        # Build groups
        self.groups: dict[Any, list[list[Any]]] = {}
        for row in dataframe.data:
            key = row[self.by_idx]
            if key not in self.groups:
                self.groups[key] = []
            self.groups[key].append(row)

    def count(self) -> DataFrame:
        """Count rows per group."""
        data = [[key, len(rows)] for key, rows in self.groups.items()]
        return DataFrame(columns=[self.by, "count"], data=data)

    def sum(self, column: str) -> DataFrame:
        """Sum numeric column per group."""
        if column not in self.dataframe.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        col_idx = self.dataframe.columns.index(column)
        data = []

        for key, rows in self.groups.items():
            values = [
                row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
            ]
            total = sum(values) if values else None
            data.append([key, total])

        return DataFrame(columns=[self.by, f"{column}_sum"], data=data)

    def mean(self, column: str) -> DataFrame:
        """Calculate mean of numeric column per group."""
        if column not in self.dataframe.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        import statistics

        col_idx = self.dataframe.columns.index(column)
        data = []

        for key, rows in self.groups.items():
            values = [
                row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
            ]
            avg = statistics.mean(values) if values else None
            data.append([key, avg])

        return DataFrame(columns=[self.by, f"{column}_mean"], data=data)

    def min(self, column: str) -> DataFrame:
        """Find minimum of numeric column per group."""
        if column not in self.dataframe.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        col_idx = self.dataframe.columns.index(column)
        data = []

        for key, rows in self.groups.items():
            values = [
                row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
            ]
            minimum = min(values) if values else None
            data.append([key, minimum])

        return DataFrame(columns=[self.by, f"{column}_min"], data=data)

    def max(self, column: str) -> DataFrame:
        """Find maximum of numeric column per group."""
        if column not in self.dataframe.columns:
            raise KeyError(f"Column '{column}' not found in DataFrame")

        col_idx = self.dataframe.columns.index(column)
        data = []

        for key, rows in self.groups.items():
            values = [
                row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
            ]
            maximum = max(values) if values else None
            data.append([key, maximum])

        return DataFrame(columns=[self.by, f"{column}_max"], data=data)

Functions

__init__(dataframe, by)

Initialize GroupBy helper.

Source code in src/chapkit/data/dataframe.py
def __init__(self, dataframe: DataFrame, by: str):
    """Initialize GroupBy helper."""
    self.dataframe = dataframe
    self.by = by
    self.by_idx = dataframe.columns.index(by)

    # Build groups
    self.groups: dict[Any, list[list[Any]]] = {}
    for row in dataframe.data:
        key = row[self.by_idx]
        if key not in self.groups:
            self.groups[key] = []
        self.groups[key].append(row)

count()

Count rows per group.

Source code in src/chapkit/data/dataframe.py
def count(self) -> DataFrame:
    """Count rows per group."""
    data = [[key, len(rows)] for key, rows in self.groups.items()]
    return DataFrame(columns=[self.by, "count"], data=data)

sum(column)

Sum numeric column per group.

Source code in src/chapkit/data/dataframe.py
def sum(self, column: str) -> DataFrame:
    """Sum numeric column per group."""
    if column not in self.dataframe.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    col_idx = self.dataframe.columns.index(column)
    data = []

    for key, rows in self.groups.items():
        values = [
            row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
        ]
        total = sum(values) if values else None
        data.append([key, total])

    return DataFrame(columns=[self.by, f"{column}_sum"], data=data)

mean(column)

Calculate mean of numeric column per group.

Source code in src/chapkit/data/dataframe.py
def mean(self, column: str) -> DataFrame:
    """Calculate mean of numeric column per group."""
    if column not in self.dataframe.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    import statistics

    col_idx = self.dataframe.columns.index(column)
    data = []

    for key, rows in self.groups.items():
        values = [
            row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
        ]
        avg = statistics.mean(values) if values else None
        data.append([key, avg])

    return DataFrame(columns=[self.by, f"{column}_mean"], data=data)

min(column)

Find minimum of numeric column per group.

Source code in src/chapkit/data/dataframe.py
def min(self, column: str) -> DataFrame:
    """Find minimum of numeric column per group."""
    if column not in self.dataframe.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    col_idx = self.dataframe.columns.index(column)
    data = []

    for key, rows in self.groups.items():
        values = [
            row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
        ]
        minimum = min(values) if values else None
        data.append([key, minimum])

    return DataFrame(columns=[self.by, f"{column}_min"], data=data)

max(column)

Find maximum of numeric column per group.

Source code in src/chapkit/data/dataframe.py
def max(self, column: str) -> DataFrame:
    """Find maximum of numeric column per group."""
    if column not in self.dataframe.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    col_idx = self.dataframe.columns.index(column)
    data = []

    for key, rows in self.groups.items():
        values = [
            row[col_idx] for row in rows if row[col_idx] is not None and isinstance(row[col_idx], (int, float))
        ]
        maximum = max(values) if values else None
        data.append([key, maximum])

    return DataFrame(columns=[self.by, f"{column}_max"], data=data)

Config Module

Key-value configuration storage with Pydantic schema validation.

Models

models

Config ORM models for key-value configuration storage and artifact linking.

Classes

Config

Bases: Entity

ORM model for configuration with JSON data storage.

Source code in src/chapkit/config/models.py
class Config(Entity):
    """ORM model for configuration with JSON data storage."""

    __tablename__ = "configs"

    name: Mapped[str] = mapped_column(index=True)
    _data_json: Mapped[dict[str, Any]] = mapped_column("data", JSON, nullable=False)

    @property
    def data(self) -> dict[str, Any]:
        """Return JSON data as dict."""
        return self._data_json

    @data.setter
    def data(self, value: BaseConfig | dict[str, Any]) -> None:
        """Serialize Pydantic model to JSON or store dict directly."""
        if isinstance(value, dict):
            self._data_json = value
        elif hasattr(value, "model_dump") and callable(value.model_dump):
            # BaseConfig or other Pydantic model
            self._data_json = value.model_dump(mode="json")
        else:
            raise TypeError(f"data must be a BaseConfig subclass or dict, got {type(value)}")
Attributes
data property writable

Return JSON data as dict.
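
A small sketch of the data setter behaviour; DiseaseConfig is illustrative (the name only appears as an example in the router source later on this page), and Config(name=...) assumes SQLAlchemy's default declarative keyword constructor.

from chapkit.config.models import Config
from chapkit.config.schemas import BaseConfig


class DiseaseConfig(BaseConfig):  # illustrative schema, not part of chapkit
    threshold: float = 0.5


config = Config(name="default")             # assumes the default declarative constructor
config.data = DiseaseConfig(threshold=0.8)  # Pydantic models are stored via model_dump(mode="json")
config.data = {"threshold": 0.8}            # plain dicts are stored as-is
# config.data = 42                          # would raise TypeError
print(config.data)                          # {'threshold': 0.8}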

ConfigArtifact

Bases: Base

Junction table linking Configs to root Artifacts.

Source code in src/chapkit/config/models.py
class ConfigArtifact(Base):
    """Junction table linking Configs to root Artifacts."""

    __tablename__ = "config_artifacts"

    config_id: Mapped[ULID] = mapped_column(
        ULIDType,
        ForeignKey("configs.id", ondelete="CASCADE"),
        primary_key=True,
    )

    artifact_id: Mapped[ULID] = mapped_column(
        ULIDType,
        ForeignKey("artifacts.id", ondelete="CASCADE"),
        primary_key=True,
        unique=True,
    )

    __table_args__ = (UniqueConstraint("artifact_id", name="uq_artifact_id"),)

Schemas

schemas

Config schemas for key-value configuration with JSON data.

Classes

BaseConfig

Bases: BaseModel

Base class for configuration schemas with arbitrary extra fields allowed.

Source code in src/chapkit/config/schemas.py
class BaseConfig(BaseModel):
    """Base class for configuration schemas with arbitrary extra fields allowed."""

    model_config = {"extra": "allow"}

ConfigIn

Bases: EntityIn

Input schema for creating or updating configurations.

Source code in src/chapkit/config/schemas.py
class ConfigIn[DataT: BaseConfig](EntityIn):
    """Input schema for creating or updating configurations."""

    name: str
    data: DataT

ConfigOut

Bases: EntityOut

Output schema for configuration entities.

Source code in src/chapkit/config/schemas.py
class ConfigOut[DataT: BaseConfig](EntityOut):
    """Output schema for configuration entities."""

    name: str
    data: DataT

    model_config = {"ser_json_timedelta": "float", "ser_json_bytes": "base64"}

    @field_validator("data", mode="before")
    @classmethod
    def convert_dict_to_model(cls, v: Any, info: ValidationInfo) -> Any:
        """Convert dict to BaseConfig model if data_cls is provided in validation context."""
        if isinstance(v, BaseConfig):
            return v
        if isinstance(v, dict):
            if info.context and "data_cls" in info.context:
                data_cls = info.context["data_cls"]
                return data_cls.model_validate(v)
        return v

    @field_serializer("data", when_used="json")
    def serialize_data(self, value: DataT) -> dict[str, Any]:
        """Serialize BaseConfig data to JSON dict."""
        if isinstance(value, BaseConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            return value.model_dump(mode="json")
        return value
Functions
convert_dict_to_model(v, info) classmethod

Convert dict to BaseConfig model if data_cls is provided in validation context.

Source code in src/chapkit/config/schemas.py
@field_validator("data", mode="before")
@classmethod
def convert_dict_to_model(cls, v: Any, info: ValidationInfo) -> Any:
    """Convert dict to BaseConfig model if data_cls is provided in validation context."""
    if isinstance(v, BaseConfig):
        return v
    if isinstance(v, dict):
        if info.context and "data_cls" in info.context:
            data_cls = info.context["data_cls"]
            return data_cls.model_validate(v)
    return v
serialize_data(value)

Serialize BaseConfig data to JSON dict.

Source code in src/chapkit/config/schemas.py
@field_serializer("data", when_used="json")
def serialize_data(self, value: DataT) -> dict[str, Any]:
    """Serialize BaseConfig data to JSON dict."""
    if isinstance(value, BaseConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
        return value.model_dump(mode="json")
    return value

LinkArtifactRequest

Bases: BaseModel

Request schema for linking an artifact to a config.

Source code in src/chapkit/config/schemas.py
class LinkArtifactRequest(BaseModel):
    """Request schema for linking an artifact to a config."""

    artifact_id: ULID

UnlinkArtifactRequest

Bases: BaseModel

Request schema for unlinking an artifact from a config.

Source code in src/chapkit/config/schemas.py
class UnlinkArtifactRequest(BaseModel):
    """Request schema for unlinking an artifact from a config."""

    artifact_id: ULID

Repository

repository

Config repository for database access and artifact linking.

Classes

ConfigRepository

Bases: BaseRepository[Config, ULID]

Repository for Config entities with artifact linking operations.

Source code in src/chapkit/config/repository.py
class ConfigRepository(BaseRepository[Config, ULID]):
    """Repository for Config entities with artifact linking operations."""

    def __init__(self, session: AsyncSession) -> None:
        """Initialize config repository with database session."""
        super().__init__(session, Config)

    async def find_by_name(self, name: str) -> Config | None:
        """Find a config by its unique name."""
        result = await self.s.scalars(select(self.model).where(self.model.name == name))
        return result.one_or_none()

    async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
        """Link a config to a root artifact."""
        artifact = await self.s.get(Artifact, artifact_id)
        if artifact is None:
            raise ValueError(f"Artifact {artifact_id} not found")
        if artifact.parent_id is not None:
            raise ValueError(f"Artifact {artifact_id} is not a root artifact (parent_id={artifact.parent_id})")

        link = ConfigArtifact(config_id=config_id, artifact_id=artifact_id)
        self.s.add(link)

    async def unlink_artifact(self, artifact_id: ULID) -> None:
        """Unlink an artifact from its config."""
        stmt = sql_delete(ConfigArtifact).where(ConfigArtifact.artifact_id == artifact_id)
        await self.s.execute(stmt)

    async def delete_by_id(self, id: ULID) -> None:
        """Delete a config and cascade delete all linked artifact trees."""
        from chapkit.artifact.repository import ArtifactRepository

        linked_artifacts = await self.find_artifacts_for_config(id)

        artifact_repo = ArtifactRepository(self.s)
        for root_artifact in linked_artifacts:
            subtree = await artifact_repo.find_subtree(root_artifact.id)
            for artifact in subtree:
                await self.s.delete(artifact)

        await super().delete_by_id(id)

    async def find_by_root_artifact_id(self, artifact_id: ULID) -> Config | None:
        """Find the config linked to a root artifact."""
        stmt = (
            select(Config)
            .join(ConfigArtifact, Config.id == ConfigArtifact.config_id)
            .where(ConfigArtifact.artifact_id == artifact_id)
        )
        result = await self.s.scalars(stmt)
        return result.one_or_none()

    async def find_artifacts_for_config(self, config_id: ULID) -> list[Artifact]:
        """Find all root artifacts linked to a config."""
        stmt = (
            select(Artifact)
            .join(ConfigArtifact, Artifact.id == ConfigArtifact.artifact_id)
            .where(ConfigArtifact.config_id == config_id)
        )
        result = await self.s.scalars(stmt)
        return list(result.all())
Functions
__init__(session)

Initialize config repository with database session.

Source code in src/chapkit/config/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Initialize config repository with database session."""
    super().__init__(session, Config)
find_by_name(name) async

Find a config by its unique name.

Source code in src/chapkit/config/repository.py
async def find_by_name(self, name: str) -> Config | None:
    """Find a config by its unique name."""
    result = await self.s.scalars(select(self.model).where(self.model.name == name))
    return result.one_or_none()

link_artifact(config_id, artifact_id) async

Link a config to a root artifact.

Source code in src/chapkit/config/repository.py
async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
    """Link a config to a root artifact."""
    artifact = await self.s.get(Artifact, artifact_id)
    if artifact is None:
        raise ValueError(f"Artifact {artifact_id} not found")
    if artifact.parent_id is not None:
        raise ValueError(f"Artifact {artifact_id} is not a root artifact (parent_id={artifact.parent_id})")

    link = ConfigArtifact(config_id=config_id, artifact_id=artifact_id)
    self.s.add(link)

unlink_artifact(artifact_id) async

Unlink an artifact from its config.

Source code in src/chapkit/config/repository.py
async def unlink_artifact(self, artifact_id: ULID) -> None:
    """Unlink an artifact from its config."""
    stmt = sql_delete(ConfigArtifact).where(ConfigArtifact.artifact_id == artifact_id)
    await self.s.execute(stmt)
delete_by_id(id) async

Delete a config and cascade delete all linked artifact trees.

Source code in src/chapkit/config/repository.py
async def delete_by_id(self, id: ULID) -> None:
    """Delete a config and cascade delete all linked artifact trees."""
    from chapkit.artifact.repository import ArtifactRepository

    linked_artifacts = await self.find_artifacts_for_config(id)

    artifact_repo = ArtifactRepository(self.s)
    for root_artifact in linked_artifacts:
        subtree = await artifact_repo.find_subtree(root_artifact.id)
        for artifact in subtree:
            await self.s.delete(artifact)

    await super().delete_by_id(id)
find_by_root_artifact_id(artifact_id) async

Find the config linked to a root artifact.

Source code in src/chapkit/config/repository.py
async def find_by_root_artifact_id(self, artifact_id: ULID) -> Config | None:
    """Find the config linked to a root artifact."""
    stmt = (
        select(Config)
        .join(ConfigArtifact, Config.id == ConfigArtifact.config_id)
        .where(ConfigArtifact.artifact_id == artifact_id)
    )
    result = await self.s.scalars(stmt)
    return result.one_or_none()
find_artifacts_for_config(config_id) async

Find all root artifacts linked to a config.

Source code in src/chapkit/config/repository.py
async def find_artifacts_for_config(self, config_id: ULID) -> list[Artifact]:
    """Find all root artifacts linked to a config."""
    stmt = (
        select(Artifact)
        .join(ConfigArtifact, Artifact.id == ConfigArtifact.artifact_id)
        .where(ConfigArtifact.config_id == config_id)
    )
    result = await self.s.scalars(stmt)
    return list(result.all())

Manager

manager

Config manager for CRUD operations and artifact linking.

Classes

ConfigManager

Bases: BaseManager[Config, ConfigIn[DataT], ConfigOut[DataT], ULID]

Manager for Config entities with artifact linking operations.

Source code in src/chapkit/config/manager.py
class ConfigManager[DataT: BaseConfig](BaseManager[Config, ConfigIn[DataT], ConfigOut[DataT], ULID]):
    """Manager for Config entities with artifact linking operations."""

    def __init__(self, repo: ConfigRepository, data_cls: type[DataT]) -> None:
        """Initialize config manager with repository and data class."""
        super().__init__(repo, Config, ConfigOut)
        self.repository: ConfigRepository = repo
        self.data_cls = data_cls

    async def find_by_name(self, name: str) -> ConfigOut[DataT] | None:
        """Find a config by its unique name."""
        config = await self.repository.find_by_name(name)
        if config:
            return self._to_output_schema(config)
        return None

    async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
        """Link a config to a root artifact."""
        await self.repository.link_artifact(config_id, artifact_id)
        await self.repository.commit()

    async def unlink_artifact(self, artifact_id: ULID) -> None:
        """Unlink an artifact from its config."""
        await self.repository.unlink_artifact(artifact_id)
        await self.repository.commit()

    async def get_config_for_artifact(
        self, artifact_id: ULID, artifact_repo: ArtifactRepository
    ) -> ConfigOut[DataT] | None:
        """Get the config for an artifact by traversing to its root."""
        root = await artifact_repo.get_root_artifact(artifact_id)
        if root is None:
            return None

        config = await self.repository.find_by_root_artifact_id(root.id)
        if config is None:
            return None

        return self._to_output_schema(config)

    async def get_linked_artifacts(self, config_id: ULID) -> list[ArtifactOut]:
        """Get all root artifacts linked to a config."""
        artifacts = await self.repository.find_artifacts_for_config(config_id)
        return [ArtifactOut.model_validate(artifact, from_attributes=True) for artifact in artifacts]

    def _to_output_schema(self, entity: Config) -> ConfigOut[DataT]:
        """Convert ORM entity to output schema with proper data class validation."""
        return ConfigOut[DataT].model_validate(entity, from_attributes=True, context={"data_cls": self.data_cls})
Functions
__init__(repo, data_cls)

Initialize config manager with repository and data class.

Source code in src/chapkit/config/manager.py
def __init__(self, repo: ConfigRepository, data_cls: type[DataT]) -> None:
    """Initialize config manager with repository and data class."""
    super().__init__(repo, Config, ConfigOut)
    self.repository: ConfigRepository = repo
    self.data_cls = data_cls
find_by_name(name) async

Find a config by its unique name.

Source code in src/chapkit/config/manager.py
async def find_by_name(self, name: str) -> ConfigOut[DataT] | None:
    """Find a config by its unique name."""
    config = await self.repository.find_by_name(name)
    if config:
        return self._to_output_schema(config)
    return None

link_artifact(config_id, artifact_id) async

Link a config to a root artifact.

Source code in src/chapkit/config/manager.py
async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
    """Link a config to a root artifact."""
    await self.repository.link_artifact(config_id, artifact_id)
    await self.repository.commit()

unlink_artifact(artifact_id) async

Unlink an artifact from its config.

Source code in src/chapkit/config/manager.py
async def unlink_artifact(self, artifact_id: ULID) -> None:
    """Unlink an artifact from its config."""
    await self.repository.unlink_artifact(artifact_id)
    await self.repository.commit()
get_config_for_artifact(artifact_id, artifact_repo) async

Get the config for an artifact by traversing to its root.

Source code in src/chapkit/config/manager.py
async def get_config_for_artifact(
    self, artifact_id: ULID, artifact_repo: ArtifactRepository
) -> ConfigOut[DataT] | None:
    """Get the config for an artifact by traversing to its root."""
    root = await artifact_repo.get_root_artifact(artifact_id)
    if root is None:
        return None

    config = await self.repository.find_by_root_artifact_id(root.id)
    if config is None:
        return None

    return self._to_output_schema(config)
get_linked_artifacts(config_id) async

Get all root artifacts linked to a config.

Source code in src/chapkit/config/manager.py
async def get_linked_artifacts(self, config_id: ULID) -> list[ArtifactOut]:
    """Get all root artifacts linked to a config."""
    artifacts = await self.repository.find_artifacts_for_config(config_id)
    return [ArtifactOut.model_validate(artifact, from_attributes=True) for artifact in artifacts]
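
A minimal wiring sketch for ConfigManager, assuming an AsyncSession is available from the application's database setup; DiseaseConfig is illustrative, and only calls shown in the sources above are used.

from sqlalchemy.ext.asyncio import AsyncSession

from chapkit.config.manager import ConfigManager
from chapkit.config.repository import ConfigRepository
from chapkit.config.schemas import BaseConfig


class DiseaseConfig(BaseConfig):  # illustrative config schema
    threshold: float = 0.5


async def load_default_config(session: AsyncSession) -> None:
    repo = ConfigRepository(session)
    manager = ConfigManager(repo, DiseaseConfig)

    # find_by_name returns ConfigOut[DiseaseConfig] | None with typed data
    config = await manager.find_by_name("default")
    if config is not None:
        print(config.data.threshold)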

Router

router

Config CRUD router with artifact linking operations.

Classes

ConfigRouter

Bases: CrudRouter[ConfigIn[BaseConfig], ConfigOut[BaseConfig]]

CRUD router for Config entities with artifact linking operations.

Source code in src/chapkit/config/router.py
class ConfigRouter(CrudRouter[ConfigIn[BaseConfig], ConfigOut[BaseConfig]]):
    """CRUD router for Config entities with artifact linking operations."""

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[ConfigIn[BaseConfig]],
        entity_out_type: type[ConfigOut[BaseConfig]],
        permissions: CrudPermissions | None = None,
        enable_artifact_operations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize config router with entity types and manager factory."""
        self.enable_artifact_operations = enable_artifact_operations
        super().__init__(
            prefix=prefix,
            tags=list(tags),
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_schema_route(self) -> None:
        """Register JSON schema endpoint for the config data type only."""
        entity_out_type = self.entity_out_type

        async def get_schema() -> dict[str, Any]:
            """Return the config schema (data field) instead of the full ConfigOut schema."""
            full_schema = entity_out_type.model_json_schema()

            # Extract the config schema from the data field's $ref
            if "$defs" in full_schema and "data" in full_schema.get("properties", {}):
                data_prop = full_schema["properties"]["data"]
                if "$ref" in data_prop:
                    # Extract schema name from $ref (e.g., "#/$defs/DiseaseConfig")
                    ref_name = data_prop["$ref"].split("/")[-1]
                    if ref_name in full_schema["$defs"]:
                        return full_schema["$defs"][ref_name]

            # Fallback to full schema if extraction fails
            return full_schema

        self.register_collection_operation(
            name="schema",
            handler=get_schema,
            http_method="GET",
            response_model=dict[str, Any],
        )

    def _register_routes(self) -> None:
        """Register config CRUD routes and artifact linking operations."""
        super()._register_routes()

        if not self.enable_artifact_operations:
            return

        manager_factory = self.manager_factory

        async def link_artifact(
            entity_id: str,
            request: LinkArtifactRequest,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> None:
            config_id = self._parse_ulid(entity_id)

            try:
                await manager.link_artifact(config_id, request.artifact_id)
            except ValueError as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                )

        async def unlink_artifact(
            entity_id: str,
            request: UnlinkArtifactRequest,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> None:
            try:
                await manager.unlink_artifact(request.artifact_id)
            except Exception as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                )

        async def get_linked_artifacts(
            entity_id: str,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> list[ArtifactOut]:
            config_id = self._parse_ulid(entity_id)
            return await manager.get_linked_artifacts(config_id)

        self.register_entity_operation(
            "link-artifact",
            link_artifact,
            http_method="POST",
            status_code=status.HTTP_204_NO_CONTENT,
            summary="Link artifact to config",
            description="Link a config to a root artifact (parent_id IS NULL)",
        )

        self.register_entity_operation(
            "unlink-artifact",
            unlink_artifact,
            http_method="POST",
            status_code=status.HTTP_204_NO_CONTENT,
            summary="Unlink artifact from config",
            description="Remove the link between a config and an artifact",
        )

        self.register_entity_operation(
            "artifacts",
            get_linked_artifacts,
            http_method="GET",
            response_model=list[ArtifactOut],
            summary="Get linked artifacts",
            description="Get all root artifacts linked to this config",
        )
Functions
__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, enable_artifact_operations=False, **kwargs)

Initialize config router with entity types and manager factory.

Source code in src/chapkit/config/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[ConfigIn[BaseConfig]],
    entity_out_type: type[ConfigOut[BaseConfig]],
    permissions: CrudPermissions | None = None,
    enable_artifact_operations: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize config router with entity types and manager factory."""
    self.enable_artifact_operations = enable_artifact_operations
    super().__init__(
        prefix=prefix,
        tags=list(tags),
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        manager_factory=manager_factory,
        permissions=permissions,
        **kwargs,
    )

ML Module

Train/predict workflows with artifact-based model storage and timing metadata.

Schemas

schemas

Pydantic schemas for ML train/predict operations.

Migration Note

TrainedModelArtifactData and PredictionArtifactData have been replaced by MLTrainingArtifactData and MLPredictionArtifactData from chapkit.artifact.data_schemas.

Key changes:

- ml_type field renamed to type
- model field moved to content
- predictions field moved to content
- Added nested metadata structure
- Added content_type and content_size fields
- Removed training_artifact_id (use parent_id instead)
- Removed model_type and model_size_bytes (metadata only)

Classes

MLPredictionArtifactData

Bases: BaseArtifactData[MLMetadata]

Schema for ML prediction artifact data with results.

Source code in src/chapkit/artifact/schemas.py
class MLPredictionArtifactData(BaseArtifactData[MLMetadata]):
    """Schema for ML prediction artifact data with results."""

    type: Literal["ml_prediction"] = Field(default="ml_prediction", frozen=True)  # pyright: ignore[reportIncompatibleVariableOverride]
    metadata: MLMetadata

MLTrainingArtifactData

Bases: BaseArtifactData[MLMetadata]

Schema for ML training artifact data with trained model.

Source code in src/chapkit/artifact/schemas.py
class MLTrainingArtifactData(BaseArtifactData[MLMetadata]):
    """Schema for ML training artifact data with trained model."""

    type: Literal["ml_training"] = Field(default="ml_training", frozen=True)  # pyright: ignore[reportIncompatibleVariableOverride]
    metadata: MLMetadata

TrainRequest

Bases: BaseModel

Request schema for training a model.

Source code in src/chapkit/ml/schemas.py
class TrainRequest(BaseModel):
    """Request schema for training a model."""

    config_id: ULID = Field(description="ID of the config to use for training")
    data: DataFrame = Field(description="Training data as DataFrame")
    geo: FeatureCollection | None = Field(default=None, description="Optional geospatial data")

TrainResponse

Bases: BaseModel

Response schema for train operation submission.

Source code in src/chapkit/ml/schemas.py
class TrainResponse(BaseModel):
    """Response schema for train operation submission."""

    job_id: str = Field(description="ID of the training job in the scheduler")
    artifact_id: str = Field(description="ID that will contain the trained model artifact")
    message: str = Field(description="Human-readable message")

PredictRequest

Bases: BaseModel

Request schema for making predictions.

Source code in src/chapkit/ml/schemas.py
class PredictRequest(BaseModel):
    """Request schema for making predictions."""

    artifact_id: ULID = Field(description="ID of the artifact containing the trained model")
    historic: DataFrame = Field(description="Historic data as DataFrame")
    future: DataFrame = Field(description="Future/prediction data as DataFrame")
    geo: FeatureCollection | None = Field(default=None, description="Optional geospatial data")

PredictResponse

Bases: BaseModel

Response schema for predict operation submission.

Source code in src/chapkit/ml/schemas.py
class PredictResponse(BaseModel):
    """Response schema for predict operation submission."""

    job_id: str = Field(description="ID of the prediction job in the scheduler")
    artifact_id: str = Field(description="ID that will contain the prediction artifact")
    message: str = Field(description="Human-readable message")

ModelRunnerProtocol

Bases: Protocol[ConfigT]

Protocol defining the interface for model runners.

Source code in src/chapkit/ml/schemas.py
class ModelRunnerProtocol(Protocol[ConfigT]):
    """Protocol defining the interface for model runners."""

    async def on_train(
        self,
        config: ConfigT,
        data: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model and return the trained model object (must be pickleable)."""
        ...

    async def on_predict(
        self,
        config: ConfigT,
        model: Any,
        historic: DataFrame,
        future: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> DataFrame:
        """Make predictions using a trained model and return predictions as DataFrame."""
        ...
Functions
on_train(config, data, geo=None) async

Train a model and return the trained model object (must be pickleable).

Source code in src/chapkit/ml/schemas.py
async def on_train(
    self,
    config: ConfigT,
    data: DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model and return the trained model object (must be pickleable)."""
    ...
on_predict(config, model, historic, future, geo=None) async

Make predictions using a trained model and return predictions as DataFrame.

Source code in src/chapkit/ml/schemas.py
async def on_predict(
    self,
    config: ConfigT,
    model: Any,
    historic: DataFrame,
    future: DataFrame,
    geo: FeatureCollection | None = None,
) -> DataFrame:
    """Make predictions using a trained model and return predictions as DataFrame."""
    ...
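
A minimal runner sketch satisfying ModelRunnerProtocol; DiseaseConfig, the "cases" column, and the mean-based model are illustrative, and geo is typed as Any here although the protocol declares FeatureCollection | None.

from typing import Any

from chapkit.config.schemas import BaseConfig
from chapkit.data.dataframe import DataFrame


class DiseaseConfig(BaseConfig):  # illustrative config schema
    threshold: float = 0.5


class MeanValueRunner:
    """Toy runner: 'trains' by averaging a column and predicts that constant."""

    async def on_train(self, config: DiseaseConfig, data: DataFrame, geo: Any = None) -> Any:
        cases = [v for v in data.get_column("cases") if v is not None]
        # The returned object must be pickleable, per the protocol docstring.
        return {"mean_cases": sum(cases) / len(cases) if cases else 0.0}

    async def on_predict(
        self, config: DiseaseConfig, model: Any, historic: DataFrame, future: DataFrame, geo: Any = None
    ) -> DataFrame:
        predictions = [model["mean_cases"]] * len(future)
        return future.add_column("prediction", predictions)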

Manager

manager

Manager for ML train/predict operations with artifact-based storage.

Classes

MLManager

Bases: Generic[ConfigT]

Manager for ML train/predict operations with job scheduling and artifact storage.

Source code in src/chapkit/ml/manager.py
class MLManager(Generic[ConfigT]):
    """Manager for ML train/predict operations with job scheduling and artifact storage."""

    def __init__(
        self,
        runner: ModelRunnerProtocol[ConfigT],
        scheduler: ChapkitScheduler,
        database: Database,
        config_schema: type[ConfigT],
    ) -> None:
        """Initialize ML manager with runner, scheduler, database, and config schema."""
        self.runner = runner
        self.scheduler = scheduler
        self.database = database
        self.config_schema = config_schema

    async def execute_train(self, request: TrainRequest) -> TrainResponse:
        """Submit a training job to the scheduler and return job/artifact IDs."""
        # Pre-allocate artifact ID for the trained model
        artifact_id = ULID()

        # Submit job to scheduler
        job_id = await self.scheduler.add_job(
            self._train_task,
            request,
            artifact_id,
        )

        return TrainResponse(
            job_id=str(job_id),
            artifact_id=str(artifact_id),
            message=f"Training job submitted. Job ID: {job_id}",
        )

    async def execute_predict(self, request: PredictRequest) -> PredictResponse:
        """Submit a prediction job to the scheduler and return job/artifact IDs."""
        # Pre-allocate artifact ID for predictions
        artifact_id = ULID()

        # Submit job to scheduler
        job_id = await self.scheduler.add_job(
            self._predict_task,
            request,
            artifact_id,
        )

        return PredictResponse(
            job_id=str(job_id),
            artifact_id=str(artifact_id),
            message=f"Prediction job submitted. Job ID: {job_id}",
        )

    async def _train_task(self, request: TrainRequest, artifact_id: ULID) -> ULID:
        """Execute training task and store trained model in artifact."""
        # Load config
        async with self.database.session() as session:
            config_repo = ConfigRepository(session)
            config_manager: ConfigManager[ConfigT] = ConfigManager(config_repo, self.config_schema)
            config = await config_manager.find_by_id(request.config_id)

            if config is None:
                raise ValueError(f"Config {request.config_id} not found")

        # Train model with timing
        training_started_at = datetime.datetime.now(datetime.UTC)
        trained_model = await self.runner.on_train(
            config=config.data,
            data=request.data,
            geo=request.geo,
        )
        training_completed_at = datetime.datetime.now(datetime.UTC)
        training_duration = (training_completed_at - training_started_at).total_seconds()

        # Store trained model in artifact with metadata
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)
            config_repo = ConfigRepository(session)

            # Create metadata
            from chapkit.artifact.schemas import MLMetadata, MLTrainingArtifactData

            metadata = MLMetadata(
                status="success",
                config_id=str(request.config_id),
                started_at=training_started_at.isoformat(),
                completed_at=training_completed_at.isoformat(),
                duration_seconds=round(training_duration, 2),
            )

            # Create and validate artifact data structure with Pydantic
            # Note: We validate but don't serialize to JSON because content contains Python objects
            artifact_data_model = MLTrainingArtifactData(
                type="ml_training",
                metadata=metadata,
                content=trained_model,
                content_type="application/x-pickle",
                content_size=None,
            )

            # Construct dict manually to preserve Python objects (database uses PickleType)
            artifact_data = {
                "type": artifact_data_model.type,
                "metadata": artifact_data_model.metadata.model_dump(),
                "content": trained_model,  # Keep as Python object
                "content_type": artifact_data_model.content_type,
                "content_size": artifact_data_model.content_size,
            }

            await artifact_manager.save(
                ArtifactIn(
                    id=artifact_id,
                    data=artifact_data,
                    parent_id=None,
                    level=0,
                )
            )

            # Link config to root artifact for tree traversal
            await config_repo.link_artifact(request.config_id, artifact_id)
            await config_repo.commit()

        return artifact_id

    async def _predict_task(self, request: PredictRequest, artifact_id: ULID) -> ULID:
        """Execute prediction task and store predictions in artifact."""
        # Load training artifact
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)
            training_artifact = await artifact_manager.find_by_id(request.artifact_id)

            if training_artifact is None:
                raise ValueError(f"Training artifact {request.artifact_id} not found")

        # Extract model and config_id from artifact
        training_data = training_artifact.data
        if not isinstance(training_data, dict) or training_data.get("type") != "ml_training":
            raise ValueError(f"Artifact {request.artifact_id} is not a training artifact")

        trained_model = training_data["content"]
        config_id = ULID.from_str(training_data["metadata"]["config_id"])

        # Load config
        async with self.database.session() as session:
            config_repo = ConfigRepository(session)
            config_manager: ConfigManager[ConfigT] = ConfigManager(config_repo, self.config_schema)
            config = await config_manager.find_by_id(config_id)

            if config is None:
                raise ValueError(f"Config {config_id} not found")

        # Make predictions with timing
        prediction_started_at = datetime.datetime.now(datetime.UTC)
        predictions = await self.runner.on_predict(
            config=config.data,
            model=trained_model,
            historic=request.historic,
            future=request.future,
            geo=request.geo,
        )
        prediction_completed_at = datetime.datetime.now(datetime.UTC)
        prediction_duration = (prediction_completed_at - prediction_started_at).total_seconds()

        # Store predictions in artifact with parent linkage
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)

            # Create metadata
            from chapkit.artifact.schemas import MLMetadata, MLPredictionArtifactData

            metadata = MLMetadata(
                status="success",
                config_id=str(config_id),
                started_at=prediction_started_at.isoformat(),
                completed_at=prediction_completed_at.isoformat(),
                duration_seconds=round(prediction_duration, 2),
            )

            # Create and validate artifact data structure with Pydantic
            # Note: We validate but don't serialize to JSON because content contains Python objects
            artifact_data_model = MLPredictionArtifactData(
                type="ml_prediction",
                metadata=metadata,
                content=predictions,
                content_type="application/vnd.chapkit.dataframe+json",
                content_size=None,
            )

            # Construct dict manually to preserve Python objects (database uses PickleType)
            artifact_data = {
                "type": artifact_data_model.type,
                "metadata": artifact_data_model.metadata.model_dump(),
                "content": predictions,  # Keep as Python object (DataFrame)
                "content_type": artifact_data_model.content_type,
                "content_size": artifact_data_model.content_size,
            }

            await artifact_manager.save(
                ArtifactIn(
                    id=artifact_id,
                    data=artifact_data,
                    parent_id=request.artifact_id,
                    level=1,
                )
            )

        return artifact_id
Functions
__init__(runner, scheduler, database, config_schema)

Initialize ML manager with runner, scheduler, database, and config schema.

Source code in src/chapkit/ml/manager.py
def __init__(
    self,
    runner: ModelRunnerProtocol[ConfigT],
    scheduler: ChapkitScheduler,
    database: Database,
    config_schema: type[ConfigT],
) -> None:
    """Initialize ML manager with runner, scheduler, database, and config schema."""
    self.runner = runner
    self.scheduler = scheduler
    self.database = database
    self.config_schema = config_schema
execute_train(request) async

Submit a training job to the scheduler and return job/artifact IDs.

Source code in src/chapkit/ml/manager.py
async def execute_train(self, request: TrainRequest) -> TrainResponse:
    """Submit a training job to the scheduler and return job/artifact IDs."""
    # Pre-allocate artifact ID for the trained model
    artifact_id = ULID()

    # Submit job to scheduler
    job_id = await self.scheduler.add_job(
        self._train_task,
        request,
        artifact_id,
    )

    return TrainResponse(
        job_id=str(job_id),
        artifact_id=str(artifact_id),
        message=f"Training job submitted. Job ID: {job_id}",
    )
execute_predict(request) async

Submit a prediction job to the scheduler and return job/artifact IDs.

Source code in src/chapkit/ml/manager.py
async def execute_predict(self, request: PredictRequest) -> PredictResponse:
    """Submit a prediction job to the scheduler and return job/artifact IDs."""
    # Pre-allocate artifact ID for predictions
    artifact_id = ULID()

    # Submit job to scheduler
    job_id = await self.scheduler.add_job(
        self._predict_task,
        request,
        artifact_id,
    )

    return PredictResponse(
        job_id=str(job_id),
        artifact_id=str(artifact_id),
        message=f"Prediction job submitted. Job ID: {job_id}",
    )
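
A hedged sketch of driving the manager directly and checking job state through the scheduler. The `scheduler`, `database`, `config`, `training_df`, and `MyConfig` names are assumptions (objects your service already holds); `ULID.from_str` is the same helper the manager itself uses.

from ulid import ULID

manager = MLManager(
    runner=EchoRunner(),    # any ModelRunnerProtocol implementation
    scheduler=scheduler,    # e.g. an InMemoryChapkitScheduler
    database=database,      # chapkit Database handle
    config_schema=MyConfig,
)

train_response = await manager.execute_train(
    TrainRequest(config_id=config.id, data=training_df)
)
record = await scheduler.get_record(ULID.from_str(train_response.job_id))
print(record.status, record.artifact_id)  # artifact_id is populated once the job completes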

Router

router

REST API router for ML train/predict operations.

Classes

MLRouter

Bases: Router

Router with $train and $predict collection operations.

Source code in src/chapkit/ml/router.py
class MLRouter(Router):
    """Router with $train and $predict collection operations."""

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        manager_factory: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize ML router with manager factory."""
        self.manager_factory = manager_factory
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register ML train and predict routes."""
        from fastapi import HTTPException

        manager_factory = self.manager_factory

        @self.router.post(
            "/$train",
            response_model=TrainResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Train model",
            description="Submit a training job to the scheduler",
        )
        async def train(
            request: TrainRequest,
            manager: MLManager = Depends(manager_factory),
        ) -> TrainResponse:
            """Train a model asynchronously and return job/artifact IDs."""
            try:
                response = await manager.execute_train(request)
                train_counter, _ = _get_counters()
                train_counter.add(1)
                return response
            except ValueError as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                )
            except RuntimeError as e:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                )

        @self.router.post(
            "/$predict",
            response_model=PredictResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Make predictions",
            description="Submit a prediction job to the scheduler",
        )
        async def predict(
            request: PredictRequest,
            manager: MLManager = Depends(manager_factory),
        ) -> PredictResponse:
            """Make predictions asynchronously and return job/artifact IDs."""
            try:
                response = await manager.execute_predict(request)
                _, predict_counter = _get_counters()
                predict_counter.add(1)
                return response
            except ValueError as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                )
            except RuntimeError as e:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                )
Functions
__init__(prefix, tags, manager_factory, **kwargs)

Initialize ML router with manager factory.

Source code in src/chapkit/ml/router.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    manager_factory: Any,
    **kwargs: Any,
) -> None:
    """Initialize ML router with manager factory."""
    self.manager_factory = manager_factory
    super().__init__(prefix=prefix, tags=tags, **kwargs)
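
A hedged sketch of calling the registered routes over HTTP. The "/ml" prefix and the JSON encoding of the DataFrame field are assumptions; both depend on how the service was built.

import httpx

payload = {
    "config_id": str(config_id),  # existing config ULID as a string
    "data": data_payload,         # DataFrame serialized to the JSON shape your service expects
}
resp = httpx.post("http://localhost:8000/ml/$train", json=payload)
resp.raise_for_status()                      # 202 Accepted on success
body = resp.json()
print(body["job_id"], body["artifact_id"])   # TrainResponse fields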

Model Runners

Protocol and implementations for ML model training and prediction.

BaseModelRunner

BaseModelRunner

Bases: ABC, Generic[ConfigT]

Abstract base class for model runners with lifecycle hooks.

Source code in src/chapkit/ml/runner.py
class BaseModelRunner(ABC, Generic[ConfigT]):
    """Abstract base class for model runners with lifecycle hooks."""

    async def on_init(self) -> None:
        """Optional initialization hook called before training or prediction."""
        pass

    async def on_cleanup(self) -> None:
        """Optional cleanup hook called after training or prediction."""
        pass

    @abstractmethod
    async def on_train(
        self,
        config: ConfigT,
        data: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model and return the trained model object (must be pickleable)."""
        ...

    @abstractmethod
    async def on_predict(
        self,
        config: ConfigT,
        model: Any,
        historic: DataFrame,
        future: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> DataFrame:
        """Make predictions using a trained model and return predictions as DataFrame."""
        ...

Functions

on_init() async

Optional initialization hook called before training or prediction.

Source code in src/chapkit/ml/runner.py
async def on_init(self) -> None:
    """Optional initialization hook called before training or prediction."""
    pass

on_cleanup() async

Optional cleanup hook called after training or prediction.

Source code in src/chapkit/ml/runner.py
async def on_cleanup(self) -> None:
    """Optional cleanup hook called after training or prediction."""
    pass

on_train(config, data, geo=None) abstractmethod async

Train a model and return the trained model object (must be pickleable).

Source code in src/chapkit/ml/runner.py
@abstractmethod
async def on_train(
    self,
    config: ConfigT,
    data: DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model and return the trained model object (must be pickleable)."""
    ...

on_predict(config, model, historic, future, geo=None) abstractmethod async

Make predictions using a trained model and return predictions as DataFrame.

Source code in src/chapkit/ml/runner.py
@abstractmethod
async def on_predict(
    self,
    config: ConfigT,
    model: Any,
    historic: DataFrame,
    future: DataFrame,
    geo: FeatureCollection | None = None,
) -> DataFrame:
    """Make predictions using a trained model and return predictions as DataFrame."""
    ...
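
A sketch of a concrete subclass that also uses the optional lifecycle hooks; the cache handling is illustrative only, and the import path follows the source location above.

from typing import Any

from chapkit.ml.runner import BaseModelRunner


class WarmupRunner(BaseModelRunner):
    async def on_init(self) -> None:
        # Acquire shared resources before training or prediction runs.
        self.cache: dict[str, Any] = {}

    async def on_cleanup(self) -> None:
        self.cache.clear()

    async def on_train(self, config: Any, data: Any, geo: Any | None = None) -> Any:
        return {"fitted": True}  # must be picklable

    async def on_predict(
        self, config: Any, model: Any, historic: Any, future: Any, geo: Any | None = None
    ) -> Any:
        return future  # placeholder predictions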

FunctionalModelRunner

FunctionalModelRunner

Bases: BaseModelRunner[ConfigT]

Functional model runner wrapping train and predict functions.

Source code in src/chapkit/ml/runner.py
class FunctionalModelRunner(BaseModelRunner[ConfigT]):
    """Functional model runner wrapping train and predict functions."""

    def __init__(
        self,
        on_train: TrainFunction[ConfigT],
        on_predict: PredictFunction[ConfigT],
    ) -> None:
        """Initialize functional runner with train and predict functions."""
        self._on_train = on_train
        self._on_predict = on_predict

    async def on_train(
        self,
        config: ConfigT,
        data: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model and return the trained model object."""
        return await self._on_train(config, data, geo)

    async def on_predict(
        self,
        config: ConfigT,
        model: Any,
        historic: DataFrame,
        future: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> DataFrame:
        """Make predictions using a trained model."""
        return await self._on_predict(config, model, historic, future, geo)

Functions

__init__(on_train, on_predict)

Initialize functional runner with train and predict functions.

Source code in src/chapkit/ml/runner.py
def __init__(
    self,
    on_train: TrainFunction[ConfigT],
    on_predict: PredictFunction[ConfigT],
) -> None:
    """Initialize functional runner with train and predict functions."""
    self._on_train = on_train
    self._on_predict = on_predict

on_train(config, data, geo=None) async

Train a model and return the trained model object.

Source code in src/chapkit/ml/runner.py
async def on_train(
    self,
    config: ConfigT,
    data: DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model and return the trained model object."""
    return await self._on_train(config, data, geo)

on_predict(config, model, historic, future, geo=None) async

Make predictions using a trained model.

Source code in src/chapkit/ml/runner.py
async def on_predict(
    self,
    config: ConfigT,
    model: Any,
    historic: DataFrame,
    future: DataFrame,
    geo: FeatureCollection | None = None,
) -> DataFrame:
    """Make predictions using a trained model."""
    return await self._on_predict(config, model, historic, future, geo)
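
A sketch of wrapping two plain async functions; the function bodies are placeholders rather than real modelling code, and the import path follows the source location above.

from typing import Any

from chapkit.ml.runner import FunctionalModelRunner


async def train_fn(config: Any, data: Any, geo: Any | None = None) -> Any:
    return {"fitted": True}  # picklable model object


async def predict_fn(config: Any, model: Any, historic: Any, future: Any, geo: Any | None = None) -> Any:
    return future  # predictions DataFrame


runner = FunctionalModelRunner(on_train=train_fn, on_predict=predict_fn)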

ShellModelRunner

ShellModelRunner

Bases: BaseModelRunner[ConfigT]

Shell-based model runner that executes external scripts for train/predict operations.

Source code in src/chapkit/ml/runner.py
class ShellModelRunner(BaseModelRunner[ConfigT]):
    """Shell-based model runner that executes external scripts for train/predict operations."""

    def __init__(
        self,
        train_command: str,
        predict_command: str,
        model_format: str = "pickle",
    ) -> None:
        """Initialize shell runner with command templates for train/predict operations."""
        self.train_command = train_command
        self.predict_command = predict_command
        self.model_format = model_format

    async def on_train(
        self,
        config: ConfigT,
        data: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model by executing external training script (model file creation is optional)."""
        temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_train_"))

        try:
            # Write config to YAML file
            config_file = temp_dir / "config.yml"
            config_file.write_text(yaml.safe_dump(config.model_dump(), indent=2))

            # Write training data to CSV
            data_file = temp_dir / "data.csv"
            data.to_csv(data_file)

            # Write geo data if provided
            geo_file = temp_dir / "geo.json" if geo else None
            if geo:
                assert geo_file is not None  # For type checker
                geo_file.write_text(geo.model_dump_json(indent=2))

            # Model file path
            model_file = temp_dir / f"model.{self.model_format}"

            # Substitute variables in command
            command = self.train_command.format(
                config_file=str(config_file),
                data_file=str(data_file),
                model_file=str(model_file),
                geo_file=str(geo_file) if geo_file else "",
            )

            logger.info("executing_train_script", command=command, temp_dir=str(temp_dir))

            # Execute subprocess
            result = await run_shell(command, cwd=str(temp_dir))
            stdout = result["stdout"]
            stderr = result["stderr"]

            if result["returncode"] != 0:
                logger.error("train_script_failed", exit_code=result["returncode"], stderr=stderr)
                raise RuntimeError(f"Training script failed with exit code {result['returncode']}: {stderr}")

            logger.info("train_script_completed", stdout=stdout[:500], stderr=stderr[:500])

            # Load trained model from file if it exists
            if model_file.exists():
                with open(model_file, "rb") as f:
                    model = pickle.load(f)
                return model
            else:
                # Return metadata placeholder when no model file is created
                logger.info("train_script_no_model_file", model_file=str(model_file))
                return {
                    "model_type": "no_file",
                    "stdout": stdout,
                    "stderr": stderr,
                    "temp_dir": str(temp_dir),
                }

        finally:
            # Cleanup temp files
            import shutil

            shutil.rmtree(temp_dir, ignore_errors=True)

    async def on_predict(
        self,
        config: ConfigT,
        model: Any,
        historic: DataFrame,
        future: DataFrame,
        geo: FeatureCollection | None = None,
    ) -> DataFrame:
        """Make predictions by executing external prediction script (skips model file if placeholder)."""
        temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_predict_"))

        try:
            # Write config to YAML file
            config_file = temp_dir / "config.yml"
            config_file.write_text(yaml.safe_dump(config.model_dump(), indent=2))

            # Write model to file only if it's not a placeholder
            is_placeholder = isinstance(model, dict) and model.get("model_type") == "no_file"
            if is_placeholder:
                logger.info("predict_script_no_model_file", reason="model is placeholder")
                model_file = None
            else:
                model_file = temp_dir / f"model.{self.model_format}"
                with open(model_file, "wb") as f:
                    pickle.dump(model, f)

            # Write historic data
            historic_file = temp_dir / "historic.csv"
            historic.to_csv(historic_file)

            # Write future data to CSV
            future_file = temp_dir / "future.csv"
            future.to_csv(future_file)

            # Write geo data if provided
            geo_file = temp_dir / "geo.json" if geo else None
            if geo:
                assert geo_file is not None  # For type checker
                geo_file.write_text(geo.model_dump_json(indent=2))

            # Output file path
            output_file = temp_dir / "predictions.csv"

            # Substitute variables in command
            command = self.predict_command.format(
                config_file=str(config_file),
                model_file=str(model_file) if model_file else "",
                historic_file=str(historic_file),
                future_file=str(future_file),
                output_file=str(output_file),
                geo_file=str(geo_file) if geo_file else "",
            )

            logger.info("executing_predict_script", command=command, temp_dir=str(temp_dir))

            # Execute subprocess
            result = await run_shell(command, cwd=str(temp_dir))
            stdout = result["stdout"]
            stderr = result["stderr"]

            if result["returncode"] != 0:
                logger.error("predict_script_failed", exit_code=result["returncode"], stderr=stderr)
                raise RuntimeError(f"Prediction script failed with exit code {result['returncode']}: {stderr}")

            logger.info("predict_script_completed", stdout=stdout[:500], stderr=stderr[:500])

            # Load predictions from file
            if not output_file.exists():
                raise RuntimeError(f"Prediction script did not create output file at {output_file}")

            predictions = DataFrame.from_csv(output_file)
            return predictions

        finally:
            # Cleanup temp files
            import shutil

            shutil.rmtree(temp_dir, ignore_errors=True)

Functions

__init__(train_command, predict_command, model_format='pickle')

Initialize shell runner with command templates for train/predict operations.

Source code in src/chapkit/ml/runner.py
def __init__(
    self,
    train_command: str,
    predict_command: str,
    model_format: str = "pickle",
) -> None:
    """Initialize shell runner with command templates for train/predict operations."""
    self.train_command = train_command
    self.predict_command = predict_command
    self.model_format = model_format

on_train(config, data, geo=None) async

Train a model by executing external training script (model file creation is optional).

Source code in src/chapkit/ml/runner.py
async def on_train(
    self,
    config: ConfigT,
    data: DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model by executing external training script (model file creation is optional)."""
    temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_train_"))

    try:
        # Write config to YAML file
        config_file = temp_dir / "config.yml"
        config_file.write_text(yaml.safe_dump(config.model_dump(), indent=2))

        # Write training data to CSV
        data_file = temp_dir / "data.csv"
        data.to_csv(data_file)

        # Write geo data if provided
        geo_file = temp_dir / "geo.json" if geo else None
        if geo:
            assert geo_file is not None  # For type checker
            geo_file.write_text(geo.model_dump_json(indent=2))

        # Model file path
        model_file = temp_dir / f"model.{self.model_format}"

        # Substitute variables in command
        command = self.train_command.format(
            config_file=str(config_file),
            data_file=str(data_file),
            model_file=str(model_file),
            geo_file=str(geo_file) if geo_file else "",
        )

        logger.info("executing_train_script", command=command, temp_dir=str(temp_dir))

        # Execute subprocess
        result = await run_shell(command, cwd=str(temp_dir))
        stdout = result["stdout"]
        stderr = result["stderr"]

        if result["returncode"] != 0:
            logger.error("train_script_failed", exit_code=result["returncode"], stderr=stderr)
            raise RuntimeError(f"Training script failed with exit code {result['returncode']}: {stderr}")

        logger.info("train_script_completed", stdout=stdout[:500], stderr=stderr[:500])

        # Load trained model from file if it exists
        if model_file.exists():
            with open(model_file, "rb") as f:
                model = pickle.load(f)
            return model
        else:
            # Return metadata placeholder when no model file is created
            logger.info("train_script_no_model_file", model_file=str(model_file))
            return {
                "model_type": "no_file",
                "stdout": stdout,
                "stderr": stderr,
                "temp_dir": str(temp_dir),
            }

    finally:
        # Cleanup temp files
        import shutil

        shutil.rmtree(temp_dir, ignore_errors=True)

on_predict(config, model, historic, future, geo=None) async

Make predictions by executing external prediction script (skips model file if placeholder).

Source code in src/chapkit/ml/runner.py
async def on_predict(
    self,
    config: ConfigT,
    model: Any,
    historic: DataFrame,
    future: DataFrame,
    geo: FeatureCollection | None = None,
) -> DataFrame:
    """Make predictions by executing external prediction script (skips model file if placeholder)."""
    temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_predict_"))

    try:
        # Write config to YAML file
        config_file = temp_dir / "config.yml"
        config_file.write_text(yaml.safe_dump(config.model_dump(), indent=2))

        # Write model to file only if it's not a placeholder
        is_placeholder = isinstance(model, dict) and model.get("model_type") == "no_file"
        if is_placeholder:
            logger.info("predict_script_no_model_file", reason="model is placeholder")
            model_file = None
        else:
            model_file = temp_dir / f"model.{self.model_format}"
            with open(model_file, "wb") as f:
                pickle.dump(model, f)

        # Write historic data
        historic_file = temp_dir / "historic.csv"
        historic.to_csv(historic_file)

        # Write future data to CSV
        future_file = temp_dir / "future.csv"
        future.to_csv(future_file)

        # Write geo data if provided
        geo_file = temp_dir / "geo.json" if geo else None
        if geo:
            assert geo_file is not None  # For type checker
            geo_file.write_text(geo.model_dump_json(indent=2))

        # Output file path
        output_file = temp_dir / "predictions.csv"

        # Substitute variables in command
        command = self.predict_command.format(
            config_file=str(config_file),
            model_file=str(model_file) if model_file else "",
            historic_file=str(historic_file),
            future_file=str(future_file),
            output_file=str(output_file),
            geo_file=str(geo_file) if geo_file else "",
        )

        logger.info("executing_predict_script", command=command, temp_dir=str(temp_dir))

        # Execute subprocess
        result = await run_shell(command, cwd=str(temp_dir))
        stdout = result["stdout"]
        stderr = result["stderr"]

        if result["returncode"] != 0:
            logger.error("predict_script_failed", exit_code=result["returncode"], stderr=stderr)
            raise RuntimeError(f"Prediction script failed with exit code {result['returncode']}: {stderr}")

        logger.info("predict_script_completed", stdout=stdout[:500], stderr=stderr[:500])

        # Load predictions from file
        if not output_file.exists():
            raise RuntimeError(f"Prediction script did not create output file at {output_file}")

        predictions = DataFrame.from_csv(output_file)
        return predictions

    finally:
        # Cleanup temp files
        import shutil

        shutil.rmtree(temp_dir, ignore_errors=True)
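
A sketch of a ShellModelRunner whose commands delegate to external scripts. The script names are placeholders, but the {placeholders} correspond to the variables substituted in on_train/on_predict above ({geo_file} is also available and expands to an empty string when no geo data is supplied).

from chapkit.ml.runner import ShellModelRunner

runner = ShellModelRunner(
    train_command=(
        "python train.py --config {config_file} --data {data_file} --out {model_file}"
    ),
    predict_command=(
        "python predict.py --config {config_file} --model {model_file} "
        "--historic {historic_file} --future {future_file} --out {output_file}"
    ),
    model_format="pickle",
)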

Scheduler

Chapkit job scheduler with artifact tracking for ML/task workflows.

scheduler

Chapkit-specific job scheduler with artifact tracking.

Classes

ChapkitJobRecord

Bases: JobRecord

Job record extended with artifact_id tracking for ML/task workflows.

Source code in src/chapkit/scheduler.py
class ChapkitJobRecord(JobRecord):
    """Job record extended with artifact_id tracking for ML/task workflows."""

    artifact_id: ULID | None = Field(default=None, description="ID of artifact created by job (if job returns a ULID)")

ChapkitScheduler

Bases: InMemoryScheduler, ABC

Abstract base class for Chapkit job schedulers with artifact tracking.

Source code in src/chapkit/scheduler.py
class ChapkitScheduler(InMemoryScheduler, ABC):
    """Abstract base class for Chapkit job schedulers with artifact tracking."""

    async def get_record(self, job_id: ULID) -> ChapkitJobRecord:
        """Get complete job record with artifact_id if available."""
        raise NotImplementedError

    async def list_records(
        self, *, status_filter: JobStatus | None = None, reverse: bool = False
    ) -> list[ChapkitJobRecord]:
        """List all job records with optional status filtering."""
        raise NotImplementedError
Functions
get_record(job_id) async

Get complete job record with artifact_id if available.

Source code in src/chapkit/scheduler.py
async def get_record(self, job_id: ULID) -> ChapkitJobRecord:
    """Get complete job record with artifact_id if available."""
    raise NotImplementedError
list_records(*, status_filter=None, reverse=False) async

List all job records with optional status filtering.

Source code in src/chapkit/scheduler.py
async def list_records(
    self, *, status_filter: JobStatus | None = None, reverse: bool = False
) -> list[ChapkitJobRecord]:
    """List all job records with optional status filtering."""
    raise NotImplementedError

InMemoryChapkitScheduler

Bases: ChapkitScheduler

In-memory scheduler with automatic artifact tracking for jobs that return ULIDs.

Source code in src/chapkit/scheduler.py
class InMemoryChapkitScheduler(ChapkitScheduler):
    """In-memory scheduler with automatic artifact tracking for jobs that return ULIDs."""

    # Override with ChapkitJobRecord type to support artifact_id tracking
    # dict is invariant, but we always use ChapkitJobRecord in this subclass
    _records: dict[ULID, ChapkitJobRecord] = PrivateAttr(default_factory=dict)  # type: ignore[assignment]  # pyright: ignore[reportIncompatibleVariableOverride]

    async def add_job(
        self,
        target: JobTarget,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> ULID:
        """Add a job to the scheduler and return its ID."""
        now = datetime.now(timezone.utc)
        jid = ULID()

        record = ChapkitJobRecord(
            id=jid,
            status=JobStatus.pending,
            submitted_at=now,
        )

        async with self._lock:
            if jid in self._tasks:
                raise RuntimeError(f"Job {jid!r} already scheduled")
            self._records[jid] = record

        async def _execute_target() -> Any:
            if inspect.isawaitable(target):
                if args or kwargs:
                    # Close the coroutine to avoid "coroutine was never awaited" warning
                    if inspect.iscoroutine(target):
                        target.close()
                    raise TypeError("Args/kwargs not supported when target is an awaitable object.")
                return await target
            if inspect.iscoroutinefunction(target):
                return await target(*args, **kwargs)
            return await asyncio.to_thread(target, *args, **kwargs)

        async def _runner() -> Any:
            if self._sema:
                async with self._sema:
                    return await self._run_with_state(jid, _execute_target)
            else:
                return await self._run_with_state(jid, _execute_target)

        task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

        def _drain(t: asyncio.Task[Any]) -> None:
            try:
                t.result()
            except Exception:
                pass

        task.add_done_callback(_drain)

        async with self._lock:
            self._tasks[jid] = task

        return jid

    async def get_record(self, job_id: ULID) -> ChapkitJobRecord:
        """Get complete job record with artifact_id if available."""
        async with self._lock:
            if job_id not in self._records:
                raise KeyError(f"Job {job_id} not found")
            return self._records[job_id]

    async def list_records(
        self, *, status_filter: JobStatus | None = None, reverse: bool = False
    ) -> list[ChapkitJobRecord]:
        """List all job records with optional status filtering."""
        async with self._lock:
            records = list(self._records.values())
            if status_filter:
                records = [r for r in records if r.status == status_filter]
            if reverse:
                records = list(reversed(records))
            return records

    async def _run_with_state(
        self,
        jid: ULID,
        exec_fn: JobExecutor,
    ) -> Any:
        """Execute job function and track artifact_id if result is a ULID."""
        async with self._lock:
            rec = self._records[jid]
            rec.status = JobStatus.running
            rec.started_at = datetime.now(timezone.utc)

        try:
            result = await exec_fn()

            # Track artifact_id if job returns a ULID
            artifact_id: ULID | None = result if isinstance(result, ULID) else None

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.completed
                rec.finished_at = datetime.now(timezone.utc)
                rec.artifact_id = artifact_id
                self._results[jid] = result

            return result

        except asyncio.CancelledError:
            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.canceled
                rec.finished_at = datetime.now(timezone.utc)

            raise

        except Exception as e:
            tb = traceback.format_exc()
            # Extract clean error message (exception type and message only)
            error_lines = tb.strip().split("\n")
            clean_error = error_lines[-1] if error_lines else str(e)

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.failed
                rec.finished_at = datetime.now(timezone.utc)
                rec.error = clean_error
                rec.error_traceback = tb

            raise
Functions
add_job(target, /, *args, **kwargs) async

Add a job to the scheduler and return its ID.

Source code in src/chapkit/scheduler.py
async def add_job(
    self,
    target: JobTarget,
    /,
    *args: Any,
    **kwargs: Any,
) -> ULID:
    """Add a job to the scheduler and return its ID."""
    now = datetime.now(timezone.utc)
    jid = ULID()

    record = ChapkitJobRecord(
        id=jid,
        status=JobStatus.pending,
        submitted_at=now,
    )

    async with self._lock:
        if jid in self._tasks:
            raise RuntimeError(f"Job {jid!r} already scheduled")
        self._records[jid] = record

    async def _execute_target() -> Any:
        if inspect.isawaitable(target):
            if args or kwargs:
                # Close the coroutine to avoid "coroutine was never awaited" warning
                if inspect.iscoroutine(target):
                    target.close()
                raise TypeError("Args/kwargs not supported when target is an awaitable object.")
            return await target
        if inspect.iscoroutinefunction(target):
            return await target(*args, **kwargs)
        return await asyncio.to_thread(target, *args, **kwargs)

    async def _runner() -> Any:
        if self._sema:
            async with self._sema:
                return await self._run_with_state(jid, _execute_target)
        else:
            return await self._run_with_state(jid, _execute_target)

    task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

    def _drain(t: asyncio.Task[Any]) -> None:
        try:
            t.result()
        except Exception:
            pass

    task.add_done_callback(_drain)

    async with self._lock:
        self._tasks[jid] = task

    return jid
get_record(job_id) async

Get complete job record with artifact_id if available.

Source code in src/chapkit/scheduler.py
async def get_record(self, job_id: ULID) -> ChapkitJobRecord:
    """Get complete job record with artifact_id if available."""
    async with self._lock:
        if job_id not in self._records:
            raise KeyError(f"Job {job_id} not found")
        return self._records[job_id]
list_records(*, status_filter=None, reverse=False) async

List all job records with optional status filtering.

Source code in src/chapkit/scheduler.py
async def list_records(
    self, *, status_filter: JobStatus | None = None, reverse: bool = False
) -> list[ChapkitJobRecord]:
    """List all job records with optional status filtering."""
    async with self._lock:
        records = list(self._records.values())
        if status_filter:
            records = [r for r in records if r.status == status_filter]
        if reverse:
            records = list(reversed(records))
        return records
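
A hedged sketch of using the in-memory scheduler on its own; the constructor is assumed to work with defaults, and the short sleep simply gives the background task time to finish.

import asyncio

from chapkit.scheduler import InMemoryChapkitScheduler


async def main() -> None:
    scheduler = InMemoryChapkitScheduler()

    async def work() -> str:
        return "done"

    job_id = await scheduler.add_job(work)
    await asyncio.sleep(0.1)
    record = await scheduler.get_record(job_id)
    print(record.status, record.artifact_id)  # artifact_id stays None unless the job returns a ULID


asyncio.run(main())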

API Layer

FastAPI-specific components built on servicekit.

Dependencies

FastAPI dependency injection functions.

dependencies

Feature-specific FastAPI dependency injection for managers.

Functions

get_config_manager(session) async

Get a config manager instance for dependency injection.

Source code in src/chapkit/api/dependencies.py
async def get_config_manager(session: Annotated[AsyncSession, Depends(get_session)]) -> ConfigManager[BaseConfig]:
    """Get a config manager instance for dependency injection."""
    repo = ConfigRepository(session)
    return ConfigManager[BaseConfig](repo, BaseConfig)

get_artifact_manager(session) async

Get an artifact manager instance for dependency injection.

Source code in src/chapkit/api/dependencies.py
async def get_artifact_manager(session: Annotated[AsyncSession, Depends(get_session)]) -> ArtifactManager:
    """Get an artifact manager instance for dependency injection."""
    artifact_repo = ArtifactRepository(session)
    return ArtifactManager(artifact_repo)
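
A sketch of injecting the artifact manager into a custom route. The route path is illustrative; the import paths for ULID and ArtifactManager are assumptions, while find_by_id is the same lookup used elsewhere in chapkit.

from fastapi import APIRouter, Depends, HTTPException
from ulid import ULID

from chapkit.api.dependencies import get_artifact_manager
from chapkit.artifact.manager import ArtifactManager

router = APIRouter()


@router.get("/artifact-level/{artifact_id}")
async def artifact_level(
    artifact_id: str,
    manager: ArtifactManager = Depends(get_artifact_manager),
) -> dict[str, int]:
    artifact = await manager.find_by_id(ULID.from_str(artifact_id))
    if artifact is None:
        raise HTTPException(status_code=404, detail="artifact not found")
    return {"level": artifact.level}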

get_ml_manager() async

Get an ML manager instance for dependency injection.

Note: This is a placeholder. The actual dependency is built by ServiceBuilder with the runner in closure, then overridden via app.dependency_overrides.

Source code in src/chapkit/api/dependencies.py
async def get_ml_manager() -> MLManager:
    """Get an ML manager instance for dependency injection.

    Note: This is a placeholder. The actual dependency is built by ServiceBuilder
    with the runner in closure, then overridden via app.dependency_overrides.
    """
    raise RuntimeError("ML manager dependency not configured. Use ServiceBuilder.with_ml() to enable ML operations.")

Alembic Helpers

Reusable migration helpers for chapkit database tables.

alembic_helpers

Reusable Alembic migration helpers for chapkit tables.

This module provides helper functions for creating and dropping chapkit's database tables in Alembic migrations. Using helpers instead of raw Alembic operations provides:

  • Reusability across migrations
  • Consistent table definitions
  • Clear documentation
  • Easier maintenance

Users can create their own helper modules following this pattern for custom tables.

Example

In your migration file:

from chapkit.alembic_helpers import create_configs_table, drop_configs_table

def upgrade() -> None:
    create_configs_table(op)

def downgrade() -> None:
    drop_configs_table(op)

Creating Your Own Helpers

Follow the same pattern for your custom tables:

myapp/alembic_helpers.py

def create_users_table(op: Any) -> None:
    '''Create users table.'''
    op.create_table(
        'users',
        sa.Column('email', sa.String(), nullable=False),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('id', servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column('created_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False),
        sa.Column('tags', sa.JSON(), nullable=False, server_default='[]'),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=False)

def drop_users_table(op: Any) -> None:
    '''Drop users table.'''
    op.drop_index(op.f('ix_users_email'), table_name='users')
    op.drop_table('users')

See examples/custom_migrations/ for a complete working example.

Functions

create_artifacts_table(op)

Create artifacts table for hierarchical artifact storage.

Source code in src/chapkit/alembic_helpers.py
def create_artifacts_table(op: Any) -> None:
    """Create artifacts table for hierarchical artifact storage."""
    op.create_table(
        "artifacts",
        sa.Column("parent_id", servicekit.types.ULIDType(length=26), nullable=True),
        sa.Column("data", sa.PickleType(), nullable=False),
        sa.Column("level", sa.Integer(), nullable=False),
        sa.Column("id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("tags", sa.JSON(), nullable=False, server_default="[]"),
        sa.ForeignKeyConstraint(["parent_id"], ["artifacts.id"], ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(op.f("ix_artifacts_level"), "artifacts", ["level"], unique=False)
    op.create_index(op.f("ix_artifacts_parent_id"), "artifacts", ["parent_id"], unique=False)

drop_artifacts_table(op)

Drop artifacts table.

Source code in src/chapkit/alembic_helpers.py
def drop_artifacts_table(op: Any) -> None:
    """Drop artifacts table."""
    op.drop_index(op.f("ix_artifacts_parent_id"), table_name="artifacts")
    op.drop_index(op.f("ix_artifacts_level"), table_name="artifacts")
    op.drop_table("artifacts")

create_configs_table(op)

Create configs table for configuration storage.

Source code in src/chapkit/alembic_helpers.py
def create_configs_table(op: Any) -> None:
    """Create configs table for configuration storage."""
    op.create_table(
        "configs",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("data", sa.JSON(), nullable=False),
        sa.Column("id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("tags", sa.JSON(), nullable=False, server_default="[]"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(op.f("ix_configs_name"), "configs", ["name"], unique=False)

drop_configs_table(op)

Drop configs table.

Source code in src/chapkit/alembic_helpers.py
def drop_configs_table(op: Any) -> None:
    """Drop configs table."""
    op.drop_index(op.f("ix_configs_name"), table_name="configs")
    op.drop_table("configs")

create_config_artifacts_table(op)

Create config_artifacts junction table linking configs to artifacts.

Source code in src/chapkit/alembic_helpers.py
def create_config_artifacts_table(op: Any) -> None:
    """Create config_artifacts junction table linking configs to artifacts."""
    op.create_table(
        "config_artifacts",
        sa.Column("config_id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("artifact_id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.ForeignKeyConstraint(["artifact_id"], ["artifacts.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["config_id"], ["configs.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("config_id", "artifact_id"),
        sa.UniqueConstraint("artifact_id"),
        sa.UniqueConstraint("artifact_id", name="uq_artifact_id"),
    )

drop_config_artifacts_table(op)

Drop config_artifacts junction table.

Source code in src/chapkit/alembic_helpers.py
def drop_config_artifacts_table(op: Any) -> None:
    """Drop config_artifacts junction table."""
    op.drop_table("config_artifacts")

create_tasks_table(op)

Create tasks table for task execution infrastructure.

Source code in src/chapkit/alembic_helpers.py
def create_tasks_table(op: Any) -> None:
    """Create tasks table for task execution infrastructure."""
    op.create_table(
        "tasks",
        sa.Column("command", sa.Text(), nullable=False),
        sa.Column("task_type", sa.Text(), nullable=False, server_default="shell"),
        sa.Column("parameters", sa.JSON(), nullable=True),
        sa.Column("enabled", sa.Boolean(), nullable=False, server_default="1"),
        sa.Column("id", servicekit.types.ULIDType(length=26), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
        sa.Column("tags", sa.JSON(), nullable=False, server_default="[]"),
        sa.PrimaryKeyConstraint("id"),
    )

drop_tasks_table(op)

Drop tasks table.

Source code in src/chapkit/alembic_helpers.py
def drop_tasks_table(op: Any) -> None:
    """Drop tasks table."""
    op.drop_table("tasks")