refactor(cmdeploy/tests): don't use env vars but explicit pytest options to pass ssh info around.

Replace CHATMAIL_SSH env var with --ssh-host / --ssh-config pytest options.
Monkey-patch socket.getaddrinfo to resolve .localchat domains via
ssh-config mappings.  Add process cleanup to Remote, fix subprocess
stdin inheritance, and pass ssh_config through online tests.
This commit is contained in:
holger krekel
2026-03-30 08:19:04 +02:00
committed by j4n
parent fc382b1062
commit 386364ac70
5 changed files with 162 additions and 36 deletions

View File

@@ -20,7 +20,7 @@ def test_fastcgi_working(maildomain, chatmail_config):
@pytest.mark.filterwarnings("ignore::urllib3.exceptions.InsecureRequestWarning") @pytest.mark.filterwarnings("ignore::urllib3.exceptions.InsecureRequestWarning")
def test_newemail_configure(maildomain, rpc, chatmail_config): def test_newemail_configure(maildomain, maildomain_ip, rpc, chatmail_config):
"""Test configuring accounts by scanning a QR code works.""" """Test configuring accounts by scanning a QR code works."""
url = f"DCACCOUNT:https://{maildomain}/new" url = f"DCACCOUNT:https://{maildomain}/new"
for i in range(3): for i in range(3):
@@ -30,12 +30,15 @@ def test_newemail_configure(maildomain, rpc, chatmail_config):
# set_config_from_qr, so fetch credentials via requests instead # set_config_from_qr, so fetch credentials via requests instead
res = requests.post(f"https://{maildomain}/new", verify=False) res = requests.post(f"https://{maildomain}/new", verify=False)
data = res.json() data = res.json()
rpc.add_or_update_transport(account_id, { rpc.add_or_update_transport(
"addr": data["email"], account_id,
"password": data["password"], {
"imapServer": maildomain, "addr": data["email"],
"smtpServer": maildomain, "password": data["password"],
"certificateChecks": "acceptInvalidCertificates", "imapServer": maildomain_ip,
}) "smtpServer": maildomain_ip,
"certificateChecks": "acceptInvalidCertificates",
},
)
else: else:
rpc.add_transport_from_qr(account_id, url) rpc.add_transport_from_qr(account_id, url)

View File

@@ -12,8 +12,9 @@ from cmdeploy.cmdeploy import get_sshexec
class TestSSHExecutor: class TestSSHExecutor:
@pytest.fixture(scope="class") @pytest.fixture(scope="class")
def sshexec(self, sshdomain): def sshexec(self, sshdomain, pytestconfig):
return get_sshexec(sshdomain) ssh_config = pytestconfig.getoption("ssh_config")
return get_sshexec(sshdomain, ssh_config=ssh_config)
def test_ls(self, sshexec): def test_ls(self, sshexec):
out = sshexec(call=remote.rdns.shell, kwargs=dict(command="ls")) out = sshexec(call=remote.rdns.shell, kwargs=dict(command="ls"))
@@ -132,11 +133,10 @@ def test_authenticated_from(cmsetup, maildata):
@pytest.mark.parametrize("from_addr", ["fake@example.org", "fake@testrun.org"]) @pytest.mark.parametrize("from_addr", ["fake@example.org", "fake@testrun.org"])
def test_reject_missing_dkim(cmsetup, maildata, from_addr): def test_reject_missing_dkim(cmsetup, maildata, from_addr):
domain = cmsetup.maildomain domain = cmsetup.maildomain
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
try: try:
sock.connect((domain, 25)) sock = socket.create_connection((domain, 25), timeout=10)
except socket.timeout: sock.close()
except (socket.timeout, OSError):
pytest.skip(f"port 25 not reachable for {domain}") pytest.skip(f"port 25 not reachable for {domain}")
recipient = cmsetup.gen_users(1)[0] recipient = cmsetup.gen_users(1)[0]

View File

@@ -67,7 +67,7 @@ class TestEndToEndDeltaChat:
assert msg2.get_snapshot().text == "message0" assert msg2.get_snapshot().text == "message0"
def test_exceed_quota( def test_exceed_quota(
self, cmfactory, lp, tmpdir, remote, chatmail_config, sshdomain self, cmfactory, lp, tmpdir, remote, chatmail_config, sshdomain, pytestconfig
): ):
"""This is a very slow test as it needs to upload >100MB of mail data """This is a very slow test as it needs to upload >100MB of mail data
before quota is exceeded, and thus depends on the speed of the upload. before quota is exceeded, and thus depends on the speed of the upload.
@@ -92,7 +92,9 @@ class TestEndToEndDeltaChat:
lp.sec(f"filling remote inbox for {user}") lp.sec(f"filling remote inbox for {user}")
fn = f"7743102289.M843172P2484002.c20,S={quota},W=2398:2," fn = f"7743102289.M843172P2484002.c20,S={quota},W=2398:2,"
path = chatmail_config.mailboxes_dir.joinpath(user, "cur", fn) path = chatmail_config.mailboxes_dir.joinpath(user, "cur", fn)
sshexec = get_sshexec(sshdomain) sshexec = get_sshexec(
sshdomain, ssh_config=pytestconfig.getoption("ssh_config")
)
sshexec(call=rshell.write_numbytes, kwargs=dict(path=str(path), num=120)) sshexec(call=rshell.write_numbytes, kwargs=dict(path=str(path), num=120))
res = sshexec(call=rshell.dovecot_recalc_quota, kwargs=dict(user=user)) res = sshexec(call=rshell.dovecot_recalc_quota, kwargs=dict(user=user))
assert res["percent"] >= 100 assert res["percent"] >= 100

