Source code for chap_core.worker.rq_worker
"""
This module requires a running Redis server and a running RQ worker, e.g.:
$ rq worker --with-scheduler
"""
from typing import Callable, Generic
from rq import Queue
from rq.job import Job
from redis import Redis
import os
from dotenv import load_dotenv, find_dotenv
import chap_core.log_config
from chap_core.worker.interface import ReturnType
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Runs the project's logging setup as a side effect of importing this module.
chap_core.log_config.initialize_logging()
# logger.addHandler(logging.FileHandler('logs/rq_worker.log'))
# logger.info("Logging initialized")
[docs]
class RedisJob(Generic[ReturnType]):
    """Handle around a single RQ ``Job``.

    Every accessor delegates to the wrapped job object passed to the
    constructor; no state besides that job is kept here.
    """

    def __init__(self, job: Job):
        self._job = job

    @property
    def status(self) -> str:
        """Status as reported by the wrapped job's ``get_status()``."""
        return self._job.get_status()

    @property
    def exception_info(self) -> str:
        """The wrapped job's ``exc_info`` attribute, returned unchanged."""
        return self._job.exc_info

    @property
    def result(self) -> ReturnType | None:
        """Whatever the wrapped job's ``return_value()`` yields."""
        return self._job.return_value()

    @property
    def progress(self) -> float:
        """Progress placeholder: always 0, no tracking is implemented here."""
        return 0

    def get_logs(self) -> str:
        """Return the ``stdout`` and ``stderr`` entries of the job's ``meta``.

        The two entries are joined by a single newline; an entry missing
        from ``meta`` counts as an empty string.
        """
        captured_out, captured_err = (
            self._job.meta.get(stream, "") for stream in ("stdout", "stderr")
        )
        return captured_out + "\n" + captured_err

    def cancel(self):
        """Forward the cancellation request to the wrapped job."""
        self._job.cancel()

    @property
    def is_finished(self) -> bool:
        """Whether the wrapped job reports itself as finished.

        A warning is logged when the job is still in the ``"queued"`` state,
        hinting that no worker may be running.
        """
        still_queued = self._job.get_status() == "queued"
        if still_queued:
            logger.warning("Job is queued, maybe no worker is set up? Run `$ rq worker`")
        return self._job.is_finished
[docs]
class RedisQueue:
    """Simple abstraction for a Redis Queue.

    On construction, reads the Redis host and port from the environment,
    opens a connection and wraps it in an RQ ``Queue`` stored as ``self.q``.
    """

    def __init__(self):
        # TODO: switch to using utils.load_redis()?
        host, port = self.read_environment_variables()
        # Pass the values as logging arguments so formatting is deferred to
        # the logging framework; the rendered message is unchanged.
        logger.info("Connecting to Redis queue at %s:%s", host, port)
        # int(port) raises ValueError when REDIS_PORT is not numeric; in that
        # case self.q is never assigned (see __del__).
        self.q = Queue(connection=Redis(host=host, port=int(port)), default_timeout=3600)

    def read_environment_variables(self) -> tuple[str, str]:
        """Return ``(host, port)`` for the Redis server as strings.

        Loads a ``.env`` file located by ``find_dotenv()`` first, then reads
        ``REDIS_HOST`` (default ``"localhost"``) and ``REDIS_PORT`` (default
        ``"6379"``) from the environment.
        """
        load_dotenv(find_dotenv())
        host = os.getenv("REDIS_HOST", "localhost")  # default to localhost for backward compatibility
        port = os.getenv("REDIS_PORT", "6379")
        return host, port

    def queue(self, func: Callable[..., ReturnType], *args, **kwargs) -> RedisJob[ReturnType]:
        """Enqueue ``func(*args, **kwargs)`` and return a ``RedisJob`` for it.

        ``result_ttl`` is always set to 604800 here, so callers must not pass
        their own ``result_ttl`` keyword.
        """
        return RedisJob(self.q.enqueue(func, *args, **kwargs, result_ttl=604800))  # keep result for a week

    def __del__(self):
        # __init__ may have failed before self.q was assigned (e.g. invalid
        # REDIS_PORT); without this guard the finalizer would raise a
        # secondary AttributeError that obscures the original error.
        q = getattr(self, "q", None)
        if q is not None:
            q.connection.close()