Source code for chap_core.worker.interface
import dataclasses
from typing import TypeVar, Generic, Callable, Protocol, Any
ReturnType = TypeVar("ReturnType", covariant=True)
[docs]
class Job(Generic[ReturnType], Protocol):
@property
def status(self) -> str: ...
@property
def result(self) -> ReturnType: ...
@property
def progress(self) -> float: ...
@property
def is_finished(self) -> bool: ...
[docs]
@dataclasses.dataclass
class SeededJob:
status: str = "ready"
result: Any = None
is_finished: bool = True
progress: float = 1.0
[docs]
class Worker(Generic[ReturnType], Protocol):
[docs]
def queue(self, func: Callable[..., ReturnType], *args, **kwargs) -> Job[ReturnType]: ...