View File

@@ -3,12 +3,15 @@ import os
from cmdeploy.cmdeploy import main from cmdeploy.cmdeploy import main
def test_status_cmd(chatmail_config, capsys, request): def test_status_cmd(chatmail_config, capsys, request, pytestconfig):
os.chdir(request.config.invocation_params.dir) os.chdir(request.config.invocation_params.dir)
command = ["status"] command = ["status"]
if os.getenv("CHATMAIL_SSH"): ssh_host = pytestconfig.getoption("ssh_host")
command.append("--ssh-host") if ssh_host:
command.append(os.getenv("CHATMAIL_SSH")) command.extend(["--ssh-host", ssh_host])
ssh_config = pytestconfig.getoption("ssh_config")
if ssh_config:
command.extend(["--ssh-config", ssh_config])
assert main(command) == 0 assert main(command) == 0
status_out = capsys.readouterr() status_out = capsys.readouterr()
print(status_out.out) print(status_out.out)

View File

@@ -2,7 +2,9 @@ import imaplib
import itertools import itertools
import os import os
import random import random
import re
import smtplib import smtplib
import socket
import ssl import ssl
import subprocess import subprocess
import time import time
@@ -18,6 +20,76 @@ def pytest_addoption(parser):
parser.addoption( parser.addoption(
"--slow", action="store_true", default=False, help="also run slow tests" "--slow", action="store_true", default=False, help="also run slow tests"
) )
parser.addoption(
"--ssh-host",
dest="ssh_host",
default=None,
help="SSH host (overrides mail_domain for SSH operations).",
)
parser.addoption(
"--ssh-config",
dest="ssh_config",
default=None,
help="Path to an SSH config file (e.g. lxconfigs/ssh-config).",
)
def _parse_ssh_config_hosts(path):
"""Parse an OpenSSH config file and return a dict of hostname -> IP."""
mapping = {}
current_names = []
for ln in Path(path).read_text().splitlines():
line = ln.strip()
m = re.match(r"^Host\s+(.+)", line)
if m:
current_names = m.group(1).split()
continue
m = re.match(r"^Hostname\s+(\S+)", line)
if m and current_names:
ip = m.group(1)
for name in current_names:
mapping[name] = ip
current_names = []
return mapping
_original_getaddrinfo = socket.getaddrinfo
def _make_patched_getaddrinfo(host_map):
"""Return a getaddrinfo that resolves hosts in host_map to their IPs."""
def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if host in host_map:
ip = host_map[host]
return _original_getaddrinfo(ip, port, family, type, proto, flags)
return _original_getaddrinfo(host, port, family, type, proto, flags)
return patched_getaddrinfo
@pytest.fixture(autouse=True, scope="session")
def _setup_localchat_dns(pytestconfig):
"""Monkey-patch socket.getaddrinfo to resolve .localchat via ssh-config."""
ssh_config = pytestconfig.getoption("ssh_config")
if not ssh_config or not Path(ssh_config).exists():
yield {}
return
host_map = _parse_ssh_config_hosts(ssh_config)
if not host_map:
yield {}
return
socket.getaddrinfo = _make_patched_getaddrinfo(host_map)
try:
yield host_map
finally:
socket.getaddrinfo = _original_getaddrinfo
@pytest.fixture(scope="session")
def ssh_config_host_map(_setup_localchat_dns):
"""Return the host-name → IP map parsed from ssh-config."""
return _setup_localchat_dns
def pytest_configure(config): def pytest_configure(config):
@@ -35,6 +107,11 @@ def pytest_runtest_setup(item):
def _get_chatmail_config(): def _get_chatmail_config():
ini = os.environ.get("CHATMAIL_INI")
if ini:
path = Path(ini).resolve()
if path.exists():
return read_config(path), path
current = Path().resolve() current = Path().resolve()
while 1: while 1:
path = current.joinpath("chatmail.ini").resolve() path = current.joinpath("chatmail.ini").resolve()
@@ -61,8 +138,14 @@ def maildomain(chatmail_config):
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def sshdomain(maildomain): def sshdomain(maildomain, pytestconfig):
return os.environ.get("CHATMAIL_SSH", maildomain) return pytestconfig.getoption("ssh_host") or maildomain
@pytest.fixture(scope="session")
def maildomain_ip(maildomain, ssh_config_host_map):
"""Return the IP for maildomain from ssh-config, or maildomain itself."""
return ssh_config_host_map.get(maildomain, maildomain)
@pytest.fixture @pytest.fixture
@@ -306,12 +389,22 @@ from deltachat_rpc_client import DeltaChat, Rpc
class ChatmailACFactory: class ChatmailACFactory:
"""RPC-based account factory for chatmail testing.""" """RPC-based account factory for chatmail testing."""
def __init__(self, rpc, maildomain, gencreds, chatmail_config): def __init__(
self,
rpc,
maildomain,
maildomain_ip,
gencreds,
chatmail_config,
ssh_config_host_map,
):
self.dc = DeltaChat(rpc) self.dc = DeltaChat(rpc)
self.rpc = rpc self.rpc = rpc
self._maildomain = maildomain self._maildomain = maildomain
self._maildomain_ip = maildomain_ip
self.gencreds = gencreds self.gencreds = gencreds
self.chatmail_config = chatmail_config self.chatmail_config = chatmail_config
self._ssh_config_host_map = ssh_config_host_map
def _make_transport(self, domain): def _make_transport(self, domain):
"""Build a transport config dict for the given domain.""" """Build a transport config dict for the given domain."""
@@ -319,11 +412,13 @@ class ChatmailACFactory:
transport = { transport = {
"addr": addr, "addr": addr,
"password": password, "password": password,
# Setting server explicitly skips requesting autoconfig XML,
# see https://datatracker.ietf.org/doc/draft-ietf-mailmaint-autoconfig/
"imapServer": domain,
"smtpServer": domain,
} }
# To support running against local relays without host DNS resolution
# we attempt resolving the domain via ssh-config
# because otherwise core fails to find the address
server = self._ssh_config_host_map.get(domain)
if server is not None:
transport.update({"imapServer": server, "smtpServer": server})
if self.chatmail_config.tls_cert_mode == "self": if self.chatmail_config.tls_cert_mode == "self":
transport["certificateChecks"] = "acceptInvalidCertificates" transport["certificateChecks"] = "acceptInvalidCertificates"
return transport return transport
@@ -376,39 +471,56 @@ def rpc(tmp_path_factory):
@pytest.fixture @pytest.fixture
def cmfactory(rpc, gencreds, maildomain, chatmail_config): def cmfactory(
rpc, gencreds, maildomain, maildomain_ip, chatmail_config, ssh_config_host_map
):
"""Return a ChatmailACFactory for creating online Delta Chat accounts.""" """Return a ChatmailACFactory for creating online Delta Chat accounts."""
return ChatmailACFactory( return ChatmailACFactory(
rpc=rpc, rpc=rpc,
maildomain=maildomain, maildomain=maildomain,
maildomain_ip=maildomain_ip,
gencreds=gencreds, gencreds=gencreds,
chatmail_config=chatmail_config, chatmail_config=chatmail_config,
ssh_config_host_map=ssh_config_host_map,
) )
@pytest.fixture @pytest.fixture
def remote(sshdomain): def remote(sshdomain, pytestconfig):
return Remote(sshdomain) r = Remote(sshdomain, ssh_config=pytestconfig.getoption("ssh_config"))
yield r
r.close()
class Remote: class Remote:
def __init__(self, sshdomain): def __init__(self, sshdomain, ssh_config=None):
self.sshdomain = sshdomain self.sshdomain = sshdomain
self.ssh_config = ssh_config
self._procs = []
def iter_output(self, logcmd="", ready=None): def iter_output(self, logcmd="", ready=None):
getjournal = "journalctl -f" if not logcmd else logcmd getjournal = "journalctl -f" if not logcmd else logcmd
print(self.sshdomain) print(self.sshdomain)
match self.sshdomain: match self.sshdomain:
case "@local": command = [] case "@local":
case "localhost": command = [] command = []
case _: command = ["ssh", f"root@{self.sshdomain}"] case "localhost":
command = []
case _:
command = ["ssh"]
if self.ssh_config:
command.extend(["-F", self.ssh_config])
command.append(f"root@{self.sshdomain}")
[command.append(arg) for arg in getjournal.split()] [command.append(arg) for arg in getjournal.split()]
self.popen = subprocess.Popen( popen = subprocess.Popen(
command, command,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
) )
self._procs.append(popen)
while 1: while 1:
line = self.popen.stdout.readline() line = popen.stdout.readline()
res = line.decode().strip().lower() res = line.decode().strip().lower()
if not res: if not res:
break break
@@ -417,6 +529,12 @@ class Remote:
ready = None ready = None
yield res yield res
def close(self):
while self._procs:
proc = self._procs.pop()
proc.kill()
proc.wait()
@pytest.fixture @pytest.fixture
def lp(request): def lp(request):