Added periodic fetch
This commit is contained in:
@@ -2,3 +2,5 @@ TENANT_ID=your-tenant-id
|
|||||||
CLIENT_ID=your-client-id
|
CLIENT_ID=your-client-id
|
||||||
CLIENT_SECRET=your-client-secret
|
CLIENT_SECRET=your-client-secret
|
||||||
MONGO_URI=mongodb://root:example@mongo:27017/
|
MONGO_URI=mongodb://root:example@mongo:27017/
|
||||||
|
ENABLE_PERIODIC_FETCH=false
|
||||||
|
FETCH_INTERVAL_MINUTES=60
|
||||||
|
|||||||
@@ -8,3 +8,7 @@ CLIENT_ID = os.getenv("CLIENT_ID")
|
|||||||
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
|
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
|
||||||
MONGO_URI = os.getenv("MONGO_URI")
|
MONGO_URI = os.getenv("MONGO_URI")
|
||||||
DB_NAME = "micro_soc"
|
DB_NAME = "micro_soc"
|
||||||
|
|
||||||
|
# Optional periodic fetch settings
|
||||||
|
ENABLE_PERIODIC_FETCH = os.getenv("ENABLE_PERIODIC_FETCH", "false").lower() == "true"
|
||||||
|
FETCH_INTERVAL_MINUTES = int(os.getenv("FETCH_INTERVAL_MINUTES", "60"))
|
||||||
|
|||||||
@@ -1,10 +1,13 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
|
|
||||||
from routes.fetch import router as fetch_router
|
from routes.fetch import router as fetch_router, run_fetch
|
||||||
from routes.events import router as events_router
|
from routes.events import router as events_router
|
||||||
|
from config import ENABLE_PERIODIC_FETCH, FETCH_INTERVAL_MINUTES
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
|
||||||
@@ -15,3 +18,33 @@ app.include_router(events_router, prefix="/api")
|
|||||||
# works regardless of the working directory used to start uvicorn.
|
# works regardless of the working directory used to start uvicorn.
|
||||||
frontend_dir = Path(__file__).parent / "frontend"
|
frontend_dir = Path(__file__).parent / "frontend"
|
||||||
app.mount("/", StaticFiles(directory=frontend_dir, html=True), name="frontend")
|
app.mount("/", StaticFiles(directory=frontend_dir, html=True), name="frontend")
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger("aoc.fetcher")
|
||||||
|
|
||||||
|
|
||||||
|
async def _periodic_fetch():
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await asyncio.to_thread(run_fetch)
|
||||||
|
logger.info("Periodic fetch completed.")
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Periodic fetch failed: %s", exc)
|
||||||
|
await asyncio.sleep(FETCH_INTERVAL_MINUTES * 60)
|
||||||
|
|
||||||
|
|
||||||
|
@app.on_event("startup")
|
||||||
|
async def start_periodic_fetch():
|
||||||
|
if ENABLE_PERIODIC_FETCH:
|
||||||
|
app.state.fetch_task = asyncio.create_task(_periodic_fetch())
|
||||||
|
|
||||||
|
|
||||||
|
@app.on_event("shutdown")
|
||||||
|
async def stop_periodic_fetch():
|
||||||
|
task = getattr(app.state, "fetch_task", None)
|
||||||
|
if task:
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|||||||
@@ -10,8 +10,7 @@ from models.event_model import normalize_event
|
|||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
@router.get("/fetch-audit-logs")
|
def run_fetch(hours: int = 168):
|
||||||
def fetch_logs(hours: int = 168):
|
|
||||||
window = max(1, min(hours, 720)) # cap to 30 days for sanity
|
window = max(1, min(hours, 720)) # cap to 30 days for sanity
|
||||||
logs = []
|
logs = []
|
||||||
errors = []
|
errors = []
|
||||||
@@ -38,3 +37,11 @@ def fetch_logs(hours: int = 168):
|
|||||||
ops.append(UpdateOne({"id": doc.get("id"), "timestamp": doc.get("timestamp")}, {"$set": doc}, upsert=True))
|
ops.append(UpdateOne({"id": doc.get("id"), "timestamp": doc.get("timestamp")}, {"$set": doc}, upsert=True))
|
||||||
events_collection.bulk_write(ops, ordered=False)
|
events_collection.bulk_write(ops, ordered=False)
|
||||||
return {"stored_events": len(normalized), "errors": errors}
|
return {"stored_events": len(normalized), "errors": errors}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/fetch-audit-logs")
|
||||||
|
def fetch_logs(hours: int = 168):
|
||||||
|
try:
|
||||||
|
return run_fetch(hours=hours)
|
||||||
|
except Exception as exc:
|
||||||
|
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||||
|
|||||||
Reference in New Issue
Block a user