style: apply ruff formatting to all backend files
Some checks failed
CI / lint-and-test (push) Failing after 38s
Some checks failed
CI / lint-and-test (push) Failing after 38s
This commit is contained in:
@@ -9,10 +9,7 @@ def fetch_audit_logs(hours: int = 24, since: str | None = None, max_pages: int =
|
||||
"""Fetch paginated directory audit logs from Microsoft Graph and enrich with resolved names."""
|
||||
token = get_access_token()
|
||||
start_time = since or (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
|
||||
next_url = (
|
||||
"https://graph.microsoft.com/v1.0/"
|
||||
f"auditLogs/directoryAudits?$filter=activityDateTime ge {start_time}"
|
||||
)
|
||||
next_url = f"https://graph.microsoft.com/v1.0/auditLogs/directoryAudits?$filter=activityDateTime ge {start_time}"
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
events = []
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
from utils.http import get_with_retry
|
||||
|
||||
|
||||
@@ -48,7 +47,10 @@ def resolve_directory_object(object_id: str, token: str, cache: dict[str, dict])
|
||||
|
||||
probes = [
|
||||
("user", f"https://graph.microsoft.com/v1.0/users/{object_id}?$select=id,displayName,userPrincipalName,mail"),
|
||||
("servicePrincipal", f"https://graph.microsoft.com/v1.0/servicePrincipals/{object_id}?$select=id,displayName,appId,appDisplayName"),
|
||||
(
|
||||
"servicePrincipal",
|
||||
f"https://graph.microsoft.com/v1.0/servicePrincipals/{object_id}?$select=id,displayName,appId,appDisplayName",
|
||||
),
|
||||
("group", f"https://graph.microsoft.com/v1.0/groups/{object_id}?$select=id,displayName,mail"),
|
||||
("device", f"https://graph.microsoft.com/v1.0/devices/{object_id}?$select=id,displayName"),
|
||||
]
|
||||
@@ -82,12 +84,7 @@ def resolve_service_principal_owners(sp_id: str, token: str, cache: dict[str, li
|
||||
)
|
||||
payload = _request_json(url, token)
|
||||
for owner in (payload or {}).get("value", []):
|
||||
name = (
|
||||
owner.get("displayName")
|
||||
or owner.get("userPrincipalName")
|
||||
or owner.get("mail")
|
||||
or owner.get("id")
|
||||
)
|
||||
name = owner.get("displayName") or owner.get("userPrincipalName") or owner.get("mail") or owner.get("id")
|
||||
if name:
|
||||
owners.append(name)
|
||||
|
||||
|
||||
@@ -85,12 +85,14 @@ async def audit_middleware(request: Request, call_next):
|
||||
response = await call_next(request)
|
||||
if request.url.path.startswith("/api/") and request.method in ("POST", "PATCH", "PUT", "DELETE"):
|
||||
from auth import AUTH_ENABLED
|
||||
|
||||
user = "anonymous"
|
||||
if AUTH_ENABLED:
|
||||
auth_header = request.headers.get("authorization", "")
|
||||
if auth_header.lower().startswith("bearer "):
|
||||
try:
|
||||
from jose import jwt
|
||||
|
||||
token = auth_header.split(" ", 1)[1]
|
||||
claims = jwt.get_unverified_claims(token)
|
||||
user = claims.get("sub", "unknown")
|
||||
@@ -116,6 +118,7 @@ app.include_router(rules_router, prefix="/api")
|
||||
@app.get("/health")
|
||||
async def health_check():
|
||||
from database import db
|
||||
|
||||
try:
|
||||
db.command("ping")
|
||||
return {"status": "ok", "database": "connected"}
|
||||
|
||||
@@ -6,6 +6,7 @@ new display fields. Example:
|
||||
|
||||
python maintenance.py renormalize --limit 500
|
||||
"""
|
||||
|
||||
import argparse
|
||||
|
||||
from database import events_collection
|
||||
@@ -53,7 +54,9 @@ def dedupe(limit: int = None, batch_size: int = 500) -> int:
|
||||
"""
|
||||
Remove duplicate events based on dedupe_key. Keeps the first occurrence encountered.
|
||||
"""
|
||||
cursor = events_collection.find({}, projection={"_id": 1, "dedupe_key": 1, "raw": 1, "id": 1, "timestamp": 1}).sort("timestamp", 1)
|
||||
cursor = events_collection.find({}, projection={"_id": 1, "dedupe_key": 1, "raw": 1, "id": 1, "timestamp": 1}).sort(
|
||||
"timestamp", 1
|
||||
)
|
||||
if limit:
|
||||
cursor = cursor.limit(int(limit))
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
from prometheus_client import Counter, Histogram, generate_latest
|
||||
|
||||
REQUEST_DURATION = Histogram(
|
||||
|
||||
@@ -75,10 +75,7 @@ def _target_types(targets: list) -> list:
|
||||
types = []
|
||||
for t in targets or []:
|
||||
resolved = t.get("_resolved") or {}
|
||||
t_type = (
|
||||
resolved.get("type")
|
||||
or t.get("type")
|
||||
)
|
||||
t_type = resolved.get("type") or t.get("type")
|
||||
if t_type:
|
||||
types.append(t_type)
|
||||
return types
|
||||
@@ -101,7 +98,9 @@ def _display_summary(operation: str, target_labels: list, actor_label: str, targ
|
||||
return " | ".join(pieces)
|
||||
|
||||
|
||||
def _render_summary(template: str, operation: str, actor: str, target: str, category: str, result: str, service: str) -> str:
|
||||
def _render_summary(
|
||||
template: str, operation: str, actor: str, target: str, category: str, result: str, service: str
|
||||
) -> str:
|
||||
try:
|
||||
return template.format(
|
||||
operation=operation or category or "Event",
|
||||
@@ -177,13 +176,16 @@ def normalize_event(e):
|
||||
else:
|
||||
display_actor_value = actor_label
|
||||
|
||||
dedupe_key = _make_dedupe_key(e, {
|
||||
dedupe_key = _make_dedupe_key(
|
||||
e,
|
||||
{
|
||||
"id": e.get("id"),
|
||||
"timestamp": e.get("activityDateTime"),
|
||||
"service": e.get("category"),
|
||||
"operation": e.get("activityDisplayName"),
|
||||
"target_displays": target_labels,
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
return {
|
||||
"id": e.get("id"),
|
||||
|
||||
@@ -143,11 +143,7 @@ def list_events(
|
||||
|
||||
try:
|
||||
total = events_collection.count_documents(query) if not cursor else -1
|
||||
cursor_query = (
|
||||
events_collection.find(query)
|
||||
.sort([("timestamp", -1), ("_id", -1)])
|
||||
.limit(safe_page_size)
|
||||
)
|
||||
cursor_query = events_collection.find(query).sort([("timestamp", -1), ("_id", -1)]).limit(safe_page_size)
|
||||
events = list(cursor_query)
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to query events: {exc}") from exc
|
||||
@@ -160,10 +156,28 @@ def list_events(
|
||||
for e in events:
|
||||
e["_id"] = str(e["_id"])
|
||||
|
||||
log_action("list_events", "/api/events", {"filters": {k: v for k, v in {
|
||||
"service": service, "actor": actor, "operation": operation, "result": result,
|
||||
"start": start, "end": end, "search": search, "cursor": cursor, "page_size": page_size,
|
||||
}.items() if v is not None}}, user.get("sub", "anonymous"))
|
||||
log_action(
|
||||
"list_events",
|
||||
"/api/events",
|
||||
{
|
||||
"filters": {
|
||||
k: v
|
||||
for k, v in {
|
||||
"service": service,
|
||||
"actor": actor,
|
||||
"operation": operation,
|
||||
"result": result,
|
||||
"start": start,
|
||||
"end": end,
|
||||
"search": search,
|
||||
"cursor": cursor,
|
||||
"page_size": page_size,
|
||||
}.items()
|
||||
if v is not None
|
||||
}
|
||||
},
|
||||
user.get("sub", "anonymous"),
|
||||
)
|
||||
|
||||
return {
|
||||
"items": events,
|
||||
@@ -211,7 +225,12 @@ def bulk_tags(
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to update tags: {exc}") from exc
|
||||
|
||||
log_action("bulk_tags", "/api/events/bulk-tags", {"tags": tags, "mode": body.mode, "matched": result_obj.matched_count}, user.get("sub", "anonymous"))
|
||||
log_action(
|
||||
"bulk_tags",
|
||||
"/api/events/bulk-tags",
|
||||
{"tags": tags, "mode": body.mode, "matched": result_obj.matched_count},
|
||||
user.get("sub", "anonymous"),
|
||||
)
|
||||
return {"matched": result_obj.matched_count, "modified": result_obj.modified_count}
|
||||
|
||||
|
||||
|
||||
@@ -54,11 +54,14 @@ def run_fetch(hours: int = 168):
|
||||
if key:
|
||||
ops.append(UpdateOne({"dedupe_key": key}, {"$set": doc}, upsert=True))
|
||||
else:
|
||||
ops.append(UpdateOne({"id": doc.get("id"), "timestamp": doc.get("timestamp")}, {"$set": doc}, upsert=True))
|
||||
ops.append(
|
||||
UpdateOne({"id": doc.get("id"), "timestamp": doc.get("timestamp")}, {"$set": doc}, upsert=True)
|
||||
)
|
||||
events_collection.bulk_write(ops, ordered=False)
|
||||
|
||||
if ALERTS_ENABLED:
|
||||
from rules import evaluate_event
|
||||
|
||||
for doc in normalized:
|
||||
evaluate_event(doc)
|
||||
|
||||
@@ -75,7 +78,12 @@ def fetch_logs(
|
||||
):
|
||||
try:
|
||||
result = run_fetch(hours=hours)
|
||||
log_action("fetch_audit_logs", "/api/fetch-audit-logs", {"hours": hours, "stored": result["stored_events"]}, user.get("sub", "anonymous"))
|
||||
log_action(
|
||||
"fetch_audit_logs",
|
||||
"/api/fetch-audit-logs",
|
||||
{"hours": hours, "stored": result["stored_events"]},
|
||||
user.get("sub", "anonymous"),
|
||||
)
|
||||
return result
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
from auth import require_auth
|
||||
from fastapi import APIRouter, Depends
|
||||
from models.api import SourceHealthResponse
|
||||
@@ -19,17 +18,21 @@ def source_health():
|
||||
status = doc.get("status")
|
||||
if not status:
|
||||
status = "healthy" if doc.get("last_fetch_time") else "unknown"
|
||||
results.append({
|
||||
results.append(
|
||||
{
|
||||
"source": source,
|
||||
"last_fetch_time": doc.get("last_fetch_time"),
|
||||
"last_attempt_time": doc.get("last_attempt_time"),
|
||||
"status": status,
|
||||
})
|
||||
}
|
||||
)
|
||||
else:
|
||||
results.append({
|
||||
results.append(
|
||||
{
|
||||
"source": source,
|
||||
"last_fetch_time": None,
|
||||
"last_attempt_time": None,
|
||||
"status": "unknown",
|
||||
})
|
||||
}
|
||||
)
|
||||
return results
|
||||
|
||||
@@ -11,10 +11,7 @@ def fetch_intune_audit(hours: int = 24, since: str | None = None, max_pages: int
|
||||
"""
|
||||
token = get_access_token()
|
||||
start_time = since or (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
|
||||
url = (
|
||||
"https://graph.microsoft.com/v1.0/deviceManagement/auditEvents"
|
||||
f"?$filter=activityDateTime ge {start_time}"
|
||||
)
|
||||
url = f"https://graph.microsoft.com/v1.0/deviceManagement/auditEvents?$filter=activityDateTime ge {start_time}"
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
events = []
|
||||
@@ -69,7 +66,8 @@ def _normalize_intune(e: dict) -> dict:
|
||||
"targetResources": [
|
||||
{
|
||||
"id": target.get("id"),
|
||||
"displayName": target.get("displayName") or target.get("modifiedProperties", [{}])[0].get("displayName"),
|
||||
"displayName": target.get("displayName")
|
||||
or target.get("modifiedProperties", [{}])[0].get("displayName"),
|
||||
"type": target.get("type"),
|
||||
}
|
||||
]
|
||||
|
||||
@@ -25,7 +25,8 @@ def test_list_events_empty(client):
|
||||
|
||||
def test_list_events_cursor_pagination(client, mock_events_collection):
|
||||
for i in range(5):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": f"evt-{i}",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Directory",
|
||||
@@ -33,7 +34,8 @@ def test_list_events_cursor_pagination(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": f"Actor {i}",
|
||||
"raw_text": "",
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.get("/api/events?page_size=2")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -48,7 +50,8 @@ def test_list_events_cursor_pagination(client, mock_events_collection):
|
||||
|
||||
|
||||
def test_list_events_filter_by_service(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-1",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Exchange",
|
||||
@@ -56,8 +59,10 @@ def test_list_events_filter_by_service(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": "Alice",
|
||||
"raw_text": "",
|
||||
})
|
||||
mock_events_collection.insert_one({
|
||||
}
|
||||
)
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-2",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Directory",
|
||||
@@ -65,7 +70,8 @@ def test_list_events_filter_by_service(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": "Bob",
|
||||
"raw_text": "",
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.get("/api/events?service=Exchange")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -74,7 +80,8 @@ def test_list_events_filter_by_service(client, mock_events_collection):
|
||||
|
||||
|
||||
def test_list_events_filter_by_services(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-1",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Exchange",
|
||||
@@ -82,8 +89,10 @@ def test_list_events_filter_by_services(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": "Alice",
|
||||
"raw_text": "",
|
||||
})
|
||||
mock_events_collection.insert_one({
|
||||
}
|
||||
)
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-2",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Directory",
|
||||
@@ -91,8 +100,10 @@ def test_list_events_filter_by_services(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": "Bob",
|
||||
"raw_text": "",
|
||||
})
|
||||
mock_events_collection.insert_one({
|
||||
}
|
||||
)
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-3",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Teams",
|
||||
@@ -100,7 +111,8 @@ def test_list_events_filter_by_services(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": "Charlie",
|
||||
"raw_text": "",
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.get("/api/events?service=Exchange&service=Directory")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -117,7 +129,8 @@ def test_list_events_page_size_validation(client):
|
||||
|
||||
|
||||
def test_filter_options(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-1",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Intune",
|
||||
@@ -126,7 +139,8 @@ def test_filter_options(client, mock_events_collection):
|
||||
"actor_display": "Charlie",
|
||||
"actor_upn": "charlie@example.com",
|
||||
"raw_text": "",
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.get("/api/filter-options")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -168,7 +182,8 @@ def test_graph_webhook_notification(client):
|
||||
|
||||
|
||||
def test_update_tags(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-tags",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Directory",
|
||||
@@ -176,7 +191,8 @@ def test_update_tags(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": "Alice",
|
||||
"raw_text": "",
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.patch("/api/events/evt-tags/tags", json={"tags": ["investigating", "urgent"]})
|
||||
assert response.status_code == 200
|
||||
assert response.json()["tags"] == ["investigating", "urgent"]
|
||||
@@ -185,7 +201,8 @@ def test_update_tags(client, mock_events_collection):
|
||||
|
||||
|
||||
def test_add_comment(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-comment",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Directory",
|
||||
@@ -193,7 +210,8 @@ def test_add_comment(client, mock_events_collection):
|
||||
"result": "success",
|
||||
"actor_display": "Alice",
|
||||
"raw_text": "",
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.post("/api/events/evt-comment/comments", json={"text": "Looks suspicious"})
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -244,7 +262,8 @@ def test_rules_crud(client):
|
||||
|
||||
|
||||
def test_list_events_filter_by_include_tags(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-tagged",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Directory",
|
||||
@@ -253,8 +272,10 @@ def test_list_events_filter_by_include_tags(client, mock_events_collection):
|
||||
"actor_display": "Alice",
|
||||
"raw_text": "",
|
||||
"tags": ["backup", "auto"],
|
||||
})
|
||||
mock_events_collection.insert_one({
|
||||
}
|
||||
)
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-untagged",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Directory",
|
||||
@@ -263,7 +284,8 @@ def test_list_events_filter_by_include_tags(client, mock_events_collection):
|
||||
"actor_display": "Bob",
|
||||
"raw_text": "",
|
||||
"tags": [],
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.get("/api/events?include_tags=backup")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -272,7 +294,8 @@ def test_list_events_filter_by_include_tags(client, mock_events_collection):
|
||||
|
||||
|
||||
def test_bulk_tags_append(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-bulk",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Exchange",
|
||||
@@ -281,7 +304,8 @@ def test_bulk_tags_append(client, mock_events_collection):
|
||||
"actor_display": "Alice",
|
||||
"raw_text": "",
|
||||
"tags": ["existing"],
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.post("/api/events/bulk-tags?service=Exchange", json={"tags": ["backup"], "mode": "append"})
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -292,7 +316,8 @@ def test_bulk_tags_append(client, mock_events_collection):
|
||||
|
||||
|
||||
def test_bulk_tags_replace(client, mock_events_collection):
|
||||
mock_events_collection.insert_one({
|
||||
mock_events_collection.insert_one(
|
||||
{
|
||||
"id": "evt-bulk2",
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"service": "Exchange",
|
||||
@@ -301,7 +326,8 @@ def test_bulk_tags_replace(client, mock_events_collection):
|
||||
"actor_display": "Alice",
|
||||
"raw_text": "",
|
||||
"tags": ["old"],
|
||||
})
|
||||
}
|
||||
)
|
||||
response = client.post("/api/events/bulk-tags?service=Exchange", json={"tags": ["backup"], "mode": "replace"})
|
||||
assert response.status_code == 200
|
||||
doc = mock_events_collection.find_one({"id": "evt-bulk2"})
|
||||
|
||||
@@ -30,9 +30,7 @@ def test_normalize_event_basic():
|
||||
"userPrincipalName": "alice@example.com",
|
||||
}
|
||||
},
|
||||
"targetResources": [
|
||||
{"id": "t1", "displayName": "Bob", "type": "User"}
|
||||
],
|
||||
"targetResources": [{"id": "t1", "displayName": "Bob", "type": "User"}],
|
||||
}
|
||||
out = normalize_event(e)
|
||||
assert out["id"] == "abc"
|
||||
|
||||
@@ -33,11 +33,22 @@ def test_matches_after_hours():
|
||||
def test_evaluate_event_creates_alert(monkeypatch):
|
||||
from rules import alerts_collection
|
||||
|
||||
monkeypatch.setattr("rules.load_rules", lambda: [
|
||||
{"_id": "r1", "name": "Test rule", "enabled": True, "severity": "high", "conditions": [{"field": "operation", "op": "eq", "value": "Add user"}], "message": "Alert!"}
|
||||
])
|
||||
monkeypatch.setattr(
|
||||
"rules.load_rules",
|
||||
lambda: [
|
||||
{
|
||||
"_id": "r1",
|
||||
"name": "Test rule",
|
||||
"enabled": True,
|
||||
"severity": "high",
|
||||
"conditions": [{"field": "operation", "op": "eq", "value": "Add user"}],
|
||||
"message": "Alert!",
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
inserted = {}
|
||||
|
||||
def mock_insert(doc):
|
||||
inserted["doc"] = doc
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
import requests
|
||||
import structlog
|
||||
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
|
||||
@@ -18,12 +17,16 @@ RETRY_CONFIG = {
|
||||
|
||||
|
||||
@retry(**RETRY_CONFIG)
|
||||
def get_with_retry(url: str, headers: dict | None = None, params: dict | None = None, timeout: float = 20) -> requests.Response:
|
||||
def get_with_retry(
|
||||
url: str, headers: dict | None = None, params: dict | None = None, timeout: float = 20
|
||||
) -> requests.Response:
|
||||
res = requests.get(url, headers=headers, params=params, timeout=timeout)
|
||||
return res
|
||||
|
||||
|
||||
@retry(**RETRY_CONFIG)
|
||||
def post_with_retry(url: str, headers: dict | None = None, data: dict | None = None, params: dict | None = None, timeout: float = 15) -> requests.Response:
|
||||
def post_with_retry(
|
||||
url: str, headers: dict | None = None, data: dict | None = None, params: dict | None = None, timeout: float = 15
|
||||
) -> requests.Response:
|
||||
res = requests.post(url, headers=headers, data=data, params=params, timeout=timeout)
|
||||
return res
|
||||
|
||||
Reference in New Issue
Block a user