chap_core.worker package

Submodules

chap_core.worker.background_tasks_worker module

class chap_core.worker.background_tasks_worker.BGTaskJob(state, job_id)[source]

Bases: Generic[ReturnType]

cancel()[source]
property is_finished
property progress
property result
property status
class chap_core.worker.background_tasks_worker.BGTaskWorker(background_tasks: BackgroundTasks, internal_state: InternalState, state)[source]

Bases: Generic[ReturnType]

new_id()[source]
queue(background_tasks, func, *args, **kwargs) BGTaskJob[ReturnType][source]
wrapper(func, job_id)[source]

chap_core.worker.interface module

class chap_core.worker.interface.Job(*args, **kwargs)[source]

Bases: Generic[ReturnType], Protocol

cancel()[source]
property is_finished: bool
property progress: float
property result: ReturnType
property status: str
class chap_core.worker.interface.SeededJob(status: str = 'ready', result: Any = None, is_finished: bool = True, progress: float = 1.0)[source]

Bases: object

is_finished: bool = True
progress: float = 1.0
result: Any = None
status: str = 'ready'
class chap_core.worker.interface.Worker(*args, **kwargs)[source]

Bases: Generic[ReturnType], Protocol

queue(func: Callable[[...], ReturnType], *args, **kwargs) Job[ReturnType][source]

chap_core.worker.rq_worker module

This needs a redis db and a redis queue worker running $ rq worker –with-scheduler

class chap_core.worker.rq_worker.RedisJob(job: Job)[source]

Bases: Generic[ReturnType]

Wrapper for a Redis Job

cancel()[source]
property exception_info: str
get_logs() str[source]
property is_finished: bool
property progress: float
property result: ReturnType | None
property status: str
class chap_core.worker.rq_worker.RedisQueue[source]

Bases: object

Simple abstraction for a Redis Queue

queue(func: Callable[[...], ReturnType], *args, **kwargs) RedisJob[ReturnType][source]
read_environment_variables()[source]

Module contents