Some checks failed
CI / lint-and-test (push) Has been cancelled
- Cache Graph API tokens with expiry-aware reuse in graph/auth.py - Add tenacity-based retry/backoff wrapper (utils/http.py) and apply to all Graph/source API calls - Add Pydantic request/response models (models/api.py) and FastAPI query constraints - Add unit tests for event_model, auth and integration tests for API endpoints - Configure ruff linter/formatter in pyproject.toml - Add GitHub Actions CI pipeline (.github/workflows/ci.yml) - Add requirements-dev.txt with pytest, mongomock, httpx, ruff - Clean up typing imports and fix ruff linting across codebase
62 lines
1.5 KiB
Python
62 lines
1.5 KiB
Python
from unittest.mock import patch
|
|
|
|
import auth
|
|
import pytest
|
|
from auth import _allowed, require_auth
|
|
from fastapi import HTTPException
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_cache():
|
|
auth.JWKS_CACHE["keys"] = []
|
|
auth.JWKS_CACHE["exp"] = 0
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_jwks():
|
|
from Crypto.PublicKey import RSA
|
|
from jose.jwk import RSAKey
|
|
key = RSA.generate(2048)
|
|
rsa_key = RSAKey(key)
|
|
jwk_dict = {
|
|
"kty": "RSA",
|
|
"kid": "test-kid",
|
|
"n": rsa_key._key.n,
|
|
"e": rsa_key._key.e,
|
|
}
|
|
return rsa_key, jwk_dict
|
|
|
|
|
|
def test_allowed_no_restrictions():
|
|
assert _allowed({}, set(), set()) is True
|
|
|
|
|
|
def test_allowed_by_role():
|
|
assert _allowed({"roles": ["Admin"]}, {"Admin"}, set()) is True
|
|
assert _allowed({"roles": ["User"]}, {"Admin"}, set()) is False
|
|
|
|
|
|
def test_allowed_by_group():
|
|
assert _allowed({"groups": ["SecOps"]}, set(), {"SecOps"}) is True
|
|
assert _allowed({"groups": ["Users"]}, set(), {"SecOps"}) is False
|
|
|
|
|
|
@patch("auth.AUTH_ENABLED", False)
|
|
def test_require_auth_disabled():
|
|
claims = require_auth(None)
|
|
assert claims["sub"] == "anonymous"
|
|
|
|
|
|
@patch("auth.AUTH_ENABLED", True)
|
|
def test_require_auth_missing_header():
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
require_auth(None)
|
|
assert exc_info.value.status_code == 401
|
|
|
|
|
|
@patch("auth.AUTH_ENABLED", True)
|
|
def test_require_auth_invalid_bearer():
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
require_auth("Basic abc")
|
|
assert exc_info.value.status_code == 401
|