- Add async Redis client singleton (redis_client.py) for caching and arq pool
- Add arq job functions (jobs.py) for background LLM processing
- Cache ask/explain LLM responses with TTL (1h ask, 24h explain)
- Add async mode to /api/ask: enqueue job, return job_id, poll /api/jobs/{id}
- Add GET /api/jobs/{job_id} endpoint for job status polling
- Add arq worker service to docker-compose (dev + prod)
- Switch from Redis to Valkey (BSD fork) in Docker Compose
- Add REDIS_URL config setting
- Add tests for cache hit, async mode, and job status
37 lines
1.1 KiB
Python
"""Async Redis client singleton for caching and job queue."""
|
|
|
|
import redis.asyncio as aioredis
|
|
from arq import create_pool
|
|
from arq.connections import ArqRedis, RedisSettings
|
|
from config import REDIS_URL
|
|
|
|
_arq_pool: ArqRedis | None = None
|
|
_plain_redis: aioredis.Redis | None = None
|
|
|
|
|
|
async def get_arq_pool() -> ArqRedis:
    """Return a shared arq pool (ArqRedis extends redis.asyncio.Redis).

    The pool is created lazily on first call.  Because ``create_pool``
    awaits, the coroutine can be suspended between the ``None`` check and
    the assignment; two concurrent first callers could therefore each
    create a pool, leaking one.  We re-check after the await and close the
    losing pool instead of leaking its connections.
    """
    global _arq_pool
    if _arq_pool is None:
        pool = await create_pool(RedisSettings.from_dsn(REDIS_URL))
        if _arq_pool is None:
            _arq_pool = pool
        else:
            # Lost the race: another caller installed a pool while we were
            # connecting.  Close ours so its connections are released.
            await pool.close()
    return _arq_pool
async def get_redis() -> aioredis.Redis:
    """Return a shared plain async Redis client."""
    global _plain_redis
    if _plain_redis is not None:
        return _plain_redis
    # from_url() is synchronous and connects lazily, so there is no await
    # point between the check above and this assignment.
    _plain_redis = aioredis.from_url(REDIS_URL, decode_responses=True)
    return _plain_redis
async def close_redis_connections() -> None:
    """Close all Redis connections (call on shutdown).

    The globals are detached (set to ``None``) before awaiting anything, and
    the two ``close()`` calls are sequenced with ``try``/``finally`` so that
    a failure closing the arq pool no longer prevents the plain client from
    being closed or the globals from being reset.
    """
    global _arq_pool, _plain_redis
    # Detach first so concurrent callers immediately see the cleared state.
    pool, _arq_pool = _arq_pool, None
    client, _plain_redis = _plain_redis, None
    try:
        if pool is not None:
            await pool.close()
    finally:
        if client is not None:
            await client.close()