- Add async Redis client singleton (redis_client.py) for caching and arq pool
- Add arq job functions (jobs.py) for background LLM processing
- Cache ask/explain LLM responses with TTL (1h ask, 24h explain)
- Add async mode to /api/ask: enqueue job, return job_id, poll /api/jobs/{id}
- Add GET /api/jobs/{job_id} endpoint for job status polling
- Add arq worker service to docker-compose (dev + prod)
- Switch from Redis to Valkey (BSD fork) in Docker Compose
- Add REDIS_URL config setting
- Add tests for cache hit, async mode, and job status
44 lines
1.3 KiB
Python
44 lines
1.3 KiB
Python
"""Job status endpoints for async LLM operations."""
|
|
|
|
from arq.jobs import Job, JobStatus
|
|
from auth import require_auth
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from redis_client import get_redis
|
|
|
|
# Router-wide guard: every route registered here requires an authenticated
# caller.  NOTE(review): get_job_status below also declares require_auth as a
# parameter dependency, so auth runs twice per request — confirm intentional.
router = APIRouter(dependencies=[Depends(require_auth)])
|
|
|
|
|
|
class JobStatusResponse(BaseModel):
    """Response body for GET /jobs/{job_id} (async LLM job polling)."""

    # The arq job identifier the client supplied in the URL path.
    job_id: str
    # Raw arq JobStatus value; not_found never reaches the client because the
    # endpoint raises 404 for it instead.
    status: str  # queued, in_progress, complete, not_found, deferred
    # Job output once complete; non-dict results are wrapped as {"data": str(...)}.
    result: dict | None = None
    # Stringified exception when a completed job raised instead of returning.
    error: str | None = None
|
|
|
|
|
|
@router.get("/jobs/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str, user: dict = Depends(require_auth)):
    """Poll for the result of an async LLM job.

    Returns the current arq status for ``job_id``; when the job has finished,
    the body also carries either its result or the error it raised.  Unknown
    job ids yield a 404.
    """
    redis = await get_redis()
    job = Job(job_id, redis)
    current = await job.status()

    # An id arq has never seen (or whose result has expired) is a 404.
    if current == JobStatus.not_found:
        raise HTTPException(status_code=404, detail="Job not found")

    payload: dict | None = None
    failure: str | None = None
    if current == JobStatus.complete:
        try:
            # timeout=0: the job is already complete, so the result must be
            # immediately available — never block the request here.
            raw = await job.result(timeout=0)
            # Normalize non-dict results so the response schema stays stable.
            payload = raw if isinstance(raw, dict) else {"data": str(raw)}
        except Exception as exc:
            # arq re-raises the job's own exception from result(); surface it
            # as a string rather than a 500.
            failure = str(exc)

    return JobStatusResponse(
        job_id=job_id,
        status=current.value,
        result=payload,
        error=failure,
    )
|