mirror of
https://github.com/chatmail/relay.git
synced 2026-06-13 07:01:07 +00:00
ee435a7ef7
replaces #870 fix #851 * fix(dns): address possible IndexError * fix(dns): remove redundant docstring * fix(dns): don't make NS explicit if None * bump cmlxc to 0.13.5 which fixes a powerdns config issue * remove the unneccessary SOA mocks, simplify mock tests, and run ruff format Co-authored-by: holger krekel <holger@merlinux.eu>
233 lines
8.4 KiB
Python
233 lines
8.4 KiB
Python
from copy import deepcopy
|
|
|
|
import pytest
|
|
|
|
from cmdeploy import remote
|
|
from cmdeploy.dns import check_full_zone, check_initial_remote_data, parse_zone_records
|
|
from cmdeploy.remote.rdns import get_authoritative_ns
|
|
|
|
|
|
@pytest.fixture
|
|
def mockdns_base(monkeypatch):
|
|
qdict = {}
|
|
|
|
def shell(command, fail_ok=False, print=print):
|
|
if command.startswith("dig"):
|
|
if command == "dig":
|
|
return "."
|
|
if "with.public.soa" in command and "NS" in command:
|
|
return "domain.with.public.soa. 2419 IN NS ns1.first-ns.de."
|
|
if "with.hidden.soa" in command and "NS" in command:
|
|
return (
|
|
"domain.with.hidden.soa. 2137 IN NS ns1.desec.io.\n"
|
|
"domain.with.hidden.soa. 2137 IN NS ns2.desec.org."
|
|
)
|
|
if "NS" in command:
|
|
return "delta.chat. 21600 IN NS ns1.first-ns.de."
|
|
command_chunks = command.split()
|
|
domain, typ = command_chunks[4], command_chunks[6]
|
|
try:
|
|
return qdict[typ][domain]
|
|
except KeyError:
|
|
return ""
|
|
return remote.rshell.shell(command=command, fail_ok=fail_ok, print=print)
|
|
|
|
monkeypatch.setattr(remote.rdns, shell.__name__, shell)
|
|
return qdict
|
|
|
|
|
|
@pytest.fixture
|
|
def mockdns_expected():
|
|
return {
|
|
"A": {"some.domain": "1.1.1.1"},
|
|
"AAAA": {"some.domain": "fde5:cd7a:9e1c:3240:5a99:936f:cdac:53ae"},
|
|
"CNAME": {
|
|
"mta-sts.some.domain": "some.domain.",
|
|
"www.some.domain": "some.domain.",
|
|
},
|
|
}
|
|
|
|
|
|
@pytest.fixture(params=["plain", "with-dns-comments"])
|
|
def mockdns(request, mockdns_base, mockdns_expected):
|
|
mockdns_base.update(deepcopy(mockdns_expected))
|
|
match request.param:
|
|
case "plain":
|
|
pass
|
|
case "with-dns-comments":
|
|
for typ, data in mockdns_base.items():
|
|
for host, result in data.items():
|
|
mockdns_base[typ][host] = (
|
|
";; some unsuccessful attempt result\n"
|
|
"; and another with a single semicolon\n"
|
|
f"{result}"
|
|
)
|
|
return mockdns_base
|
|
|
|
|
|
class TestGetDkimEntry:
|
|
def test_dkim_entry_returns_tuple_on_success(self, mockdns):
|
|
entry, web_entry = remote.rdns.get_dkim_entry(
|
|
"some.domain", "", dkim_selector="opendkim"
|
|
)
|
|
# May return None,None if openssl not available, but should never crash
|
|
if entry is not None:
|
|
assert "opendkim._domainkey.some.domain" in entry
|
|
assert "opendkim._domainkey.some.domain" in web_entry
|
|
|
|
def test_dkim_entry_returns_none_tuple_on_error(self, monkeypatch):
|
|
"""CalledProcessError must return (None, None), not bare None."""
|
|
from subprocess import CalledProcessError
|
|
|
|
def failing_shell(command, fail_ok=False, print=print):
|
|
raise CalledProcessError(1, command)
|
|
|
|
monkeypatch.setattr(remote.rdns, "shell", failing_shell)
|
|
result = remote.rdns.get_dkim_entry("some.domain", "", dkim_selector="opendkim")
|
|
assert result == (None, None)
|
|
assert result[0] is None and result[1] is None
|
|
|
|
|
|
class TestPerformInitialChecks:
|
|
def test_perform_initial_checks_ok1(self, mockdns, mockdns_expected):
|
|
remote_data = remote.rdns.perform_initial_checks("some.domain")
|
|
assert remote_data["A"] == mockdns_expected["A"]["some.domain"]
|
|
assert remote_data["AAAA"] == mockdns_expected["AAAA"]["some.domain"]
|
|
assert (
|
|
remote_data["MTA_STS"] == mockdns_expected["CNAME"]["mta-sts.some.domain"]
|
|
)
|
|
assert remote_data["WWW"] == mockdns_expected["CNAME"]["www.some.domain"]
|
|
|
|
@pytest.mark.parametrize("drop", ["A", "AAAA"])
|
|
def test_perform_initial_checks_with_one_of_A_AAAA(self, mockdns, drop):
|
|
del mockdns[drop]
|
|
remote_data = remote.rdns.perform_initial_checks("some.domain")
|
|
assert not remote_data[drop]
|
|
|
|
l = []
|
|
res = check_initial_remote_data(remote_data, print=l.append)
|
|
assert res
|
|
assert not l
|
|
|
|
def test_perform_initial_checks_no_mta_sts(self, mockdns):
|
|
del mockdns["CNAME"]["mta-sts.some.domain"]
|
|
remote_data = remote.rdns.perform_initial_checks("some.domain")
|
|
assert not remote_data["MTA_STS"]
|
|
|
|
l = []
|
|
res = check_initial_remote_data(remote_data, print=l.append)
|
|
assert not res
|
|
assert len(l) == 2
|
|
|
|
def test_perform_initial_checks_no_mta_sts_self_signed(self, mockdns):
|
|
del mockdns["CNAME"]["mta-sts.some.domain"]
|
|
remote_data = remote.rdns.perform_initial_checks("some.domain")
|
|
assert not remote_data["MTA_STS"]
|
|
|
|
l = []
|
|
res = check_initial_remote_data(remote_data, strict_tls=False, print=l.append)
|
|
assert res
|
|
assert not l
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("domain", "ns"),
|
|
[
|
|
("domain.with.public.soa", "ns1.first-ns.de."),
|
|
("domain.with.hidden.soa", "ns1.desec.io."),
|
|
],
|
|
)
|
|
def test_get_authoritative_ns(domain, ns, mockdns):
|
|
assert get_authoritative_ns(domain) == ns
|
|
|
|
|
|
def test_parse_zone_records():
|
|
text = """
|
|
; This is a comment
|
|
some.domain. 3600 IN A 1.1.1.1
|
|
|
|
; Another comment
|
|
www.some.domain. 3600 IN CNAME some.domain.
|
|
|
|
; Multi-word rdata
|
|
some.domain. 3600 IN MX 10 mail.some.domain.
|
|
|
|
; DKIM record (single line, multi-word TXT rdata)
|
|
dkim._domainkey.some.domain. 3600 IN TXT "v=DKIM1;k=rsa;p=MIIBIjANBgkqhkiG" "9w0BAQEFAAOCAQ8AMIIBCgKCAQEA"
|
|
|
|
; Another TXT record
|
|
_dmarc.some.domain. 3600 IN TXT "v=DMARC1;p=reject"
|
|
"""
|
|
records = list(parse_zone_records(text))
|
|
assert records == [
|
|
("some.domain", "3600", "A", "1.1.1.1"),
|
|
("www.some.domain", "3600", "CNAME", "some.domain."),
|
|
("some.domain", "3600", "MX", "10 mail.some.domain."),
|
|
(
|
|
"dkim._domainkey.some.domain",
|
|
"3600",
|
|
"TXT",
|
|
'"v=DKIM1;k=rsa;p=MIIBIjANBgkqhkiG" "9w0BAQEFAAOCAQ8AMIIBCgKCAQEA"',
|
|
),
|
|
("_dmarc.some.domain", "3600", "TXT", '"v=DMARC1;p=reject"'),
|
|
]
|
|
|
|
|
|
def test_parse_zone_records_invalid_line():
|
|
text = "invalid line"
|
|
with pytest.raises(ValueError, match="Bad zone record line"):
|
|
list(parse_zone_records(text))
|
|
|
|
|
|
def parse_zonefile_into_dict(zonefile, mockdns_base, only_required=False):
|
|
if only_required:
|
|
zonefile = zonefile.split("; Recommended")[0]
|
|
for name, ttl, rtype, rdata in parse_zone_records(zonefile):
|
|
mockdns_base.setdefault(rtype, {})[name] = rdata
|
|
|
|
|
|
class MockSSHExec:
|
|
def logged(self, func, kwargs):
|
|
return func(**kwargs)
|
|
|
|
def call(self, func, kwargs):
|
|
return func(**kwargs)
|
|
|
|
|
|
class TestZonefileChecks:
|
|
def test_check_zonefile_all_ok(self, cm_data, mockdns_base):
|
|
zonefile = cm_data.get("zftest.zone")
|
|
parse_zonefile_into_dict(zonefile, mockdns_base)
|
|
required_diff, recommended_diff = remote.rdns.check_zonefile(zonefile)
|
|
assert not required_diff and not recommended_diff
|
|
|
|
def test_check_zonefile_recommended_not_set(self, cm_data, mockdns_base):
|
|
zonefile = cm_data.get("zftest.zone")
|
|
zonefile_mocked = zonefile.split("; Recommended")[0]
|
|
parse_zonefile_into_dict(zonefile_mocked, mockdns_base)
|
|
required_diff, recommended_diff = remote.rdns.check_zonefile(zonefile)
|
|
assert not required_diff
|
|
assert len(recommended_diff) == 8
|
|
|
|
def test_check_zonefile_output_required_fine(self, cm_data, mockdns_base, mockout):
|
|
zonefile = cm_data.get("zftest.zone")
|
|
zonefile_mocked = zonefile.split("; Recommended")[0]
|
|
parse_zonefile_into_dict(zonefile_mocked, mockdns_base, only_required=True)
|
|
mssh = MockSSHExec()
|
|
mockdns_base["mail_domain"] = "some.domain"
|
|
res = check_full_zone(mssh, mockdns_base, out=mockout, zonefile=zonefile)
|
|
assert res == 0
|
|
assert "WARNING" in mockout.captured_plain[0]
|
|
assert len(mockout.captured_plain) == 9
|
|
|
|
def test_check_zonefile_output_full(self, cm_data, mockdns_base, mockout):
|
|
zonefile = cm_data.get("zftest.zone")
|
|
parse_zonefile_into_dict(zonefile, mockdns_base)
|
|
mssh = MockSSHExec()
|
|
mockdns_base["mail_domain"] = "some.domain"
|
|
res = check_full_zone(mssh, mockdns_base, out=mockout, zonefile=zonefile)
|
|
assert res == 0
|
|
assert not mockout.captured_red
|
|
assert "correct" in mockout.captured_green[0]
|
|
assert not mockout.captured_red
|