Source code for chap_core.rest_api.v1.jobs
import logging
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from chap_core.api_types import EvaluationResponse
from chap_core.log_config import initialize_logging
from chap_core.rest_api.celery_tasks import CeleryPool, JobDescription
from chap_core.rest_api.celery_tasks import r as redis
from chap_core.rest_api.data_models import FullPredictionResponse
initialize_logging(True, "logs/rest_api.log")
logger = logging.getLogger(__name__)
logger.info("Logging initialized")
router = APIRouter(prefix="/jobs", tags=["jobs"])
worker = CeleryPool()
@router.get("")
def list_jobs(
ids: List[str] = Query(None), status: List[str] = Query(None), type: str = Query(None)
) -> List[JobDescription]:
"""
List all jobs currently in the queue.
Optionally filters by a list of job IDs, a list of statuses, and/or a job type.
Filtering order: IDs, then type, then status.
"""
jobs_to_return = worker.list_jobs()
if ids:
id_filter_set = set(ids)
jobs_to_return = [job for job in jobs_to_return if job.id in id_filter_set]
if type:
type_upper = type.upper()
jobs_to_return = [job for job in jobs_to_return if job.type and job.type.upper() == type_upper]
if status:
status_filter_set = set(s.upper() for s in status)
jobs_to_return = [job for job in jobs_to_return if job.status and job.status.upper() in status_filter_set]
return jobs_to_return
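# Usage sketch (illustration only, not part of this module): assuming the router
# above is mounted on a FastAPI app named `app` (a hypothetical name here), the
# filters combine as ordinary query parameters; the type/status values below are
# placeholders:
#
#     from fastapi.testclient import TestClient
#     client = TestClient(app)
#     # list filters become repeated query parameters; status matching is case-insensitive
#     client.get("/jobs", params={"status": ["SUCCESS", "FAILURE"]})
#     client.get("/jobs", params={"ids": ["<job-id-1>", "<job-id-2>"], "type": "evaluation"})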
def _get_successful_job(job_id):
    job = worker.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status.lower() == "failed":
        raise HTTPException(status_code=400, detail="Job failed. Check the exception endpoint for more information")
    if not job.is_finished:
        raise HTTPException(status_code=400, detail="Job is still running, try again later")
    return job
@router.get("/{job_id}")
def get_job_status(job_id: str) -> str:
    job = worker.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    status = job.status
    logger.info(f"status of job {job_id}: {status}")
    if status.lower() in ("failed", "failure"):
        # Log failure details instead of printing them to stdout
        logger.error("Exception info for job %s: %s", job_id, job.exception_info)
        logger.error("Result for job %s: %s", job_id, job.result)
        logger.error("Logs for job %s: %s", job_id, job.get_logs())
    return status
@router.delete("/{job_id}")
def delete_job(job_id: str) -> dict:
    """
    Delete a finished (non-running) job and its stored metadata.
    """
    job = worker.get_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
job_status = job.status.lower()
if job_status in ["pending", "started", "running"]:
raise HTTPException(status_code=400, detail="Cannot delete a running job. Cancel it first.")
result = redis.delete(f"job_meta:{job_id}")
if result == 0:
raise HTTPException(status_code=404, detail="Job not found")
return {"message": f"Job {job_id} deleted successfully"}
@router.post("/{job_id}/cancel")
def cancel_job(job_id: str) -> dict:
"""
Cancel a running job
"""
job = worker.get_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
job_status = job.status.lower()
if job_status in ["success", "failure", "revoked"]:
raise HTTPException(status_code=400, detail="Cannot cancel a job that has already finished or been cancelled")
if job_status not in ["pending", "started", "running"]:
raise HTTPException(status_code=400, detail=f"Cannot cancel job with status '{job.status}'")
job.cancel()
return {"message": f"Job {job_id} has been cancelled"}
@router.get("/{job_id}/logs")
def get_logs(job_id: str) -> str:
    job = worker.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    logs = job.get_logs()
if logs is None:
raise HTTPException(status_code=400, detail=f"Log file not found for job ID '{job_id}'")
return logs
@router.get("/{job_id}/prediction_result")
def get_prediction_result(job_id: str) -> FullPredictionResponse:
return _get_successful_job(job_id).result
@router.get("/{job_id}/evaluation_result")
def get_evaluation_result(job_id: str) -> EvaluationResponse:
return _get_successful_job(job_id).result
class DataBaseResponse(BaseModel):
id: int
@router.get("/{job_id}/database_result")
def get_database_result(job_id: str) -> DataBaseResponse:
result = _get_successful_job(job_id).result
return DataBaseResponse(id=result)
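# Usage sketch (illustration only): a client typically polls the status endpoint
# and, once the job reports success, fetches the result endpoint that matches the
# job type; `app` and `job_id` are hypothetical names:
#
#     from fastapi.testclient import TestClient
#     client = TestClient(app)
#     status = client.get(f"/jobs/{job_id}").json()
#     if status.upper() == "SUCCESS":
#         result = client.get(f"/jobs/{job_id}/evaluation_result").json()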
"""
Datasets for evaluation are versioned and stored
Datasets for predictions are not necessarily versioned and stored, but sent in each request
Evaluation should be run once, then using continous monitoring after
Task-id
"""
# todo: move this
class NaiveJob:
def __init__(self, func, *args, **kwargs):
# todo: init a root logger to capture all logs from the job
self._exception_info = ""
self._result = ""
self._status = ""
self._finished = False
logger.info("Starting naive job")
try:
self._result = func(*args, **kwargs)
self._status = "finished"
logger.info("Naive job finished successfully")
self._finished = True
        except Exception as e:
            self._exception_info = str(e)
            logger.exception("Naive job failed with exception: %s", e)
            self._status = "failed"
self._result = ""
@property
def id(self):
return "naive_job"
@property
def status(self):
return self._status
@property
def exception_info(self):
return self._exception_info
@property
def progress(self):
return 1
@property
def result(self):
return self._result
@property
def is_finished(self):
return self._finished
    def get_logs(self, n_lines: Optional[int] = None):
        """Retrieves logs from the current job."""
        return ""
class NaiveWorker:
job_class = NaiveJob
    def queue(self, func, *args, **kwargs):
        """Run the function synchronously and return a finished NaiveJob."""
        return self.job_class(func, *args, **kwargs)
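# Usage sketch (illustration only): NaiveWorker runs the function synchronously
# when queued, so the returned NaiveJob is already finished (or failed) by the
# time queue() returns:
#
#     job = NaiveWorker().queue(lambda x: x * 2, 21)
#     assert job.is_finished and job.result == 42
#     assert job.status == "finished"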