fix: DNS check timeout with IPv6-broken authoritative NS

Add IPv4 fallback for authoritative NS queries in query_dns().
When dig @ns returns no useful answer (e.g. IPv6 transport to
the NS times out), retry with -4 to force IPv4. Also limits
timeout and tries (+timeout=10 +tries=2) to avoid hanging.

Fixes #851
This commit is contained in:
holger krekel
2026-02-23 15:58:56 +01:00
parent 16b00da373
commit f65ecc23fa
2 changed files with 81 additions and 4 deletions

View File

@@ -64,11 +64,13 @@ def get_dkim_entry(mail_domain, pre_command, dkim_selector):
)
def query_dns(typ, domain):
def query_dns(typ, domain, shell_exec=None):
if shell_exec is None:
shell_exec = shell
# Get autoritative nameserver from the SOA record.
soa_answers = [
x.split()
for x in shell(
for x in shell_exec(
f"dig -r -q {domain} -t SOA +noall +authority +answer", print=log_progress
).split("\n")
]
@@ -78,8 +80,27 @@ def query_dns(typ, domain):
ns = soa[0][4]
# Query authoritative nameserver directly to bypass DNS cache.
res = shell(f"dig @{ns} -r -q {domain} -t {typ} +short", print=log_progress)
return next((line for line in res.split("\n") if not line.startswith(";")), "")
return _dig_authoritative(ns, domain, typ, shell_exec=shell_exec)
def _parse_dig_result(output):
"""Return first non-comment, non-empty line from dig output, or empty string."""
lines = [line for line in output.split("\n") if not line.startswith(";")]
return next((line for line in lines if line.strip()), "")
def _dig_authoritative(ns, domain, typ, shell_exec=None):
"""Query authoritative NS, falling back to IPv4-only if default fails."""
if shell_exec is None:
shell_exec = shell
# limit timeout and tries to not hang on a broken default NS
cmd = f"dig @{ns} -r -q {domain} -t {typ} +short +timeout=10 +tries=2"
result = _parse_dig_result(shell_exec(cmd, print=log_progress))
if result:
return result
# Fallback: force IPv4 transport (handles broken IPv6 to NS)
return _parse_dig_result(shell_exec(cmd + " -4", print=log_progress))
def check_zonefile(zonefile, verbose=True):

View File

@@ -214,3 +214,59 @@ class TestZonefileChecks:
assert not mockout.captured_red
assert "correct" in mockout.captured_green[0]
assert not mockout.captured_red
class TestDigAuthoritative:
@pytest.fixture
def dig_auth(self):
"""Helper that calls _dig_authoritative with a recording shell function."""
calls = []
def run(first_result, ipv4_result=None):
def shell(cmd, print=print):
calls.append(cmd)
if "-4" in cmd:
return ipv4_result or ""
return first_result
result = remote.rdns._dig_authoritative(
"ns1.example.", "example.com", "A", shell_exec=shell
)
return result, calls
return run
def test_ipv4_fallback_on_error(self, dig_auth):
"""Fallback to -4 when first query returns only error lines."""
result, calls = dig_auth(
first_result=(
";; communications error to 2a01:4f8:10a:1044::80#53: timed out\n"
";; communications error to 2a01:4f8:10a:1044::80#53: timed out\n"
),
ipv4_result="1.2.3.4",
)
assert len(calls) == 2
assert "-4" not in calls[0]
assert "-4" in calls[1]
assert result == "1.2.3.4"
def test_no_fallback_on_success(self, dig_auth):
"""No -4 fallback when first query returns a valid answer."""
result, calls = dig_auth(first_result="1.2.3.4")
assert result == "1.2.3.4"
assert len(calls) == 1
def test_fallback_with_mixed_error_lines(self, dig_auth):
"""Fallback triggers when output has only error lines mixed with empty."""
result, calls = dig_auth(
first_result=(
";; communications error to 2a01:4f8::80#53: timed out\n"
"\n"
";; communications error to 2a01:4f8::80#53: timed out\n"
),
ipv4_result=";; some warning\n1.2.3.4",
)
assert len(calls) == 2
assert result == "1.2.3.4"