Task Execution¶
Servicekit provides a task execution system for running Python functions and shell commands asynchronously with dependency injection, orphan detection, and artifact storage integration.
Quick Start¶
Python Task Functions¶
from chapkit.task import TaskRegistry, TaskManager, TaskIn, TaskOut, TaskRepository, TaskRouter
from servicekit.api import BaseServiceBuilder, ServiceInfo
from servicekit import Database
from chapkit.artifact import ArtifactManager

# Register a Python task
@TaskRegistry.register("greet_user")
async def greet_user(name: str = "World") -> dict[str, str]:
    """Simple task that returns a greeting."""
    return {"message": f"Hello, {name}!"}

# Task with dependency injection
@TaskRegistry.register("process_data")
async def process_data(database: Database, artifact_manager: ArtifactManager) -> dict[str, object]:
    """Dependencies are automatically injected."""
    # Use the injected database and artifact_manager
    return {"status": "complete", "database_url": str(database.url)}

# Build the service
app = (
    BaseServiceBuilder(info=ServiceInfo(display_name="Task Service"))
    .with_health()
    .with_database("sqlite+aiosqlite:///./data.db")
    .with_jobs(max_concurrency=5)
    .build()
)

# Add the task router
task_router = TaskRouter.create(
    prefix="/api/v1/tasks",
    tags=["Tasks"],
    manager_factory=get_task_manager,  # Dependency factory defined by your app
    entity_in_type=TaskIn,
    entity_out_type=TaskOut,
)
app.include_router(task_router)
Shell Commands¶
import sys

# Execute shell commands
await manager.save(TaskIn(
    command=f"{sys.executable} --version",
    enabled=True,
))
await manager.save(TaskIn(
    command="echo 'Hello from shell!'",
    enabled=True,
))
Run: fastapi dev your_file.py
Architecture¶
Task Types¶
Python Functions:
- Registered with @TaskRegistry.register(name)
- Automatic dependency injection
- Return dict with results
- Async or sync functions
Shell Commands:
- Any executable command
- Stdout/stderr captured
- Exit code handling
- Environment variable support
Execution Flow¶
1. Task Submitted
   POST /api/v1/tasks

2. Task Stored
   ├─> Database record created
   ├─> Enabled = True/False
   └─> Metadata stored

3. Task Execution ($execute endpoint)
   ├─> Job created in scheduler
   ├─> Dependencies injected (Python tasks)
   ├─> Process spawned (shell tasks)
   └─> Results returned

4. Results Storage
   ├─> Return value stored
   ├─> Stdout/stderr captured
   └─> Optional artifact linking
Core Concepts¶
TaskRegistry¶
Global registry for Python task functions.
from chapkit.task import TaskRegistry
@TaskRegistry.register("my_task")
async def my_task(param: str) -> dict[str, object]:
"""Task docstring."""
return {"result": param.upper()}
# Verify registration
assert TaskRegistry.is_registered("my_task")
# Get task function
task_fn = TaskRegistry.get("my_task")
Rules:
- Task names must be unique
- Functions must return dict or None
- Both async and sync functions supported
- Parameters can have defaults
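Since sync functions are supported alongside async ones, a plain function registers the same way. A minimal sketch (the "word_length" name and parameter are illustrative):

@TaskRegistry.register("word_length")
def word_length(text: str = "") -> dict[str, int]:
    """Sync task: a plain function, no await required."""
    return {"length": len(text)}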
Dependency Injection¶
Tasks can request dependencies as function parameters:
@TaskRegistry.register("with_dependencies")
async def with_dependencies(
database: Database,
artifact_manager: ArtifactManager,
scheduler: JobScheduler,
custom_param: str = "default"
) -> dict[str, object]:
"""Dependencies automatically injected at runtime."""
# Use injected dependencies
artifact = await artifact_manager.save(ArtifactIn(data={"result": custom_param}))
return {"artifact_id": str(artifact.id)}
Available Injectables:
- database: Database instance
- artifact_manager: ArtifactManager instance
- scheduler: JobScheduler instance
Note: Task parameters with defaults are treated as task arguments, not injected dependencies.
Task Model¶
from chapkit.task import TaskIn

task = TaskIn(
    command="greet_user",      # Function name or shell command
    enabled=True,              # Whether task can be executed
    params={"name": "Alice"},  # Parameters for Python tasks
)
Fields:
- id: ULID (auto-generated)
- command: Task function name or shell command
- enabled: Boolean flag for execution control
- params: Optional dict of parameters (Python tasks only)
- created_at, updated_at: Timestamps
TaskManager¶
Business logic layer for task operations.
from chapkit.task import TaskManager
manager = TaskManager(repository, scheduler, database, artifact_manager)
# Create task
task = await manager.save(TaskIn(command="greet_user", enabled=True))
# Execute task
result = await manager.execute_task(task.id, params={"name": "Bob"})
# List all tasks
tasks = await manager.find_all()
# Find orphaned tasks (no job assigned)
orphans = await manager.find_orphaned_tasks()
API Endpoints¶
POST /api/v1/tasks¶
Create new task.
Request:
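{
    "command": "greet_user",
    "enabled": true,
    "params": {"name": "Alice"}
}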
Response (201 Created):
{
    "id": "01TASK123...",
    "command": "greet_user",
    "enabled": true,
    "params": {
        "name": "Alice"
    },
    "created_at": "2025-10-18T12:00:00Z",
    "updated_at": "2025-10-18T12:00:00Z"
}
GET /api/v1/tasks¶
List all tasks with pagination.
GET /api/v1/tasks/{id}¶
Get task by ID.
PUT /api/v1/tasks/{id}¶
Update task.
Request:
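{
    "command": "greet_user",
    "enabled": false,
    "params": {"name": "Alice"}
}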
DELETE /api/v1/tasks/{id}¶
Delete task.
POST /api/v1/tasks/{id}/$execute¶
Execute task asynchronously.
Request:
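{
    "params": {"name": "Bob"}
}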
Response (202 Accepted):
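{
    "job_id": "01JOB456..."
}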
GET /api/v1/tasks/$orphaned¶
Find orphaned tasks (tasks without assigned jobs).
Response:
{
    "orphaned_tasks": [
        {
            "id": "01TASK789...",
            "command": "greet_user",
            "enabled": true
        }
    ],
    "count": 1
}
Python Task Patterns¶
Simple Task¶
@TaskRegistry.register("hello")
async def hello() -> dict[str, str]:
"""Simple hello world task."""
return {"message": "Hello!"}
Task with Parameters¶
@TaskRegistry.register("add")
async def add(a: int, b: int) -> dict[str, int]:
"""Add two numbers."""
return {"result": a + b}
# Execute with params
await manager.execute_task(task_id, params={"a": 5, "b": 3})
Task with Dependency Injection¶
@TaskRegistry.register("store_result")
async def store_result(
artifact_manager: ArtifactManager,
data: dict
) -> dict[str, object]:
"""Store result in artifact."""
artifact = await artifact_manager.save(ArtifactIn(data=data))
return {"artifact_id": str(artifact.id)}
# Execute with params
await manager.execute_task(task_id, params={"data": {"key": "value"}})
Database Query Task¶
@TaskRegistry.register("count_users")
async def count_users(database: Database) -> dict[str, int]:
"""Count users in database."""
async with database.session() as session:
from sqlalchemy import select, func
from myapp.models import User
stmt = select(func.count(User.id))
result = await session.execute(stmt)
count = result.scalar()
return {"user_count": count}
File Processing Task¶
@TaskRegistry.register("process_csv")
async def process_csv(filepath: str) -> dict[str, object]:
"""Process CSV file."""
import pandas as pd
df = pd.read_csv(filepath)
summary = {
"rows": len(df),
"columns": list(df.columns),
"summary": df.describe().to_dict()
}
return summary
Shell Command Patterns¶
Simple Commands¶
import sys

# Python version
await manager.save(TaskIn(
    command=f"{sys.executable} --version",
    enabled=True,
))

# Echo command
await manager.save(TaskIn(
    command="echo 'Processing complete'",
    enabled=True,
))

# List files
await manager.save(TaskIn(
    command="ls -la /data",
    enabled=True,
))
Script Execution¶
# Execute a Python script
await manager.save(TaskIn(
    command=f"{sys.executable} scripts/process_data.py --input data.csv --output results.json",
    enabled=True,
))

# Execute a shell script
await manager.save(TaskIn(
    command="bash scripts/backup.sh /data /backups",
    enabled=True,
))
Data Pipeline¶
# Multi-step data processing
commands = [
    "wget https://example.com/data.csv -O /tmp/data.csv",
    f"{sys.executable} scripts/clean_data.py /tmp/data.csv /tmp/clean.csv",
    f"{sys.executable} scripts/analyze.py /tmp/clean.csv /tmp/results.json",
]
for cmd in commands:
    task = await manager.save(TaskIn(command=cmd, enabled=True))
    await manager.execute_task(task.id)
Complete Workflow Example¶
# Start service
fastapi dev examples/task_execution_api.py
# Create Python task
TASK_ID=$(curl -s -X POST http://localhost:8000/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "command": "greet_user",
    "enabled": true,
    "params": {"name": "Alice"}
  }' | jq -r '.id')

# Execute task
EXEC_RESPONSE=$(curl -s -X POST http://localhost:8000/api/v1/tasks/$TASK_ID/\$execute \
  -H "Content-Type: application/json" \
  -d '{"params": {"name": "Bob"}}')
JOB_ID=$(echo $EXEC_RESPONSE | jq -r '.job_id')
# Monitor job status
curl http://localhost:8000/api/v1/jobs/$JOB_ID | jq
# Stream job updates (SSE)
curl -N http://localhost:8000/api/v1/jobs/$JOB_ID/\$stream
# Check for orphaned tasks
curl http://localhost:8000/api/v1/tasks/\$orphaned | jq
Testing¶
Unit Tests¶
import pytest
from chapkit.task import TaskRegistry, TaskManager, TaskIn

# Avoid the test_ prefix here so pytest does not collect the task itself as a test
@TaskRegistry.register("uppercase")
async def uppercase_task(value: str) -> dict[str, str]:
    """Task under test."""
    return {"result": value.upper()}

@pytest.mark.asyncio
async def test_task_execution(task_manager):
    """Test task execution."""
    # Create task
    task = await task_manager.save(TaskIn(
        command="uppercase",
        enabled=True,
        params={"value": "hello"},
    ))
    # Execute
    result = await task_manager.execute_task(task.id)
    # Verify
    assert result["result"] == "HELLO"
Integration Tests¶
from fastapi.testclient import TestClient

def test_task_workflow(client: TestClient):
    """Test the complete task workflow."""
    # Create task
    response = client.post("/api/v1/tasks", json={
        "command": "greet_user",
        "enabled": True,
    })
    assert response.status_code == 201
    task_id = response.json()["id"]

    # Execute task
    exec_response = client.post(f"/api/v1/tasks/{task_id}/$execute", json={
        "params": {"name": "Test"},
    })
    assert exec_response.status_code == 202
    job_id = exec_response.json()["job_id"]

    # Poll until the job completes rather than relying on a fixed sleep
    import time
    deadline = time.monotonic() + 10
    while time.monotonic() < deadline:
        job = client.get(f"/api/v1/jobs/{job_id}").json()
        if job["status"] == "completed":
            break
        time.sleep(0.1)
    assert job["status"] == "completed"
Production Considerations¶
Concurrency Control¶
Limit concurrent task execution:
app = (
    BaseServiceBuilder(info=ServiceInfo(display_name="Task Service"))
    .with_jobs(max_concurrency=5)  # Max 5 concurrent tasks
    .build()
)
Error Handling¶
Tasks should handle errors gracefully:
@TaskRegistry.register("safe_task")
async def safe_task(risky_param: str) -> dict[str, object]:
"""Task with error handling."""
try:
# Risky operation
result = process_risky_operation(risky_param)
return {"status": "success", "result": result}
except Exception as e:
return {"status": "error", "error": str(e)}
Long-Running Tasks¶
For long-running tasks, provide progress updates:
@TaskRegistry.register("long_task")
async def long_task() -> dict[str, object]:
"""Long-running task."""
import time
steps = 10
for i in range(steps):
# Do work
time.sleep(1)
# Log progress (captured in job logs)
print(f"Progress: {i+1}/{steps}")
return {"status": "complete"}
Orphan Detection¶
Regularly check for orphaned tasks:
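A sketch using the manager method and endpoint shown above:

orphans = await manager.find_orphaned_tasks()
for task in orphans:
    print(f"Orphaned task {task.id}: {task.command}")

# Or via the API:
# curl http://localhost:8000/api/v1/tasks/\$orphaned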
Shell Command Security¶
Important: Validate shell commands to prevent injection:
import shlex

def validate_command(command: str) -> bool:
    """Validate a shell command for safety."""
    # Parse the command safely
    try:
        args = shlex.split(command)
    except ValueError:
        return False
    if not args:
        return False
    # Check the executable against an allowlist
    allowed_executables = ["python", "bash", "ls", "echo"]
    return args[0] in allowed_executables

# Use in task creation
if validate_command(task_command):
    task = await manager.save(TaskIn(command=task_command, enabled=True))
Complete Example¶
See examples/task_execution_api.py for a complete working example with Python tasks, shell commands, and dependency injection.
Next Steps¶
- Job Scheduler: Learn about job monitoring and concurrency control
- Artifact Storage: Store task results in artifacts
- Authentication: Secure task endpoints with API keys
- Monitoring: Track task execution with Prometheus metrics