Task Execution

Chapkit provides a registry-based task execution system for running Python functions synchronously with dependency injection and tag-based organization.

Quick Start

Basic Task Registration

from chapkit.task import TaskRegistry, TaskExecutor, TaskRouter
from chapkit.api import ServiceBuilder, ServiceInfo
from servicekit import Database
from servicekit.api.dependencies import get_database
from fastapi import Depends

# Clear registry on module reload (for development)
TaskRegistry.clear()

# Register Python task with tags
@TaskRegistry.register("greet_user", tags=["demo", "simple"])
async def greet_user(name: str = "World") -> dict[str, str]:
    """Simple task that returns a greeting."""
    return {"message": f"Hello, {name}!"}

# Task with dependency injection
@TaskRegistry.register("process_data", tags=["demo", "injection"])
async def process_data(database: Database) -> dict[str, object]:
    """Dependencies are automatically injected."""
    return {"status": "processed", "database_url": str(database.url)}

# Build service
info = ServiceInfo(display_name="Task Service")

def get_task_executor(database: Database = Depends(get_database)) -> TaskExecutor:
    """Provide task executor for dependency injection."""
    return TaskExecutor(database)

task_router = TaskRouter.create(
    prefix="/api/v1/tasks",
    tags=["Tasks"],
    executor_factory=get_task_executor,
)

app = (
    ServiceBuilder(info=info)
    .with_health()
    .include_router(task_router.router)
    .build()
)

Run: fastapi dev your_file.py
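
Once the service is running, you can exercise it from the command line; a quick check, assuming the default port 8000 used throughout this page:

# Discover registered tasks
curl http://localhost:8000/api/v1/tasks

# Execute the greet_user task registered above
curl -X POST http://localhost:8000/api/v1/tasks/greet_user/\$execute \
  -H "Content-Type: application/json" \
  -d '{"params": {"name": "Alice"}}'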


Architecture

Task Registration

Python Functions:

- Registered with @TaskRegistry.register(name, tags=[])
- URL-safe names (alphanumeric, underscore, hyphen only)
- Automatic dependency injection
- Return dict with results
- Async or sync functions supported

Execution Flow

1. Task Registered (in-memory)
   @TaskRegistry.register("task_name", tags=["tag1"])

2. Task Discovery
   GET /api/v1/tasks
   GET /api/v1/tasks/task_name

3. Task Execution
   POST /api/v1/tasks/task_name/$execute
   ├─> Dependencies injected
   ├─> Function executed synchronously
   └─> Result returned in response (200 OK)

Core Concepts

TaskRegistry

Global in-memory registry for Python task functions.

from chapkit.task import TaskRegistry

# Register with decorator
@TaskRegistry.register("my_task", tags=["processing", "etl"])
async def my_task(param: str) -> dict[str, object]:
    """Task docstring."""
    return {"result": param.upper()}

# Or register imperatively
def another_task(x: int) -> dict[str, int]:
    """Another task."""
    return {"doubled": x * 2}

TaskRegistry.register_function("double_it", another_task, tags=["math"])

# Check registration
assert TaskRegistry.has("my_task")

# Get task metadata
info = TaskRegistry.get_info("my_task")
print(info.signature)  # (param: str) -> dict[str, object]
print(info.tags)       # ["processing", "etl"]

# List all tasks
all_tasks = TaskRegistry.list_all()  # ["my_task", "double_it"]

# Filter by tags (requires ALL tags)
math_tasks = TaskRegistry.list_by_tags(["math"])  # ["double_it"]

Rules:

- Task names must be URL-safe: ^[a-zA-Z0-9_-]+$
- Task names must be unique
- Functions should return dict or None
- Both async and sync functions supported
- Parameters can have defaults

Tags

Tasks can be tagged for organization:

@TaskRegistry.register("extract_data", tags=["data", "etl", "extract"])
async def extract_data() -> dict:
    """Extract data from source."""
    return {"records": 100}

@TaskRegistry.register("transform_data", tags=["data", "etl", "transform"])
async def transform_data() -> dict:
    """Transform extracted data."""
    return {"transformed": True}

# Filter tasks that have ALL specified tags
etl_tasks = TaskRegistry.list_by_tags(["data", "etl"])
# Returns: ["extract_data", "transform_data"]

extract_tasks = TaskRegistry.list_by_tags(["etl", "extract"])
# Returns: ["extract_data"]

Dependency Injection

Tasks can request framework dependencies as function parameters:

from servicekit import Database
from sqlalchemy.ext.asyncio import AsyncSession

@TaskRegistry.register("with_dependencies")
async def with_dependencies(
    database: Database,
    session: AsyncSession,
    custom_param: str = "default"
) -> dict[str, object]:
    """Dependencies automatically injected at runtime."""
    # Framework types are injected, user params come from request
    return {"database_url": str(database.url), "custom_param": custom_param}

Available Injectable Types:

- AsyncSession - Database session (always available)
- Database - Database instance (always available)
- ChapkitScheduler - Job scheduler (available when .with_jobs() is configured)
- ArtifactManager - Artifact manager (available when .with_artifacts() is configured and passed to TaskExecutor)

Note: Parameters are categorized automatically:

- Framework types (in INJECTABLE_TYPES) are injected when available
- All other parameters must be provided in the execution request

Simple setup (default):

def get_task_executor(database: Database = Depends(get_database)) -> TaskExecutor:
    return TaskExecutor(database)

Advanced setup (with scheduler + artifacts):

from servicekit.api.dependencies import get_scheduler
from chapkit.artifact import ArtifactHierarchy, ArtifactManager, ArtifactRepository

TASK_HIERARCHY = ArtifactHierarchy(name="task_results", level_labels={0: "task_run"})

async def get_task_executor(
    database: Database = Depends(get_database),
    scheduler = Depends(get_scheduler),
) -> TaskExecutor:
    async with database.session() as session:
        artifact_repo = ArtifactRepository(session)
        artifact_manager = ArtifactManager(artifact_repo, hierarchy=TASK_HIERARCHY)
        return TaskExecutor(database, scheduler, artifact_manager)

# Also add to service builder:
app = (
    ServiceBuilder(info=info)
    .with_jobs(max_concurrency=5)
    .with_artifacts(hierarchy=TASK_HIERARCHY)
    .include_router(task_router.router)
    .build()
)

TaskInfo Schema

When you retrieve task metadata, you get a TaskInfo object:

class TaskInfo(BaseModel):
    name: str                        # URL-safe task name
    docstring: str | None            # Function docstring
    signature: str                   # Function signature string
    parameters: list[ParameterInfo]  # Parameter metadata
    tags: list[str]                  # Task tags

class ParameterInfo(BaseModel):
    name: str                 # Parameter name
    annotation: str | None    # Type annotation as string
    default: str | None       # Default value as string
    required: bool            # Whether parameter is required
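
For instance, you can inspect these fields straight from the registry; a short sketch using the TaskRegistry.get_info() call shown earlier and the greet_user task from Quick Start:

info = TaskRegistry.get_info("greet_user")
print(info.docstring)  # "Simple task that returns a greeting."
for p in info.parameters:
    # e.g. name <class 'str'> 'World' False
    print(p.name, p.annotation, p.default, p.required)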

API Endpoints

GET /api/v1/tasks

List all registered tasks with metadata.

Response:

[
  {
    "name": "greet_user",
    "docstring": "Simple task that returns a greeting.",
    "signature": "(name: str = 'World') -> dict[str, str]",
    "parameters": [
      {
        "name": "name",
        "annotation": "<class 'str'>",
        "default": "'World'",
        "required": false
      }
    ],
    "tags": ["demo", "simple"]
  }
]

Example:

# List all tasks
curl http://localhost:8000/api/v1/tasks

GET /api/v1/tasks/{name}

Get task metadata by URL-safe name.

Response:

{
  "name": "greet_user",
  "docstring": "Simple task that returns a greeting.",
  "signature": "(name: str = 'World') -> dict[str, str]",
  "parameters": [
    {
      "name": "name",
      "annotation": "<class 'str'>",
      "default": "'World'",
      "required": false
    }
  ],
  "tags": ["demo", "simple"]
}

Errors:

- 404 Not Found: Task not registered
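
Example (same pattern as the list endpoint above):

# Get metadata for a single task by name
curl http://localhost:8000/api/v1/tasks/greet_user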

POST /api/v1/tasks/{name}/$execute

Execute task by name with runtime parameters.

Request:

{
  "params": {
    "name": "Alice"
  }
}

Response (200 OK):

{
  "task_name": "greet_user",
  "params": {"name": "Alice"},
  "result": {"message": "Hello, Alice!"},
  "error": null
}

Response on Error (200 OK):

{
  "task_name": "greet_user",
  "params": {"name": "Alice"},
  "result": null,
  "error": {
    "type": "ValueError",
    "message": "Invalid parameter",
    "traceback": "Traceback (most recent call last)..."
  }
}

Errors:

- 404 Not Found: Task not registered

Examples:

# Execute without parameters
curl -X POST http://localhost:8000/api/v1/tasks/greet_user/\$execute \
  -H "Content-Type: application/json" \
  -d '{}'

# Execute with parameters
curl -X POST http://localhost:8000/api/v1/tasks/greet_user/\$execute \
  -H "Content-Type: application/json" \
  -d '{"params": {"name": "Bob"}}'

# Execute task with multiple parameters
curl -X POST http://localhost:8000/api/v1/tasks/multiply_numbers/\$execute \
  -H "Content-Type: application/json" \
  -d '{"params": {"a": 6, "b": 7}}'


Task Patterns

Simple Task

@TaskRegistry.register("hello", tags=["demo"])
async def hello() -> dict[str, str]:
    """Simple hello world task."""
    return {"message": "Hello!"}

Task with Parameters

@TaskRegistry.register("add", tags=["math"])
async def add(a: int, b: int) -> dict[str, int]:
    """Add two numbers."""
    return {"result": a + b}

# Execute:
# POST /api/v1/tasks/add/$execute
# {"params": {"a": 5, "b": 3}}

Task with Optional Parameters

@TaskRegistry.register("greet", tags=["demo"])
async def greet(name: str = "World", greeting: str = "Hello") -> dict[str, str]:
    """Greet someone."""
    return {"message": f"{greeting}, {name}!"}

# Execute with defaults:
# POST /api/v1/tasks/greet/$execute
# {}

# Execute with custom values:
# POST /api/v1/tasks/greet/$execute
# {"params": {"name": "Alice", "greeting": "Hi"}}

Task with Dependency Injection

@TaskRegistry.register("store_result", tags=["storage"])
async def store_result(
    artifact_manager: ArtifactManager,
    data: dict
) -> dict[str, object]:
    """Store result in artifact."""
    artifact = await artifact_manager.save(ArtifactIn(data=data))
    return {"artifact_id": str(artifact.id)}

# Execute:
# POST /api/v1/tasks/store_result/$execute
# {"params": {"data": {"key": "value"}}}
# Note: artifact_manager is injected, only data needs to be provided

Database Query Task

@TaskRegistry.register("count_users", tags=["database", "reporting"])
async def count_users(database: Database) -> dict[str, int]:
    """Count users in database."""
    async with database.session() as session:
        from sqlalchemy import select, func
        from myapp.models import User

        stmt = select(func.count(User.id))
        result = await session.execute(stmt)
        count = result.scalar()

    return {"user_count": count}

File Processing Task

@TaskRegistry.register("process_csv", tags=["data", "processing"])
async def process_csv(filepath: str) -> dict[str, object]:
    """Process CSV file."""
    import pandas as pd

    df = pd.read_csv(filepath)
    summary = {
        "rows": len(df),
        "columns": list(df.columns),
        "summary": df.describe().to_dict()
    }

    return summary

Synchronous Task

@TaskRegistry.register("multiply", tags=["math"])
def multiply(a: int, b: int) -> dict[str, int]:
    """Synchronous task that multiplies numbers."""
    # Sync functions are automatically wrapped for async execution
    return {"result": a * b}

Shell Command Task

from chapkit.task import run_shell

@TaskRegistry.register("backup_database", tags=["admin", "backup"])
async def backup_database(database_url: str, s3_bucket: str) -> dict[str, object]:
    """Backup database to S3 using shell commands."""
    # Dump database
    dump_result = await run_shell(
        f"pg_dump {database_url} | gzip > /tmp/backup.sql.gz",
        timeout=300.0
    )

    if dump_result["returncode"] != 0:
        return {
            "status": "failed",
            "step": "dump",
            "error": dump_result["stderr"]
        }

    # Upload to S3
    upload_result = await run_shell(
        f"aws s3 cp /tmp/backup.sql.gz s3://{s3_bucket}/backup.sql.gz",
        timeout=60.0
    )

    if upload_result["returncode"] != 0:
        return {
            "status": "failed",
            "step": "upload",
            "error": upload_result["stderr"]
        }

    return {
        "status": "success",
        "size": len(dump_result["stdout"])
    }

# Simple shell command task
@TaskRegistry.register("run_command", tags=["demo", "subprocess"])
async def run_command(command: str) -> dict[str, object]:
    """Run a shell command."""
    return await run_shell(command)

# Shell command with custom working directory and timeout
@TaskRegistry.register("list_files", tags=["filesystem"])
async def list_files(directory: str = ".") -> dict[str, object]:
    """List files in directory."""
    result = await run_shell("ls -la", cwd=directory, timeout=5.0)
    return {
        "directory": directory,
        "output": result["stdout"],
        "success": result["returncode"] == 0
    }

run_shell() options:

- command: str - Shell command to execute
- timeout: float | None - Optional timeout in seconds
- cwd: str | Path | None - Optional working directory
- env: dict[str, str] | None - Optional environment variables

Returns dict with:

- command: str - The command that was executed
- stdout: str - Standard output (decoded)
- stderr: str - Standard error (decoded)
- returncode: int - Exit code (0 = success, -1 = timeout)

Note: run_shell() never raises exceptions for non-zero exit codes. Always check returncode in the result.
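
A small sketch that acts on this contract, distinguishing a timeout from an ordinary failure (the ping_host task and its command are illustrative, not part of chapkit):

@TaskRegistry.register("ping_host", tags=["network"])
async def ping_host(host: str = "localhost") -> dict[str, object]:
    """Ping a host once and classify the outcome from returncode."""
    result = await run_shell(f"ping -c 1 {host}", timeout=2.0)
    if result["returncode"] == -1:
        # run_shell reports timeouts as returncode -1 instead of raising
        return {"status": "timeout", "host": host}
    return {
        "status": "ok" if result["returncode"] == 0 else "failed",
        "host": host,
        "output": result["stdout"],
    }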


Advanced Patterns

When to Use Simple vs Advanced Setup

Use Simple Setup (default) when:

- Tasks execute quickly (< 5 seconds)
- Results can be returned directly in HTTP response
- No need for background job scheduling
- No need for persistent artifact storage

Use Advanced Setup when:

- Tasks need to spawn background jobs for long-running operations
- Tasks need to store results in artifact hierarchy for audit/retrieval
- Tasks need job scheduling capabilities (retry, scheduling, etc.)
- Building a system with complex task orchestration

Use ML Module instead when:

- Building train/predict workflows
- Need versioned model storage
- Need experiment tracking
- Need standardized ML pipeline

Task with Background Job Scheduling

When a task needs to spawn long-running background work:

from chapkit.scheduler import ChapkitScheduler

@TaskRegistry.register("spawn_background_job", tags=["admin", "background"])
async def spawn_background_job(
    scheduler: ChapkitScheduler,
    processing_time: int = 60
) -> dict[str, object]:
    """Task that spawns a background job for long-running work."""

    async def background_work():
        """The actual long-running work."""
        import asyncio
        await asyncio.sleep(processing_time)
        return {"status": "completed", "processing_time": processing_time}

    # Spawn background job
    job_id = await scheduler.spawn(
        background_work(),
        description=f"Background processing ({processing_time}s)"
    )

    return {
        "message": "Background job started",
        "job_id": str(job_id),
        "check_status": f"/api/v1/jobs/{job_id}"
    }

# Execute:
# POST /api/v1/tasks/spawn_background_job/$execute
# {"params": {"processing_time": 120}}
#
# Response (immediate):
# {
#   "task_name": "spawn_background_job",
#   "result": {
#     "message": "Background job started",
#     "job_id": "01234567-89ab-cdef-0123-456789abcdef",
#     "check_status": "/api/v1/jobs/01234567-89ab-cdef-0123-456789abcdef"
#   }
# }
#
# Then check job status:
# GET /api/v1/jobs/01234567-89ab-cdef-0123-456789abcdef

Task with Artifact Storage

When a task needs to store results in the artifact hierarchy:

from chapkit.artifact import ArtifactManager, ArtifactIn

@TaskRegistry.register("store_analysis_results", tags=["analytics", "storage"])
async def store_analysis_results(
    artifact_manager: ArtifactManager,
    dataset_name: str,
    analysis_type: str
) -> dict[str, object]:
    """Task that stores analysis results as artifacts."""

    # Perform analysis
    results = {
        "dataset": dataset_name,
        "type": analysis_type,
        "metrics": {
            "accuracy": 0.95,
            "precision": 0.92,
            "recall": 0.89
        },
        "timestamp": "2024-01-15T10:30:00Z"
    }

    # Store in artifact hierarchy
    artifact = await artifact_manager.save(
        ArtifactIn(
            data=results,
            metadata={
                "dataset": dataset_name,
                "analysis_type": analysis_type
            }
        )
    )

    return {
        "status": "stored",
        "artifact_id": str(artifact.id),
        "retrieve_url": f"/api/v1/artifacts/{artifact.id}"
    }

# Execute:
# POST /api/v1/tasks/store_analysis_results/$execute
# {"params": {"dataset_name": "customer_churn", "analysis_type": "classification"}}
#
# Response:
# {
#   "task_name": "store_analysis_results",
#   "result": {
#     "status": "stored",
#     "artifact_id": "01234567-89ab-cdef-0123-456789abcdef",
#     "retrieve_url": "/api/v1/artifacts/01234567-89ab-cdef-0123-456789abcdef"
#   }
# }

Task with Both Scheduler and Artifacts

Combining job scheduling with artifact storage for complex workflows:

@TaskRegistry.register("orchestrate_pipeline", tags=["pipeline", "orchestration"])
async def orchestrate_pipeline(
    scheduler: ChapkitScheduler,
    artifact_manager: ArtifactManager,
    pipeline_config: dict
) -> dict[str, object]:
    """Task that orchestrates multi-step pipeline with job tracking and artifact storage."""

    # Store pipeline configuration as artifact
    config_artifact = await artifact_manager.save(
        ArtifactIn(
            data=pipeline_config,
            metadata={"type": "pipeline_config"}
        )
    )

    # Define pipeline steps as background jobs
    async def step_1():
        result = {"step": 1, "status": "completed", "data": [1, 2, 3]}
        # Store step result
        await artifact_manager.save(ArtifactIn(
            data=result,
            metadata={"step": 1, "pipeline_config_id": str(config_artifact.id)}
        ))
        return result

    async def step_2():
        result = {"step": 2, "status": "completed", "data": [4, 5, 6]}
        await artifact_manager.save(ArtifactIn(
            data=result,
            metadata={"step": 2, "pipeline_config_id": str(config_artifact.id)}
        ))
        return result

    # Spawn jobs for each step
    job_1 = await scheduler.spawn(step_1(), description="Pipeline Step 1")
    job_2 = await scheduler.spawn(step_2(), description="Pipeline Step 2")

    return {
        "message": "Pipeline started",
        "config_artifact_id": str(config_artifact.id),
        "jobs": {
            "step_1": str(job_1),
            "step_2": str(job_2)
        },
        "monitor": {
            "jobs": "/api/v1/jobs",
            "artifacts": f"/api/v1/artifacts/{config_artifact.id}/$expand"
        }
    }

Advanced Setup Configuration

Complete service setup with scheduler and artifact injection:

from fastapi import Depends, FastAPI
from servicekit import Database
from servicekit.api.dependencies import get_database, get_scheduler

from chapkit.api import ServiceBuilder, ServiceInfo
from chapkit.artifact import ArtifactHierarchy, ArtifactManager, ArtifactRepository
from chapkit.scheduler import ChapkitScheduler
from chapkit.task import TaskExecutor, TaskRouter

# Define artifact hierarchy for task results
TASK_HIERARCHY = ArtifactHierarchy(
    name="task_results",
    level_labels={0: "task_run"}
)

# Advanced executor factory with scheduler and artifacts
async def get_task_executor(
    database: Database = Depends(get_database),
    scheduler = Depends(get_scheduler),
) -> TaskExecutor:
    """Provide task executor with scheduler and artifact manager."""
    async with database.session() as session:
        artifact_repo = ArtifactRepository(session)
        artifact_manager = ArtifactManager(artifact_repo, hierarchy=TASK_HIERARCHY)
        if isinstance(scheduler, ChapkitScheduler):
            return TaskExecutor(database, scheduler, artifact_manager)
        return TaskExecutor(database)

# Create task router
task_router = TaskRouter.create(
    prefix="/api/v1/tasks",
    tags=["Tasks"],
    executor_factory=get_task_executor,
)

# Build service with jobs and artifacts
app: FastAPI = (
    ServiceBuilder(info=ServiceInfo(display_name="Advanced Task Service"))
    .with_landing_page()
    .with_logging()
    .with_health()
    .with_system()
    .with_jobs(max_concurrency=5)          # Enable job scheduler
    .with_artifacts(hierarchy=TASK_HIERARCHY)  # Enable artifact storage
    .include_router(task_router.router)
    .build()
)

Decision Guide: Tasks vs ML Module

Use Task Execution when:

- Running general-purpose Python functions
- Need flexible dependency injection
- Building custom workflows
- Tasks are ephemeral or self-contained
- Mix of data processing, admin, ETL, etc.

Use ML Module when:

- Specifically doing ML train/predict
- Need standardized ML workflow
- Need model versioning
- Need experiment tracking
- Want automatic model artifact management

Example: When to use each

# Use Task Execution for this:
@TaskRegistry.register("process_user_data")
async def process_user_data(database: Database, user_id: str) -> dict[str, object]:
    """Custom business logic."""
    # ... custom processing
    return {"processed": True}

# Use ML Module for this:
from chapkit.ml import MLManager, TrainRequest  # TrainRequest assumed to live alongside MLManager
ml_manager = MLManager(...)
await ml_manager.train(TrainRequest(
    model_name="customer_churn",
    parameters={"n_estimators": 100}
))


Complete Workflow Example

# Start service
fastapi dev main.py

# List all tasks
curl http://localhost:8000/api/v1/tasks | jq

# Get task metadata
curl http://localhost:8000/api/v1/tasks/greet_user | jq

# Execute task and get result immediately
curl -s -X POST http://localhost:8000/api/v1/tasks/greet_user/\$execute \
  -H "Content-Type: application/json" \
  -d '{"params": {"name": "Alice"}}' | jq

# Expected response:
# {
#   "task_name": "greet_user",
#   "params": {"name": "Alice"},
#   "result": {"message": "Hello, Alice!"},
#   "error": null
# }

Result Storage

Task execution results are ephemeral: they are returned directly in the HTTP response and are not persisted.

For task history/persistence needs: Use the ML module instead, which provides artifact-based storage for train/predict workflows.

Response structure for successful execution:

{
    "task_name": "greet_user",
    "params": {"name": "Alice"},
    "result": {"message": "Hello, Alice!"},
    "error": null
}

Response structure for failed execution:

{
    "task_name": "failing_task",
    "params": {},
    "result": null,
    "error": {
        "type": "ValueError",
        "message": "Something went wrong",
        "traceback": "Traceback (most recent call last)..."
    }
}


Testing

Unit Tests

import pytest
from servicekit import Database, SqliteDatabaseBuilder
from chapkit.task import TaskRegistry, TaskExecutor

# Clear registry before tests
TaskRegistry.clear()

@TaskRegistry.register("test_task", tags=["test"])
async def test_task(value: str) -> dict[str, str]:
    """Test task."""
    return {"result": value.upper()}

@pytest.fixture
async def database() -> Database:
    """Create in-memory database for testing."""
    db = SqliteDatabaseBuilder().in_memory().build()
    await db.init()
    return db

@pytest.fixture
def task_executor(database: Database) -> TaskExecutor:
    """Create task executor."""
    return TaskExecutor(database)

@pytest.mark.asyncio
async def test_task_execution(task_executor: TaskExecutor):
    """Test task execution returns result directly."""
    # Execute task
    result = await task_executor.execute("test_task", {"value": "hello"})

    # Verify result
    assert result["result"] == "HELLO"

@pytest.mark.asyncio
async def test_task_registry():
    """Test task registry."""
    # Verify registration
    assert TaskRegistry.has("test_task")

    # Get metadata
    info = TaskRegistry.get_info("test_task")
    assert info.name == "test_task"
    assert info.tags == ["test"]

@pytest.mark.asyncio
async def test_dependency_injection(database: Database, task_executor: TaskExecutor):
    """Test dependency injection."""
    @TaskRegistry.register("test_injection")
    async def task_with_db(db: Database, param: str) -> dict[str, object]:
        """Task with injected database."""
        return {"param": param, "db_injected": db is not None}

    result = await task_executor.execute("test_injection", {"param": "test"})
    assert result["param"] == "test"
    assert result["db_injected"] is True

Integration Tests

from fastapi.testclient import TestClient

def test_task_workflow(client: TestClient):
    """Test complete task workflow."""
    # List tasks
    response = client.get("/api/v1/tasks")
    assert response.status_code == 200
    tasks = response.json()
    assert len(tasks) > 0

    # Get task metadata
    task_name = tasks[0]["name"]
    response = client.get(f"/api/v1/tasks/{task_name}")
    assert response.status_code == 200
    task_info = response.json()
    assert task_info["name"] == task_name

    # Execute task - gets result immediately
    exec_response = client.post(
        f"/api/v1/tasks/{task_name}/$execute",
        json={"params": {"name": "Test"}}
    )
    assert exec_response.status_code == 200
    data = exec_response.json()

    # Verify response structure
    assert data["task_name"] == task_name
    assert data["params"] == {"name": "Test"}
    assert data["result"] is not None
    assert data["error"] is None

def test_task_error_handling(client: TestClient):
    """Test task error handling."""
    # Execute task that will fail
    response = client.post(
        "/api/v1/tasks/failing_task/$execute",
        json={"params": {}}
    )
    assert response.status_code == 200
    data = response.json()

    # Error captured in response
    assert data["result"] is None
    assert data["error"] is not None
    assert "type" in data["error"]
    assert "message" in data["error"]
    assert "traceback" in data["error"]

Production Considerations

Error Handling

Tasks should handle errors gracefully:

@TaskRegistry.register("safe_task", tags=["production"])
async def safe_task(risky_param: str) -> dict[str, object]:
    """Task with error handling."""
    try:
        result = process_risky_operation(risky_param)
        return {"status": "success", "result": result}
    except Exception as e:
        # Error will be captured in response automatically
        # but you can also return error status for app-level handling
        return {"status": "error", "error": str(e)}

Note: Even if a task raises an exception, the TaskExecutor will catch it and return the error in the HTTP response with full traceback. Tasks execute synchronously and return results/errors directly.

Execution Timeout

Since tasks execute synchronously, consider the HTTP request timeout:

@TaskRegistry.register("quick_task", tags=["production"])
async def quick_task(data: dict) -> dict[str, object]:
    """Task should complete quickly (< 30s recommended)."""
    # Process data quickly
    result = fast_processing(data)
    return {"result": result}

# For longer operations, use advanced setup with scheduler
@TaskRegistry.register("long_operation", tags=["production"])
async def long_operation(scheduler: ChapkitScheduler, data: dict) -> dict[str, object]:
    """Spawn background job for long-running work."""
    async def background_work():
        return slow_processing(data)

    job_id = await scheduler.spawn(background_work(), description="Long operation")
    return {
        "status": "started",
        "job_id": str(job_id),
        "check_at": f"/api/v1/jobs/{job_id}"
    }

Concurrency Control (Advanced Setup Only)

When using advanced setup with .with_jobs(), you can limit concurrent background jobs:

app = (
    ServiceBuilder(info=ServiceInfo(display_name="Task Service"))
    .with_jobs(max_concurrency=5)  # Max 5 concurrent background jobs
    .with_artifacts(hierarchy=TASK_HIERARCHY)
    .build()
)

Note: This only limits background jobs spawned by tasks, not the synchronous task execution itself. FastAPI handles concurrent HTTP requests based on its own worker configuration.

Long-Running Tasks (Advanced Setup)

For tasks that need progress tracking, use advanced setup with artifacts:

from chapkit.artifact import ArtifactManager, ArtifactIn

@TaskRegistry.register("long_task", tags=["processing", "batch"])
async def long_task(artifact_manager: ArtifactManager) -> dict[str, object]:
    """Task with progress tracking via artifacts."""
    total_steps = 10
    results = []

    for i in range(total_steps):
        # Do work
        step_result = await process_step(i)
        results.append(step_result)

        # Store intermediate progress
        await artifact_manager.save(ArtifactIn(
            data={"step": i, "total": total_steps, "result": step_result}
        ))

    return {
        "status": "complete",
        "steps_completed": total_steps,
        "results": results
    }

Or spawn a background job:

@TaskRegistry.register("long_batch", tags=["processing", "batch"])
async def long_batch(
    scheduler: ChapkitScheduler,
    artifact_manager: ArtifactManager
) -> dict[str, object]:
    """Spawn background job for truly long operations."""

    async def batch_work():
        results = []
        for i in range(100):
            result = await process_step(i)
            results.append(result)
        # Store final result
        artifact = await artifact_manager.save(ArtifactIn(data={"results": results}))
        return {"artifact_id": str(artifact.id)}

    job_id = await scheduler.spawn(batch_work(), description="Batch processing")
    return {"job_id": str(job_id), "check_at": f"/api/v1/jobs/{job_id}"}

Task Organization with Tags

Use tags for effective task organization:

# By functionality
@TaskRegistry.register("extract_data", tags=["etl", "extract"])
async def extract_data() -> dict: ...

@TaskRegistry.register("transform_data", tags=["etl", "transform"])
async def transform_data() -> dict: ...

@TaskRegistry.register("load_data", tags=["etl", "load"])
async def load_data() -> dict: ...

# By environment
@TaskRegistry.register("dev_setup", tags=["dev", "setup"])
async def dev_setup() -> dict: ...

@TaskRegistry.register("prod_setup", tags=["prod", "setup"])
async def prod_setup() -> dict: ...

# By priority
@TaskRegistry.register("urgent_task", tags=["high-priority"])
async def urgent_task() -> dict: ...

@TaskRegistry.register("batch_task", tags=["low-priority", "batch"])
async def batch_task() -> dict: ...

Hot Reload During Development

Clear the registry when your module reloads:

# At top of your main.py
TaskRegistry.clear()

# Then register tasks
@TaskRegistry.register("my_task")
async def my_task():
    ...

This prevents duplicate registration errors during development.
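
If clearing the whole registry is too blunt, guarding with the documented TaskRegistry.has() check is a possible alternative (a sketch; whether re-registering an existing name raises depends on chapkit's registry):

# Register only when absent, so module re-imports stay idempotent
if not TaskRegistry.has("my_task"):
    @TaskRegistry.register("my_task")
    async def my_task() -> dict[str, str]:
        """Idempotently registered task."""
        return {"status": "ok"}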


Complete Example

See examples/task_execution/main.py for a complete working example with:

- Multiple task types (simple, with parameters, with injection)
- Tag-based organization
- Dependency injection
- Artifact integration
- Service configuration

Next Steps

  • Job Scheduler: Learn about job monitoring and concurrency control
  • Artifact Storage: Understand artifact hierarchies and result storage
  • Service Builder: Configure services with multiple features
  • Monitoring: Track task execution metrics