"""arq job functions for async LLM processing."""
|
|
|
|
import hashlib
|
|
import json
|
|
|
|
import structlog
|
|
from arq.connections import RedisSettings
|
|
from config import REDIS_URL
|
|
|
|
# Module-level structlog logger; all job functions log through it.
logger = structlog.get_logger("aoc.jobs")

# ---------------------------------------------------------------------------
# Cache helpers
# ---------------------------------------------------------------------------

# TTLs (in seconds) for cached LLM results in Redis.
CACHE_TTL_ASK = 3600  # 1 hour — ask answers depend on a moving event window
CACHE_TTL_EXPLAIN = 86400  # 24 hours — explanations are keyed to a fixed event id
|
|
|
|
|
|
def _ask_cache_key(question: str, filters: dict, events: list) -> str:
    """Build a deterministic Redis key for an /api/ask request.

    The key fingerprints the question, the active filters, and the ids of
    the candidate events, so identical requests map to the same cache slot.
    """
    event_ids = [event.get("id") for event in events]
    fingerprint = {"q": question, "f": filters, "e": event_ids}
    # sort_keys makes the JSON canonical so equal inputs always hash equal.
    digest = hashlib.md5(json.dumps(fingerprint, sort_keys=True).encode()).hexdigest()
    return f"aoc:cache:ask:{digest}"
|
|
|
|
|
|
def _explain_cache_key(event_id: str) -> str:
    """Redis key under which the LLM explanation for one event is cached."""
    return "aoc:cache:explain:" + event_id
|
|
|
|
|
|
async def get_cached_ask(redis, question: str, filters: dict, events: list) -> dict | None:
    """Look up a previously computed /api/ask answer.

    Returns the deserialized cached result, or None on a cache miss.
    """
    cached = await redis.get(_ask_cache_key(question, filters, events))
    if not cached:
        return None
    return json.loads(cached)
|
|
|
|
|
|
async def set_cached_ask(redis, question: str, filters: dict, events: list, result: dict):
    """Store an /api/ask result under its request fingerprint for CACHE_TTL_ASK seconds."""
    key = _ask_cache_key(question, filters, events)
    # default=str so non-JSON values (datetimes etc.) serialize instead of raising.
    serialized = json.dumps(result, default=str)
    await redis.setex(key, CACHE_TTL_ASK, serialized)
|
|
|
|
|
|
async def get_cached_explain(redis, event_id: str) -> dict | None:
    """Look up a previously computed explanation for *event_id*.

    Returns the deserialized cached result, or None on a cache miss.
    """
    cached = await redis.get(_explain_cache_key(event_id))
    if not cached:
        return None
    return json.loads(cached)
|
|
|
|
|
|
async def set_cached_explain(redis, event_id: str, result: dict):
    """Store an explanation result for *event_id* for CACHE_TTL_EXPLAIN seconds."""
    key = _explain_cache_key(event_id)
    # default=str so non-JSON values (datetimes etc.) serialize instead of raising.
    serialized = json.dumps(result, default=str)
    await redis.setex(key, CACHE_TTL_EXPLAIN, serialized)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# arq job functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def process_ask_question(
    ctx, question: str, filters: dict, events: list, total: int, excluded_services: list | None
):
    """Background job: call the LLM for /api/ask and cache the outcome.

    Both success and failure are cached so pollers see a terminal status
    instead of re-triggering the LLM call.
    """
    # Imported lazily to avoid a circular import at module load time.
    from routes.ask import _call_llm

    redis = ctx["redis"]
    try:
        answer = await _call_llm(question, events, total=total, excluded_services=excluded_services)
    except Exception as exc:
        logger.warning("Async ask LLM failed", error=str(exc))
        result = {"status": "failed", "answer": "", "llm_used": False, "llm_error": str(exc)}
    else:
        result = {"status": "completed", "answer": answer, "llm_used": True, "llm_error": None}

    await set_cached_ask(redis, question, filters, events, result)
    return result
|
|
|
|
|
|
async def process_explain_event(ctx, event_id: str, event: dict, related: list):
    """Background job: call the LLM for /api/events/{id}/explain and cache the outcome.

    Both success and failure are cached so pollers see a terminal status
    instead of re-triggering the LLM call.
    """
    # Imported lazily to avoid a circular import at module load time.
    from routes.ask import _explain_event

    redis = ctx["redis"]
    try:
        explanation = await _explain_event(event, related)
    except Exception as exc:
        logger.warning("Async explain LLM failed", error=str(exc))
        result = {"status": "failed", "explanation": "", "llm_used": False, "llm_error": str(exc)}
    else:
        result = {"status": "completed", "explanation": explanation, "llm_used": True, "llm_error": None}

    await set_cached_explain(redis, event_id, result)
    return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# arq worker configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def startup(ctx):
    """arq lifecycle hook: create the shared Redis client used by job functions."""
    from redis.asyncio import Redis

    # decode_responses=True so cached values come back as str, not bytes.
    client = Redis.from_url(REDIS_URL, decode_responses=True)
    ctx["redis"] = client
|
|
|
|
|
|
async def shutdown(ctx):
    """arq lifecycle hook: dispose of the Redis client opened in startup."""
    client = ctx["redis"]
    await client.close()
|
|
|
|
|
|
class WorkerSettings:
    """Settings class consumed by the arq worker (e.g. ``arq jobs.WorkerSettings``)."""

    # Job functions this worker is allowed to execute.
    functions = [process_ask_question, process_explain_event]
    # Broker connection derived from the same REDIS_URL the app uses.
    redis_settings = RedisSettings.from_dsn(REDIS_URL)
    # Lifecycle hooks that open/close the shared Redis client in ctx["redis"].
    on_startup = startup
    on_shutdown = shutdown
    # Run at most 10 jobs concurrently; abort any job after 120 seconds.
    max_jobs = 10
    job_timeout = 120
    # Keep arq's own job-result records for an hour, then let them expire.
    keep_result = 3600
    keep_result_forever = False
|