# Source code for chap_core.worker.interface

import dataclasses
from typing import TypeVar, Generic, Callable, Protocol, Any

# Covariant type variable standing for the type of value a job yields;
# the generic protocols in this module are parameterized by it.
ReturnType = TypeVar(
    "ReturnType",
    covariant=True,
)


class Job(Generic[ReturnType], Protocol):
    """Structural interface describing a job object.

    ``ReturnType`` is the type exposed by :attr:`result`. Every member is
    declared with an ``...`` body, so this class only fixes the names and
    annotated types an implementation has to provide; it carries no
    behavior of its own.
    """

    @property
    def status(self) -> str:
        """Return the job's status as a string."""
        ...

    @property
    def result(self) -> ReturnType:
        """Return the job's result value."""
        ...

    @property
    def progress(self) -> float:
        """Return the job's progress as a float."""
        ...

    def cancel(self):
        """Cancel the job (exact semantics are left to the implementation)."""
        ...

    @property
    def is_finished(self) -> bool:
        """Return whether the job has finished."""
        ...
@dataclasses.dataclass
class SeededJob:
    """Plain data record exposing job-like attributes with fixed defaults.

    With no arguments the instance reports status ``"ready"``, no result,
    ``is_finished=True`` and ``progress=1.0``. Field order is part of the
    positional constructor signature and must not be rearranged.

    NOTE(review): unlike the ``Job`` protocol, no ``cancel()`` method is
    defined here -- confirm whether that is intentional.
    """

    status: str = "ready"
    result: Any = None
    is_finished: bool = True
    progress: float = 1.0
class Worker(Generic[ReturnType], Protocol):
    """Structural interface for an object that accepts callables to run.

    The protocol declares a single method, :meth:`queue`; how and where the
    callable is executed is left entirely to the implementation.
    """

    def queue(self, func: Callable[..., ReturnType], *args, **kwargs) -> Job[ReturnType]:
        """Submit ``func`` together with its arguments and return a job.

        Args:
            func: Callable whose return type determines the job's result type.
            *args: Positional arguments accepted alongside ``func``
                (presumably forwarded to it -- implementation-defined).
            **kwargs: Keyword arguments accepted alongside ``func``
                (presumably forwarded to it -- implementation-defined).

        Returns:
            A ``Job`` parameterized by the return type of ``func``.

        NOTE(review): ``ReturnType`` is declared covariant at module level
        but also appears here inside a parameter annotation; confirm the
        project's type checker accepts this variance.
        """
        ...