207 lines
6.8 KiB
Python
207 lines
6.8 KiB
Python
import json
|
|
|
|
from mapping_loader import get_mapping
|
|
|
|
|
|
CATEGORY_LABELS = {
|
|
"ApplicationManagement": "Application",
|
|
"UserManagement": "User",
|
|
"GroupManagement": "Group",
|
|
"RoleManagement": "Role",
|
|
"Device": "Device",
|
|
"Policy": "Policy",
|
|
"ResourceManagement": "Resource",
|
|
}
|
|
|
|
|
|
def _actor_display(actor: dict, resolved: dict = None, owners=None) -> str:
|
|
"""Choose a human-readable actor label."""
|
|
if resolved and resolved.get("name"):
|
|
name = resolved["name"]
|
|
if resolved.get("type") == "servicePrincipal" and owners:
|
|
owners_str = ", ".join(owners[:3])
|
|
return f"{name} (owners: {owners_str})" if owners_str else name
|
|
return name
|
|
|
|
if not actor:
|
|
return "Unknown actor"
|
|
|
|
user = actor.get("user", {}) or {}
|
|
sp = actor.get("servicePrincipal", {}) or {}
|
|
app = actor.get("app", {}) or {}
|
|
upn = user.get("userPrincipalName") or user.get("mail")
|
|
display = user.get("displayName")
|
|
app_display = app.get("displayName")
|
|
|
|
if display and upn and display != upn:
|
|
return f"{display} ({upn})"
|
|
|
|
return (
|
|
display
|
|
or upn
|
|
or app_display
|
|
or sp.get("displayName")
|
|
or sp.get("appId")
|
|
or actor.get("ipAddress")
|
|
or user.get("id")
|
|
or sp.get("id")
|
|
or "Unknown actor"
|
|
)
|
|
|
|
|
|
def _target_displays(targets: list) -> list:
|
|
"""Best-effort display labels for targets."""
|
|
labels = []
|
|
for t in targets or []:
|
|
resolved = t.get("_resolved") or {}
|
|
label = (
|
|
resolved.get("name")
|
|
or resolved.get("id")
|
|
or t.get("displayName")
|
|
or t.get("userPrincipalName")
|
|
or t.get("logonId")
|
|
or t.get("id")
|
|
or ""
|
|
)
|
|
if label:
|
|
labels.append(label)
|
|
return labels
|
|
|
|
|
|
def _target_types(targets: list) -> list:
|
|
"""Collect target types for display mapping."""
|
|
types = []
|
|
for t in targets or []:
|
|
resolved = t.get("_resolved") or {}
|
|
t_type = (
|
|
resolved.get("type")
|
|
or t.get("type")
|
|
)
|
|
if t_type:
|
|
types.append(t_type)
|
|
return types
|
|
|
|
|
|
def _display_summary(operation: str, target_labels: list, actor_label: str, target_types: list, category: str) -> str:
|
|
action = operation or category or "Event"
|
|
target = target_labels[0] if target_labels else None
|
|
t_type = target_types[0] if target_types else None
|
|
|
|
target_piece = None
|
|
if target and t_type:
|
|
target_piece = f"{t_type.lower()}: {target}"
|
|
elif target:
|
|
target_piece = target
|
|
|
|
pieces = [p for p in [action, target_piece] if p]
|
|
if actor_label:
|
|
pieces.append(f"by {actor_label}")
|
|
return " | ".join(pieces)
|
|
|
|
|
|
def _render_summary(template: str, operation: str, actor: str, target: str, category: str, result: str, service: str) -> str:
|
|
try:
|
|
return template.format(
|
|
operation=operation or category or "Event",
|
|
actor=actor or "Unknown actor",
|
|
target=target or "target",
|
|
category=category or "Other",
|
|
result=result or "",
|
|
service=service or "",
|
|
)
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _make_dedupe_key(e: dict, normalized_fields: dict = None) -> str:
|
|
"""
|
|
Build a stable key to prevent duplicates across sources.
|
|
Preference order:
|
|
- source event id (id) + category
|
|
- fallback to timestamp + category + operation + first target label
|
|
"""
|
|
norm = normalized_fields or {}
|
|
eid = e.get("id") or e.get("_id") or norm.get("id")
|
|
ts = e.get("activityDateTime") or e.get("timestamp") or norm.get("timestamp")
|
|
category = e.get("category") or e.get("service") or norm.get("service")
|
|
op = e.get("activityDisplayName") or e.get("operation") or norm.get("operation")
|
|
target_labels = norm.get("target_displays") or []
|
|
target = target_labels[0] if target_labels else None
|
|
|
|
if eid:
|
|
return "|".join(filter(None, [eid, category]))
|
|
|
|
return "|".join(filter(None, [ts, category, op, target])) or None
|
|
|
|
|
|
def normalize_event(e):
|
|
actor = e.get("initiatedBy", {})
|
|
targets = e.get("targetResources", [])
|
|
resolved_actor = e.get("_resolvedActor")
|
|
actor_owners = e.get("_resolvedActorOwners", [])
|
|
target_labels = _target_displays(targets)
|
|
target_types = _target_types(targets)
|
|
actor_label = _actor_display(actor, resolved_actor, actor_owners)
|
|
actor_upn = (actor.get("user") or {}).get("userPrincipalName") or (actor.get("user") or {}).get("mail")
|
|
first_target_label = target_labels[0] if target_labels else None
|
|
category = e.get("category")
|
|
mapping = get_mapping()
|
|
category_labels = mapping.get("category_labels") or {}
|
|
summary_templates = mapping.get("summary_templates") or {}
|
|
display_mapping = mapping.get("display") or {}
|
|
display_category = category_labels.get(category, category or "Other")
|
|
|
|
operation = e.get("activityDisplayName")
|
|
template = summary_templates.get(category) or summary_templates.get("default")
|
|
summary = _render_summary(
|
|
template,
|
|
operation=operation,
|
|
actor=actor_label,
|
|
target=target_labels[0] if target_labels else None,
|
|
category=display_category,
|
|
result=e.get("result"),
|
|
service=e.get("loggedByService") or e.get("category"),
|
|
)
|
|
|
|
display_conf = display_mapping.get(category) or display_mapping.get("default", {})
|
|
actor_field_pref = display_conf.get("actor_field", "actor_display")
|
|
actor_label_text = display_conf.get("actor_label", "User")
|
|
|
|
if actor_field_pref == "actor_upn" and actor_upn:
|
|
display_actor_value = actor_upn
|
|
elif actor_field_pref == "target_display" and first_target_label:
|
|
display_actor_value = first_target_label
|
|
else:
|
|
display_actor_value = actor_label
|
|
|
|
dedupe_key = _make_dedupe_key(e, {
|
|
"id": e.get("id"),
|
|
"timestamp": e.get("activityDateTime"),
|
|
"service": e.get("category"),
|
|
"operation": e.get("activityDisplayName"),
|
|
"target_displays": target_labels,
|
|
})
|
|
|
|
return {
|
|
"id": e.get("id"),
|
|
"timestamp": e.get("activityDateTime"),
|
|
"service": e.get("category"),
|
|
"operation": e.get("activityDisplayName"),
|
|
"result": e.get("result"),
|
|
"actor": actor,
|
|
"actor_resolved": resolved_actor,
|
|
"actor_owner_names": actor_owners,
|
|
"actor_display": actor_label,
|
|
"actor_upn": actor_upn,
|
|
"display_actor_label": actor_label_text,
|
|
"display_actor_value": display_actor_value,
|
|
"targets": targets,
|
|
"target_displays": target_labels,
|
|
"target_types": target_types,
|
|
"display_category": display_category,
|
|
"display_summary": summary,
|
|
"raw": e,
|
|
"raw_text": json.dumps(e, default=str),
|
|
"dedupe_key": dedupe_key,
|
|
}
|