Source code for chap_core.rest_api.v1.debug

import logging
from typing import Optional

from celery.result import AsyncResult
from fastapi import APIRouter, Depends, HTTPException

import chap_core.rest_api.db_worker_functions as wf

from ..celery_tasks import CeleryPool, celery
from ..data_models import JobResponse
from .jobs import worker
from .routers.dependencies import get_database_url, get_settings

router = APIRouter(prefix="/debug", tags=["debug"])
logger = logging.getLogger(__name__)
cur_job = None
celery_pool = CeleryPool()


[docs] @router.get("/add-numbers") def run_add_numbers(a: int, b: int): """Trigger a Celery task to add two numbers.""" global cur_job logger.info(f"Adding {a} and {b}") # task = add_numbers.delay(a, b) # job= celery_pool._celery.send_task("celery_tasks.add_numbers", args=[a, b]) # cur_job = celery_pool.queue(_add_numbers, a, b) return None return {"task_id": cur_job.id, "status": "Task submitted"}
[docs] @router.post("/trigger-exception", response_model=JobResponse) def trigger_exception(database_url: str = Depends(get_database_url), worker_settings=Depends(get_settings)): job = worker.queue_db(wf.trigger_exception, database_url=database_url, worker_config=worker_settings) return JobResponse(id=job.id)
[docs] @router.get("/get-status") def get_status(task_id: Optional[str] = None) -> dict: """Get the status and result of a task.""" task_id = task_id or cur_job.id task_result = AsyncResult(task_id, app=celery) # Check if task is in a valid state logger.info(f"Task {task_id}: {task_result}") if task_result.state == "PENDING": raise HTTPException(status_code=404, detail="Task not found or still pending execution.") result = { "task_id": task_id, "status": task_result.state, "result": task_result.result if task_result.successful() else "", "error": str(task_result.result) if task_result.failed() else "", } return result