feat: service-level role gating for privacy-sensitive services (Option A)
All checks were successful
CI / lint-and-test (push) Successful in 25s
All checks were successful
CI / lint-and-test (push) Successful in 25s
- Add PRIVACY_SERVICES and PRIVACY_SERVICE_ROLES config variables
- Add user_can_access_privacy_services(claims) helper in auth.py
- /api/events filters out privacy services for users without required roles
- /api/filter-options excludes privacy services from dropdown options
- /api/ask excludes privacy services from NLQ queries
- /api/events/{id}/explain returns 403 for privacy events if unauthorized
- Teams added to default noisy service exclusion (frontend + backend)
- Update .env.example with privacy config documentation
- Add tests for event filtering, filter-options exclusion, and explain 403
This commit is contained in:
@@ -3,8 +3,9 @@ import re
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from audit_trail import log_action
|
||||
from auth import require_auth
|
||||
from auth import require_auth, user_can_access_privacy_services
|
||||
from bson import ObjectId
|
||||
from config import PRIVACY_SERVICES
|
||||
from database import events_collection
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from models.api import (
|
||||
@@ -44,6 +45,7 @@ def _build_query(
|
||||
cursor: str | None = None,
|
||||
include_tags: list[str] | None = None,
|
||||
exclude_tags: list[str] | None = None,
|
||||
exclude_services: list[str] | None = None,
|
||||
) -> dict:
|
||||
filters = []
|
||||
|
||||
@@ -51,6 +53,8 @@ def _build_query(
|
||||
filters.append({"service": service})
|
||||
if services:
|
||||
filters.append({"service": {"$in": services}})
|
||||
if exclude_services:
|
||||
filters.append({"service": {"$nin": exclude_services}})
|
||||
if actor:
|
||||
actor_safe = re.escape(actor)
|
||||
filters.append(
|
||||
@@ -125,6 +129,7 @@ def list_events(
|
||||
exclude_tags: list[str] | None = Query(default=None),
|
||||
user: dict = Depends(require_auth),
|
||||
):
|
||||
privacy_excluded = [] if user_can_access_privacy_services(user) else list(PRIVACY_SERVICES)
|
||||
query = _build_query(
|
||||
service=service,
|
||||
services=services,
|
||||
@@ -137,6 +142,7 @@ def list_events(
|
||||
cursor=cursor,
|
||||
include_tags=include_tags,
|
||||
exclude_tags=exclude_tags,
|
||||
exclude_services=privacy_excluded,
|
||||
)
|
||||
|
||||
safe_page_size = max(1, min(page_size, 500))
|
||||
@@ -202,6 +208,7 @@ def bulk_tags(
|
||||
exclude_tags: list[str] | None = Query(default=None),
|
||||
user: dict = Depends(require_auth),
|
||||
):
|
||||
privacy_excluded = [] if user_can_access_privacy_services(user) else list(PRIVACY_SERVICES)
|
||||
query = _build_query(
|
||||
service=service,
|
||||
services=services,
|
||||
@@ -213,6 +220,7 @@ def bulk_tags(
|
||||
search=search,
|
||||
include_tags=include_tags,
|
||||
exclude_tags=exclude_tags,
|
||||
exclude_services=privacy_excluded,
|
||||
)
|
||||
tags = [t.strip() for t in body.tags if t.strip()]
|
||||
if not tags:
|
||||
@@ -235,7 +243,10 @@ def bulk_tags(
|
||||
|
||||
|
||||
@router.get("/filter-options", response_model=FilterOptionsResponse)
|
||||
def filter_options(limit: int = Query(default=200, ge=1, le=1000)):
|
||||
def filter_options(
|
||||
limit: int = Query(default=200, ge=1, le=1000),
|
||||
user: dict = Depends(require_auth),
|
||||
):
|
||||
safe_limit = max(1, min(limit, 1000))
|
||||
try:
|
||||
services = sorted(events_collection.distinct("service"))[:safe_limit]
|
||||
@@ -247,6 +258,9 @@ def filter_options(limit: int = Query(default=200, ge=1, le=1000)):
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to load filter options: {exc}") from exc
|
||||
|
||||
if not user_can_access_privacy_services(user):
|
||||
services = [s for s in services if s not in PRIVACY_SERVICES]
|
||||
|
||||
return {
|
||||
"services": services,
|
||||
"operations": operations,
|
||||
|
||||
Reference in New Issue
Block a user