feat: add LXC container support for local chatmail development

Add cmdeploy "lxc-test" command to run cmdeploy against local containers,
with supplementary lxc-start, lxc-stop and lxc-status subcommands.
See doc/source/lxc.rst for full documentation including prerequisites,
DNS setup, TLS handling, DNS-free testing, and known limitations.

Apart from adding lxc-specific docs, tests, and implementation files in the cmdeploy/lxc directory,
this PR adds the --ssh-config option to cmdeploy run/dns/status/test commands and pyinfra invocations,
and also to sshexec (Execnet) handling.  This allows for the host to need no DNS entries for a relay,
and route all resolution through ssh-config.  This is used by the "lxc-test" command, which performs
a completely local setup -- again, see docs for more details.

While working on DNS/SSH things i also unified all zone-file handling
to use actual BIND format as it is easy enough to parse back.
This commit is contained in:
holger krekel
2026-03-05 19:31:41 +01:00
parent ff541b81ea
commit 1abdc407af
24 changed files with 2029 additions and 142 deletions

View File

@@ -1,32 +0,0 @@
;
; Required DNS entries for chatmail servers
;
{% if A %}
{{ mail_domain }}. A {{ A }}
{% endif %}
{% if AAAA %}
{{ mail_domain }}. AAAA {{ AAAA }}
{% endif %}
{{ mail_domain }}. MX 10 {{ mail_domain }}.
{% if strict_tls %}
_mta-sts.{{ mail_domain }}. TXT "v=STSv1; id={{ sts_id }}"
mta-sts.{{ mail_domain }}. CNAME {{ mail_domain }}.
{% endif %}
www.{{ mail_domain }}. CNAME {{ mail_domain }}.
{{ dkim_entry }}
;
; Recommended DNS entries for interoperability and security-hardening
;
{{ mail_domain }}. TXT "v=spf1 a ~all"
_dmarc.{{ mail_domain }}. TXT "v=DMARC1;p=reject;adkim=s;aspf=s"
{% if acme_account_url %}
{{ mail_domain }}. CAA 0 issue "letsencrypt.org;accounturi={{ acme_account_url }}"
{% endif %}
_adsp._domainkey.{{ mail_domain }}. TXT "dkim=discardable"
_submission._tcp.{{ mail_domain }}. SRV 0 1 587 {{ mail_domain }}.
_submissions._tcp.{{ mail_domain }}. SRV 0 1 465 {{ mail_domain }}.
_imap._tcp.{{ mail_domain }}. SRV 0 1 143 {{ mail_domain }}.
_imaps._tcp.{{ mail_domain }}. SRV 0 1 993 {{ mail_domain }}.

View File

@@ -18,7 +18,23 @@ from packaging import version
from termcolor import colored
from . import dns, remote
from .sshexec import LocalExec, SSHExec
from .lxc.cli import ( # noqa: F401
lxc_start_cmd,
lxc_start_cmd_options,
lxc_status_cmd,
lxc_status_cmd_options,
lxc_stop_cmd,
lxc_stop_cmd_options,
lxc_test_cmd,
lxc_test_cmd_options,
)
from .sshexec import (
LocalExec,
SSHExec,
resolve_host_from_ssh_config,
resolve_key_from_ssh_config,
)
from .www import main as webdev_main
#
# cmdeploy sub commands and options
@@ -82,18 +98,21 @@ def run_cmd_options(parser):
help="disable checks nslookup for dns",
)
add_ssh_host_option(parser)
add_ssh_config_option(parser)
def run_cmd(args, out):
"""Deploy chatmail services on the remote server."""
ssh_host = args.ssh_host if args.ssh_host else args.config.mail_domain
sshexec = get_sshexec(ssh_host)
sshexec = get_sshexec(ssh_host, ssh_config=args.ssh_config)
require_iroh = args.config.enable_iroh_relay
strict_tls = args.config.tls_cert_mode == "acme"
if not args.dns_check_disabled:
remote_data = dns.get_initial_remote_data(sshexec, args.config.mail_domain)
if not dns.check_initial_remote_data(remote_data, strict_tls=strict_tls, print=out.red):
if not dns.check_initial_remote_data(
remote_data, strict_tls=strict_tls, print=out.red
):
return 1
env = os.environ.copy()
@@ -108,6 +127,18 @@ def run_cmd(args, out):
pyinf = "pyinfra --dry" if args.dry_run else "pyinfra"
cmd = f"{pyinf} --ssh-user root {ssh_host} {deploy_path} -y"
ssh_config = args.ssh_config
if ssh_config:
ssh_config = str(Path(ssh_config).resolve())
# Use pyinfra's native SSH data keys to configure the connection directly
# rather than relying on paramiko config parsing (see also sshexec.py)
ip = resolve_host_from_ssh_config(ssh_host, ssh_config)
key = resolve_key_from_ssh_config(ssh_host, ssh_config)
data_args = f"--data ssh_hostname={ip} --data ssh_known_hosts_file=/dev/null"
if key:
data_args += f" --data ssh_key={key}"
cmd = f"{pyinf} --ssh-user root {ssh_host} {deploy_path} -y {data_args}"
if ssh_host in ["localhost", "@docker"]:
if ssh_host == "@docker":
env["CHATMAIL_NOPORTCHECK"] = "True"
@@ -122,7 +153,11 @@ def run_cmd(args, out):
out.check_call(cmd, env=env)
if args.website_only:
out.green("Website deployment completed.")
elif not args.dns_check_disabled and strict_tls and not remote_data["acme_account_url"]:
elif (
not args.dns_check_disabled
and strict_tls
and not remote_data["acme_account_url"]
):
out.red("Deploy completed but letsencrypt not configured")
out.red("Run 'cmdeploy run' again")
else:
@@ -139,15 +174,16 @@ def dns_cmd_options(parser):
dest="zonefile",
type=pathlib.Path,
default=None,
help="write out a zonefile",
help="write DNS records in standard BIND format to the given file",
)
add_ssh_host_option(parser)
add_ssh_config_option(parser)
def dns_cmd(args, out):
"""Check DNS entries and optionally generate dns zone file."""
ssh_host = args.ssh_host if args.ssh_host else args.config.mail_domain
sshexec = get_sshexec(ssh_host, verbose=args.verbose)
sshexec = get_sshexec(ssh_host, verbose=args.verbose, ssh_config=args.ssh_config)
tls_cert_mode = args.config.tls_cert_mode
strict_tls = tls_cert_mode == "acme"
remote_data = dns.get_initial_remote_data(sshexec, args.config.mail_domain)
@@ -178,13 +214,14 @@ def dns_cmd(args, out):
def status_cmd_options(parser):
add_ssh_host_option(parser)
add_ssh_config_option(parser)
def status_cmd(args, out):
"""Display status for online chatmail instance."""
ssh_host = args.ssh_host if args.ssh_host else args.config.mail_domain
sshexec = get_sshexec(ssh_host, verbose=args.verbose)
sshexec = get_sshexec(ssh_host, verbose=args.verbose, ssh_config=args.ssh_config)
out.green(f"chatmail domain: {args.config.mail_domain}")
if args.config.privacy_mail:
@@ -204,14 +241,18 @@ def test_cmd_options(parser):
help="also run slow tests",
)
add_ssh_host_option(parser)
add_ssh_config_option(parser)
def test_cmd(args, out):
"""Run local and online tests for chatmail deployment."""
env = os.environ.copy()
env["CHATMAIL_INI"] = str(args.inipath.resolve())
if args.ssh_host:
env["CHATMAIL_SSH"] = args.ssh_host
if args.ssh_config:
env["CHATMAIL_SSH_CONFIG"] = str(Path(args.ssh_config).resolve())
pytest_path = shutil.which("pytest")
pytest_args = [
@@ -276,9 +317,7 @@ def bench_cmd(args, out):
def webdev_cmd(args, out):
"""Run local web development loop for static web pages."""
from .www import main
main()
webdev_main()
#
@@ -321,6 +360,16 @@ def add_ssh_host_option(parser):
)
def add_ssh_config_option(parser):
parser.add_argument(
"--ssh-config",
dest="ssh_config",
type=Path,
default=None,
help="Path to an SSH config file (e.g. lxconfigs/ssh-config).",
)
def add_config_option(parser):
parser.add_argument(
"--config",
@@ -330,6 +379,7 @@ def add_config_option(parser):
type=Path,
help="path to the chatmail.ini file",
)
parser.add_argument(
"--verbose",
"-v",
@@ -340,15 +390,16 @@ def add_config_option(parser):
)
def add_subcommand(subparsers, func):
def add_subcommand(subparsers, func, add_config=True):
name = func.__name__
assert name.endswith("_cmd")
name = name[:-4]
name = name[:-4].replace("_", "-")
doc = func.__doc__.strip()
help = doc.split("\n")[0].strip(".")
p = subparsers.add_parser(name, description=doc, help=help)
p.set_defaults(func=func)
add_config_option(p)
if add_config:
add_config_option(p)
return p
@@ -362,13 +413,15 @@ def get_parser():
"""Return an ArgumentParser for the 'cmdeploy' CLI"""
parser = argparse.ArgumentParser(description=description.strip())
parser.set_defaults(func=None, inipath=None)
subparsers = parser.add_subparsers(title="subcommands")
# find all subcommands in the module namespace
glob = globals()
for name, func in glob.items():
if name.endswith("_cmd"):
subparser = add_subcommand(subparsers, func)
needs_config = not name.startswith("lxc_")
subparser = add_subcommand(subparsers, func, add_config=needs_config)
addopts = glob.get(name + "_options")
if addopts is not None:
addopts(subparser)
@@ -376,26 +429,27 @@ def get_parser():
return parser
def get_sshexec(ssh_host: str, verbose=True):
def get_sshexec(ssh_host: str, verbose=True, ssh_config=None):
if ssh_host in ["localhost", "@local"]:
return LocalExec(verbose, docker=False)
elif ssh_host == "@docker":
return LocalExec(verbose, docker=True)
if verbose:
print(f"[ssh] login to {ssh_host}")
return SSHExec(ssh_host, verbose=verbose)
return SSHExec(ssh_host, verbose=verbose, ssh_config=ssh_config)
def main(args=None):
"""Provide main entry point for 'cmdeploy' CLI invocation."""
parser = get_parser()
args = parser.parse_args(args=args)
if not hasattr(args, "func"):
if args.func is None:
return parser.parse_args(["-h"])
out = Out()
kwargs = {}
if args.func.__name__ not in ("init_cmd", "fmt_cmd"):
if args.inipath is not None and args.func.__name__ not in ("init_cmd", "fmt_cmd"):
if not args.inipath.exists():
out.red(f"expecting {args.inipath} to exist, run init first?")
raise SystemExit(1)

View File

@@ -18,6 +18,7 @@ from pyinfra.facts.systemd import SystemdEnabled
from pyinfra.operations import apt, files, pip, server, systemd
from cmdeploy.cmdeploy import Out
from cmdeploy.util import get_version_string
from .acmetool import AcmetoolDeployer
from .basedeploy import (
@@ -271,8 +272,14 @@ class WebsiteDeployer(Deployer):
logger.warning("Web page build failed, skipping website deployment")
return
# if it is not a hugo page, upload it as is
files.rsync(
f"{www_path}/", "/var/www/html", flags=["-avz", "--chown=www-data"]
# pyinfra files.rsync (experimental) causes problems with ssh-config configuration
# the stable files.sync should do
files.sync(
src=str(www_path),
dest="/var/www/html",
user="www-data",
group="www-data",
delete=True,
)
@@ -532,17 +539,9 @@ class FcgiwrapDeployer(Deployer):
class GithashDeployer(Deployer):
def activate(self):
try:
git_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode()
except Exception:
git_hash = "unknown\n"
try:
git_diff = subprocess.check_output(["git", "diff"]).decode()
except Exception:
git_diff = ""
files.put(
name="Upload chatmail relay git commit hash",
src=StringIO(git_hash + git_diff),
src=StringIO(get_version_string()),
dest="/etc/chatmail-version",
mode="700",
)
@@ -586,11 +585,17 @@ def deploy_chatmail(config_path: Path, disable_mail: bool, website_only: bool) -
)
# Check if mtail_address interface is available (if configured)
if config.mtail_address and config.mtail_address not in ('127.0.0.1', '::1', 'localhost'):
if config.mtail_address and config.mtail_address not in (
"127.0.0.1",
"::1",
"localhost",
):
ipv4_addrs = host.get_fact(hardware.Ipv4Addrs)
all_addresses = [addr for addrs in ipv4_addrs.values() for addr in addrs]
if config.mtail_address not in all_addresses:
Out().red(f"Deploy failed: mtail_address {config.mtail_address} is not available (VPN up?).\n")
Out().red(
f"Deploy failed: mtail_address {config.mtail_address} is not available (VPN up?).\n"
)
exit(1)
if not os.environ.get("CHATMAIL_NOPORTCHECK"):

View File

@@ -1,11 +1,26 @@
import datetime
import importlib
from jinja2 import Template
from . import remote
def parse_zone_records(text):
"""Yield ``(name, ttl, rtype, rdata)`` from standard BIND-format text.
Skips comment lines (starting with ``;``) and blank lines.
Each record line must have the format ``name TTL IN type rdata``.
"""
for raw_line in text.strip().splitlines():
line = raw_line.strip()
if not line or line.startswith(";"):
continue
parts = line.split(None, 4)
if len(parts) < 5:
raise ValueError(f"Bad zone record line: {line}")
name = parts[0].rstrip(".")
# parts[2] is the IN class — ignored
yield name, parts[1], parts[3].upper(), parts[4]
def get_initial_remote_data(sshexec, mail_domain):
return sshexec.logged(
call=remote.rdns.perform_initial_checks, kwargs=dict(mail_domain=mail_domain)
@@ -31,13 +46,36 @@ def get_filled_zone_file(remote_data):
if not sts_id:
remote_data["sts_id"] = datetime.datetime.now().strftime("%Y%m%d%H%M")
template = importlib.resources.files(__package__).joinpath("chatmail.zone.j2")
content = template.read_text()
zonefile = Template(content).render(**remote_data)
lines = [x.strip() for x in zonefile.split("\n") if x.strip()]
d = remote_data["mail_domain"]
lines = ["; Required DNS entries"]
if remote_data.get("A"):
lines.append(f"{d}. 3600 IN A {remote_data['A']}")
if remote_data.get("AAAA"):
lines.append(f"{d}. 3600 IN AAAA {remote_data['AAAA']}")
lines.append(f"{d}. 3600 IN MX 10 {d}.")
if remote_data.get("strict_tls"):
lines.append(
f'_mta-sts.{d}. 3600 IN TXT "v=STSv1; id={remote_data["sts_id"]}"'
)
lines.append(f"mta-sts.{d}. 3600 IN CNAME {d}.")
lines.append(f"www.{d}. 3600 IN CNAME {d}.")
lines.append(remote_data["dkim_entry"])
lines.append("")
zonefile = "\n".join(lines)
return zonefile
lines.append("; Recommended DNS entries")
lines.append(f'{d}. 3600 IN TXT "v=spf1 a ~all"')
lines.append(f'_dmarc.{d}. 3600 IN TXT "v=DMARC1;p=reject;adkim=s;aspf=s"')
if remote_data.get("acme_account_url"):
lines.append(
f"{d}. 3600 IN CAA 0 issue"
f' "letsencrypt.org;accounturi={remote_data["acme_account_url"]}"'
)
lines.append(f'_adsp._domainkey.{d}. 3600 IN TXT "dkim=discardable"')
lines.append(f"_submission._tcp.{d}. 3600 IN SRV 0 1 587 {d}.")
lines.append(f"_submissions._tcp.{d}. 3600 IN SRV 0 1 465 {d}.")
lines.append(f"_imap._tcp.{d}. 3600 IN SRV 0 1 143 {d}.")
lines.append(f"_imaps._tcp.{d}. 3600 IN SRV 0 1 993 {d}.")
lines.append("")
return "\n".join(lines)
def check_full_zone(sshexec, remote_data, out, zonefile) -> int:

View File

@@ -0,0 +1,469 @@
"""lxc-start/stop/status/test subcommands for testing with local containers."""
import os
import subprocess
import time
from contextlib import contextmanager
from ..util import collapse, get_git_hash, get_version_string, shell
from .incus import Incus, RelayContainer
RELAY_NAMES = ("test0", "test1")
# -------------------------------------------------------------------
# lxc-start
# -------------------------------------------------------------------
def lxc_start_cmd_options(parser):
_add_name_args(
parser,
help_text="User relay name(s) to create (default: test0).",
)
parser.add_argument(
"--ipv4-only",
dest="ipv4_only",
action="store_true",
help="Create an IPv4-only container.",
)
parser.add_argument(
"--run",
action="store_true",
help="Run 'cmdeploy run' on each container after starting it.",
)
def lxc_start_cmd(args, out):
"""Create/Ensure and start LXC relay and DNS containers."""
ix = Incus()
relays = [ix.get_container(n) for n in args.names] or [
ix.get_container(RELAY_NAMES[0])
]
out.green("Ensuring DNS container (ns-localchat) ...")
dns_ct = ix.get_dns_container()
dns_ct.ensure()
dns_ip = dns_ct.ipv4
print(f" DNS container IP: {dns_ip}")
for ct in relays:
out.green(f"Ensuring container {ct.name!r} ({ct.domain}) ...")
ct.ensure()
ip = ct.ipv4
print(" Configuring container hostname ...")
ct.configure_hosts(ip)
print(f" Writing {ct.ini.name} ...")
ct.write_ini(disable_ipv6=args.ipv4_only)
print(f" Config: {ct.ini}")
if args.ipv4_only:
ct.disable_ipv6()
ipv6 = None
else:
output = ct.bash(
"ip -6 addr show scope global -deprecated"
" | grep -oP '(?<=inet6 )[^/]+'",
check=False,
)
ipv6 = output.strip() if output else None
print(f" {_format_addrs(ip, ipv6)}")
out.green(f" Container {ct.name!r} ready: {ct.domain} -> {ip}")
print()
# Reset DNS zones only for the containers we just started
started_cnames = {ct.name for ct in relays}
managed = ix.list_managed()
started = [c for c in managed if c["name"] in started_cnames]
if started:
print(
f"Resetting DNS zones for {len(started)} domain(s) (A + AAAA records) ..."
)
dns_ct.reset_dns_records(dns_ip, started)
for ct in relays:
if ct.name in started_cnames:
print(f" Configuring DNS in {ct.name} ...")
ct.configure_dns(dns_ip)
# Generate the unified SSH config
out.green("Writing ssh-config ...")
ssh_cfg = ix.write_ssh_config()
print(f" {ssh_cfg}")
# Verify SSH via the generated config
for ct in relays:
print(f" Verifying SSH to {ct.name} via ssh-config ...")
if ct.verify_ssh(ssh_cfg):
print(f" SSH OK: ssh -F lxconfigs/ssh-config {ct.domain}")
else:
out.red(f" WARNING: SSH verification failed for {ct.name}")
# Print integration suggestions
ssh_cfg = ix.ssh_config_path
if not ix.check_ssh_include():
out.green(
"\n (Optional) To use containers from any SSH client, add to ~/.ssh/config:"
)
out.green(f" Include {ssh_cfg}")
# Optionally run cmdeploy run on each relay
if args.run:
for ct in relays:
with _section(out, f"cmdeploy run: {ct.sname} ({ct.domain})"):
ret = _run_cmdeploy("run", ct, ix, extra=["--skip-dns-check"])
if ret:
out.red(f"Deploy to {ct.sname} failed (exit {ret})")
return ret
# -------------------------------------------------------------------
# lxc-stop
# -------------------------------------------------------------------
def lxc_stop_cmd_options(parser):
parser.add_argument(
"--destroy",
action="store_true",
help="Delete containers and their config files after stopping.",
)
parser.add_argument(
"--destroy-all",
dest="destroy_all",
action="store_true",
help="Like --destroy, but also remove the ns-localchat DNS container.",
)
_add_name_args(
parser,
help_text="Container name(s) to stop (default: test0 + test1).",
)
def lxc_stop_cmd(args, out):
"""Stop (and optionally destroy) local LXC relay containers."""
ix = Incus()
names = args.names or RELAY_NAMES
destroy = args.destroy or args.destroy_all
for ct in map(ix.get_container, names):
if destroy:
out.green(f"Destroying container {ct.name!r} ...")
ct.destroy()
else:
out.green(f"Stopping container {ct.name!r} ...")
ct.stop(force=True)
if args.destroy_all:
dns_ct = ix.get_dns_container()
out.green(f"Destroying DNS container {dns_ct.name!r} ...")
dns_ct.destroy()
ix.delete_images()
if destroy:
ix.write_ssh_config()
out.green("LXC containers destroyed.")
else:
out.green("LXC containers stopped.")
# -------------------------------------------------------------------
# lxc-test
# -------------------------------------------------------------------
def lxc_test_cmd_options(parser):
parser.add_argument(
"--one",
action="store_true",
help="Only deploy and test against test0 (skip test1).",
)
def lxc_test_cmd(args, out):
"""Run full LXC pipeline: start, deploy, DNS, zone files, and tests.
All commands run directly on the host using
``--ssh-config lxconfigs/ssh-config`` for SSH access.
"""
ix = Incus()
t_total = time.time()
relay_names = list(RELAY_NAMES)
if args.one:
relay_names = relay_names[:1]
local_hash = get_git_hash()
# Per-relay: start, deploy, then snapshot the first relay as a
# reusable image so the second relay launches pre-deployed.
ipv4_only_flags = {RELAY_NAMES[0]: False, RELAY_NAMES[1]: True}
for ct in map(ix.get_container, relay_names):
name = ct.sname
ipv4_only = ipv4_only_flags.get(name, False)
label = "IPv4-only" if ipv4_only else "dual-stack"
with _section(out, f"LXC: lxc-start {name} ({label})"):
args.names = [name]
args.ipv4_only = ipv4_only
args.run = False
ret = lxc_start_cmd(args, out)
if ret:
return ret
status = _deploy_status(ct, local_hash, ix)
if "IN-SYNC" in status:
_section_line(out, f"cmdeploy run: {name}{status}, skipping")
else:
with _section(out, f"cmdeploy run: {name} ({ct.domain})"):
ret = _run_cmdeploy("run", ct, ix, extra=["--skip-dns-check"])
if ret:
out.red(f"Deploy to {name} failed (exit {ret})")
return ret
# Snapshot the first relay so subsequent ones launch pre-deployed
if not ix.find_relay_image():
with _section(out, "LXC: publishing relay image"):
ct.publish_as_relay_image()
for ct in map(ix.get_container, relay_names):
with _section(out, f"cmdeploy dns: {ct.sname} ({ct.domain})"):
ret = _run_cmdeploy("dns", ct, ix, extra=["--zonefile", str(ct.zone)])
if ret:
out.red(f"DNS for {ct.sname} failed (exit {ret})")
return ret
with _section(out, "LXC: PowerDNS zone update"):
dns_ct = ix.get_dns_container()
for ct in map(ix.get_container, relay_names):
if ct.zone.exists():
zone_data = ct.zone.read_text()
print(f" Loading {ct.zone} into PowerDNS ...")
dns_ct.set_dns_records(zone_data)
with _section(out, "cmdeploy test"):
first = ix.get_container(relay_names[0])
env = None
if len(relay_names) > 1:
env = os.environ.copy()
env["CHATMAIL_DOMAIN2"] = ix.get_container(relay_names[1]).domain
ret = _run_cmdeploy("test", first, ix, **({"env": env} if env else {}))
if ret:
out.red(f"Tests failed (exit {ret})")
return ret
elapsed = time.time() - t_total
_section_line(out, f"lxc-test complete ({elapsed:.1f}s)")
return 0
# -------------------------------------------------------------------
# lxc-status
# -------------------------------------------------------------------
def lxc_status_cmd_options(parser):
pass
def lxc_status_cmd(args, out):
"""Show status of local LXC chatmail containers."""
ix = Incus()
containers = ix.list_managed()
if not containers:
out.red("No LXC containers found. Run 'cmdeploy lxc-start' first.")
return 1
local_hash = get_git_hash()
# Get storage pool path for display
storage_path = None
data = ix.run_json(["storage", "show", "default"], check=False)
if data:
storage_path = data.get("config", {}).get("source")
if storage_path:
out.green(f"Containers: ({storage_path})")
else:
out.green("Containers:")
dns_ip = None
for c in containers:
_print_container_status(c, ix, local_hash)
if c["name"] == ix.get_dns_container().name:
dns_ip = c["ip"]
_print_ssh_status(out, ix)
_print_dns_forwarding_status(out, dns_ip)
return 0
def _print_container_status(c, ix, local_hash):
"""Print name/status, domain/IPs, and RAM for one container."""
cname = c["name"]
is_running = c.get("status") == "Running"
ct = ix.get_container(cname)
# First line: name + running/STOPPED + deploy status
if not is_running:
tag = "STOPPED"
elif not isinstance(ct, RelayContainer):
tag = "running"
else:
tag = f"running {_deploy_status(ct, local_hash, ix)}"
print(f" {cname:20s} {tag}")
# Second line: domain, IPv4, IPv6
domain = c.get("domain", "")
ip = c.get("ip") or "?"
ipv6 = c.get("ipv6")
print(f" {domain:20s} {_format_addrs(ip, ipv6)}")
# Third line: RAM (RSS), config
indent = " " * 21
try:
used, total = ct.rss_mib()
except Exception:
ram_str = "RSS ?"
else:
ram_str = f"RSS {used}/{total} MiB ({used * 100 // total}%)"
if isinstance(ct, RelayContainer):
detail = f"{ram_str}, config: {os.path.relpath(ct.ini)}"
else:
detail = ram_str
print(f" {indent}{detail}")
print()
def _print_ssh_status(out, ix):
"""Print SSH integration status."""
print()
ssh_cfg = ix.ssh_config_path
if ix.check_ssh_include():
out.green("SSH: ~/.ssh/config includes lxconfigs/ssh-config ✓")
else:
out.red("SSH: ~/.ssh/config does NOT include lxconfigs/ssh-config")
print(" Add to ~/.ssh/config:")
print(f" Include {ssh_cfg}")
def _print_dns_forwarding_status(out, dns_ip):
"""Print host DNS forwarding status for .localchat."""
if not dns_ip:
out.red("DNS: ns-localchat container not found")
return
try:
rv = shell("resolvectl status incusbr0", timeout=5)
dns_ok = dns_ip in rv.stdout and "localchat" in rv.stdout
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
dns_ok = None
if dns_ok is True:
out.green(f"DNS: .localchat forwarding to {dns_ip}")
elif dns_ok is False:
out.red("DNS: .localchat forwarding NOT configured")
print(" Run:")
print(f" sudo resolvectl dns incusbr0 {dns_ip}")
print(" sudo resolvectl domain incusbr0 ~localchat")
else:
print(" DNS: .localchat forwarding status UNKNOWN")
# -------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------
def _format_addrs(ip, ipv6=None):
parts = [f"IPv4 {ip}"]
if ipv6:
parts.append(f"IPv6 {ipv6}")
return ", ".join(parts)
SECTION_WIDTH = 72
@contextmanager
def _section(out, title):
bar = "\u2501" * (SECTION_WIDTH - len(title) - 5)
out.green(f"\u2501\u2501\u2501 {title} {bar}")
t0 = time.time()
yield
elapsed = time.time() - t0
print(f"{'':>{SECTION_WIDTH - 10}}({elapsed:.1f}s)")
print()
def _section_line(out, title):
bar = "\u2501" * (SECTION_WIDTH - len(title) - 5)
out.green(f"\u2501\u2501\u2501 {title} {bar}")
print()
def _deploy_status(ct, local_hash, ix):
"""Return a human-readable deploy status string.
Compares the full deployed version (hash + diff) against
the local state built by :func:`~cmdeploy.util.get_version_string`.
"""
deployed = ct.deployed_version()
if deployed is None:
return "NOT DEPLOYED"
# A container launched from the relay image has the same
# git hash but a different domain — always redeploy.
deployed_domain = ct.deployed_domain()
if deployed_domain and deployed_domain != ct.domain:
return f"DOMAIN-MISMATCH (deployed: {deployed_domain})"
deployed_lines = deployed.splitlines()
deployed_hash = deployed_lines[0] if deployed_lines else ""
short = deployed_hash[:12]
if not local_hash:
return f"UNKNOWN (deployed: {short})"
local_short = local_hash[:12]
if deployed_hash != local_hash:
return f"STALE (deployed: {short}, local: {local_short})"
# Hash matches — check for uncommitted diffs
local_version = get_version_string()
if deployed != local_version:
return f"DIRTY ({local_short}, undeployed changes)"
return f"IN-SYNC ({short})"
def _add_name_args(parser, help_text=None):
"""Add optional positional NAME arguments."""
parser.add_argument(
"names",
nargs="*",
metavar="NAME",
help=help_text or "Relay name(s) to operate on.",
)
def _run_cmdeploy(subcmd, ct, ix, extra=None, **kwargs):
"""Run ``cmdeploy <subcmd>`` with standard --config/--ssh flags.
*ct* is a Container (uses ``ct.ini`` and ``ct.domain``).
Returns the subprocess exit code.
"""
extra_str = " ".join(extra) if extra else ""
cmd = f"""\
cmdeploy {subcmd}
--config {ct.ini}
--ssh-config {ix.ssh_config_path}
--ssh-host {ct.domain}
{extra_str}
"""
if "cwd" not in kwargs:
kwargs["cwd"] = str(ix.project_root)
cmd = collapse(cmd)
print(f" [$ {cmd}]")
return shell(cmd, capture_output=False, **kwargs).returncode

View File

@@ -0,0 +1,638 @@
"""Core Incus operations for local chatmail LXC containers."""
import json
import subprocess
import textwrap
import time
from pathlib import Path
from ..util import shell
LABEL_KEY = "user.localchat-managed"
SSH_KEY_NAME = "id_localchat"
DOMAIN_SUFFIX = ".localchat"
UPSTREAM_IMAGE = "images:debian/12"
BASE_IMAGE_ALIAS = "localchat-base"
BASE_SETUP_NAME = "localchat-base-setup"
RELAY_IMAGE_ALIAS = "localchat-relay"
DNS_CONTAINER_NAME = "ns-localchat"
DNS_DOMAIN = "ns.localchat"
def _extract_ip(net_data, family="inet"):
"""Extract the first global-scope IP of *family* from network state data.
*net_data* is the ``state.network`` dict from ``incus list --format=json``.
*family* is ``"inet"`` for IPv4 or ``"inet6"`` for IPv6.
Returns the address string, or None.
"""
for iface_name, iface in net_data.items():
if iface_name == "lo":
continue
for addr in iface.get("addresses", []):
if addr["family"] == family and addr["scope"] == "global":
return addr["address"]
return None
class Incus:
"""Gateway for all Incus container operations.
Instantiated once per CLI command and passed around so that
all modules share a single entry point for Incus interactions.
"""
def __init__(self):
self.project_root = Path(__file__).resolve().parent.parent.parent.parent.parent
self.lxconfigs_dir = self.project_root / "lxconfigs"
self.lxconfigs_dir.mkdir(exist_ok=True)
self.ssh_key_path = self.lxconfigs_dir / SSH_KEY_NAME
if not self.ssh_key_path.exists():
shell(
f"ssh-keygen -t ed25519 -f {self.ssh_key_path} -N '' -C localchat",
check=True,
)
self.ssh_config_path = self.lxconfigs_dir / "ssh-config"
def write_ssh_config(self):
"""Write ``lxconfigs/ssh-config`` mapping all containers to their IPs.
Each Host block maps the container name, the domain name, and the
short relay name (e.g. ``_test0``) to the container's IP, using the
shared localchat SSH key. Returns the path to the file.
"""
containers = self.list_managed()
key_path = self.ssh_key_path
lines = ["# Auto-generated by cmdeploy lxc-start — do not edit\n"]
for c in containers:
hosts = [c["name"]]
domain = c.get("domain", "")
if domain and domain != c["name"]:
hosts.append(domain)
short = domain.split(".")[0]
if short and short not in hosts:
hosts.append(short)
lines.append(f"\nHost {' '.join(hosts)}\n")
lines.append(f" Hostname {c['ip']}\n")
lines.append(" User root\n")
lines.append(f" IdentityFile {key_path}\n")
lines.append(" IdentitiesOnly yes\n")
lines.append(" StrictHostKeyChecking accept-new\n")
lines.append(" UserKnownHostsFile /dev/null\n")
lines.append(" LogLevel ERROR\n")
path = self.ssh_config_path
path.write_text("".join(lines))
return path
def check_ssh_include(self):
"""Check if the user's ~/.ssh/config already includes our ssh-config."""
user_ssh_config = Path.home() / ".ssh" / "config"
if not user_ssh_config.exists():
return False
lines = filter(None, map(str.strip, user_ssh_config.open("r")))
return f"Include {self.ssh_config_path}" in lines
def run(self, args, check=True, capture=True, input=None):
"""Run an incus command."""
cmd = ["incus"] + list(args)
kwargs = dict(check=check, text=True, input=input)
if capture:
kwargs["capture_output"] = True
else:
kwargs["stdout"] = None
kwargs["stderr"] = None
return subprocess.run(cmd, **kwargs) # noqa: PLW1510
def run_json(self, args, check=True):
"""Run an incus command with ``--format=json``.
Returns the parsed JSON on success.
When *check* is True raises ``subprocess.CalledProcessError``
on non-zero exit; when False returns *None* instead.
"""
result = self.run(
list(args) + ["--format=json"],
check=check,
)
if result.returncode != 0:
return None
return json.loads(result.stdout)
def run_output(self, args, check=True):
"""Run an incus command and return its stdout.
When *check* is False, returns *None* on non-zero exit
instead of raising.
"""
result = self.run(args, check=check)
if result.returncode != 0:
return None
return result.stdout
def _find_image(self, alias):
"""Return *alias* if an image with that alias exists, else None."""
images = self.run_json(["image", "list"], check=False) or []
for img in images:
for a in img.get("aliases", []):
if a.get("name") == alias:
return alias
return None
def find_relay_image(self):
"""Return the relay image alias if it exists, else None."""
return self._find_image(RELAY_IMAGE_ALIAS)
def delete_images(self):
"""Delete the cached base and relay images."""
for alias in (RELAY_IMAGE_ALIAS, BASE_IMAGE_ALIAS):
self.run(["image", "delete", alias], check=False)
def list_managed(self):
"""Return list of dicts with name, ip, ipv6, domain, status, memory_usage."""
containers = []
for ct in self.run_json(["list"]):
config = ct.get("config", {})
if config.get(LABEL_KEY) != "true":
continue
name = ct["name"]
state = ct.get("state", {})
net = state.get("network") or {}
containers.append(
{
"name": name,
"ip": _extract_ip(net, "inet"),
"ipv6": _extract_ip(net, "inet6"),
"domain": config.get(
"user.localchat-domain", f"{name}{DOMAIN_SUFFIX}"
),
"status": ct.get("status", "Unknown"),
"memory_usage": state.get("memory", {}).get("usage", 0),
}
)
return containers
def ensure_base_image(self):
"""Build and cache a base image with openssh and the SSH key.
The image is published as a local incus image with alias
'localchat-base'. Subsequent container launches use this
image instead of the upstream Debian 12, skipping the
slow apt-get install step.
Returns the image alias.
"""
if self._find_image(BASE_IMAGE_ALIAS):
return BASE_IMAGE_ALIAS
print(" Building base image (one-time setup) ...")
self.run(["delete", BASE_SETUP_NAME, "--force"], check=False)
self.run(["image", "delete", BASE_IMAGE_ALIAS], check=False)
self.run(["launch", UPSTREAM_IMAGE, BASE_SETUP_NAME])
ct = Container(self, BASE_SETUP_NAME)
ct.wait_ready()
key_path = self.ssh_key_path
pub_key = key_path.with_suffix(".pub").read_text().strip()
ct.bash(f"""\
apt-get -o DPkg::Lock::Timeout=60 update
DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server python3
systemctl enable ssh
apt-get clean
mkdir -p /root/.ssh
chmod 700 /root/.ssh
echo '{pub_key}' > /root/.ssh/authorized_keys
chmod 600 /root/.ssh/authorized_keys
""")
self.run(["stop", BASE_SETUP_NAME])
self.run(["publish", BASE_SETUP_NAME, f"--alias={BASE_IMAGE_ALIAS}"])
self.run(["delete", BASE_SETUP_NAME, "--force"])
print(f" Base image '{BASE_IMAGE_ALIAS}' ready.")
return BASE_IMAGE_ALIAS
def get_container(self, name):
"""Return a container handle for the given name.
Accepts both short relay names (``test0``) and full Incus
container names (``test0-localchat``). Returns
``DNSContainer`` for the DNS container and
``RelayContainer`` for everything else.
"""
if name == DNS_CONTAINER_NAME:
return DNSContainer(self)
return RelayContainer(self, name.removesuffix("-localchat"))
def get_dns_container(self):
"""Return a DNSContainer handle."""
return DNSContainer(self)
class Container:
"""Lightweight handle for an Incus container.
Carries the container *name* and provides convenience methods
for running commands, managing lifecycle, and extracting state
so callers don't repeat the name everywhere.
"""
def __init__(self, incus, name, domain=None, memory="100MiB"):
self.incus = incus
self.name = name
self.domain = domain or f"{name}{DOMAIN_SUFFIX}"
self.memory = memory
self.ipv4 = None
self.ipv6 = None
def bash(self, script, check=True):
"""Returns stdout from executing ``bash -ec <script>`` inside this container.
*script* is dedented and stripped so callers can use triple-quoted strings.
When *check* is False, returns *None* on non-zero exit instead of raising.
"""
cmd = ["exec", self.name, "--", "bash", "-ec", textwrap.dedent(script).strip()]
return self.incus.run_output(cmd, check=check)
def run_cmd(self, *args, check=True):
"""Return stdout from running a command directly in the container (no shell).
When *check* is False, returns *None* on non-zero exit instead of raising.
"""
return self.incus.run_output(
["exec", self.name, "--", *args],
check=check,
)
def start(self):
self.incus.run(["start", self.name])
def stop(self, force=False):
cmd = ["stop", self.name]
if force:
cmd.append("--force")
self.incus.run(cmd, check=False)
def launch(self):
"""Launch from the best available image, return the alias used."""
image = self.incus.find_relay_image() or self.incus.ensure_base_image()
print(f" Launching from '{image}' image ...")
cfg = []
cfg += ("-c", f"{LABEL_KEY}=true")
cfg += ("-c", f"user.localchat-domain={self.domain}")
cfg += ("-c", f"limits.memory={self.memory}")
self.incus.run(["launch", image, self.name, *cfg])
return image
def ensure(self):
"""Create/start this container from the cached base image.
On first call, builds the base image (~30s).
Subsequent containers launch in ~2s from the cached image.
Returns ``self`` for chaining.
"""
data = self.incus.run_json(["list", self.name], check=False) or []
existing = [c for c in data if c["name"] == self.name]
if existing:
if existing[0]["status"] != "Running":
self.start()
else:
self.launch()
self.wait_ready()
return self
def destroy(self):
"""Stop, delete, and clean up config files."""
self.stop(force=True)
self.incus.run(["delete", self.name, "--force"], check=False)
def push_file_content(self, dest_path, content):
"""Write *content* to *dest_path* inside the container.
*content* is dedented and stripped so callers can use
indented triple-quoted strings.
"""
content = textwrap.dedent(content).strip() + "\n"
self.incus.run(
["file", "push", "-", f"{self.name}{dest_path}"],
input=content,
)
self.bash(f"chmod 644 {dest_path}")
def wait_ready(self, timeout=60):
"""Wait until the container is running with an IPv4 address.
Sets ``self.ipv4`` and ``self.ipv6`` (may be *None*),
or raises ``TimeoutError``.
"""
deadline = time.time() + timeout
while time.time() < deadline:
data = self.incus.run_json(
["list", self.name],
check=False,
)
if data and data[0].get("status") == "Running":
net = data[0].get("state", {}).get("network", {})
self.ipv4 = _extract_ip(net, "inet")
self.ipv6 = _extract_ip(net, "inet6")
if self.ipv4:
return
time.sleep(1)
raise TimeoutError(
f"Container {self.name!r} did not become ready within {timeout}s"
)
def rss_mib(self):
"""Return ``(used, total)`` memory from container (or None if unobtainable)."""
output = self.bash("free -m", check=False)
if output:
for line in output.splitlines():
if line.startswith("Mem:"):
parts = line.split()
return int(parts[2]), int(parts[1])
class RelayContainer(Container):
"""Container handle for a chatmail relay.
Accepts the short relay name (e.g. ``test0``) and derives
the Incus container name and mail domain automatically.
"""
def __init__(self, incus, name):
super().__init__(
incus,
f"{name}-localchat",
domain=f"_{name}{DOMAIN_SUFFIX}",
memory="500MiB",
)
self.sname = name
self.ini = incus.lxconfigs_dir / f"chatmail-{name}.ini"
self.zone = incus.lxconfigs_dir / f"{name}.zone"
def launch(self):
"""Launch (from a potentially cached image) and clear inherited chatmail-version."""
image = super().launch()
self.bash("rm -f /etc/chatmail-version")
return image
def destroy(self):
"""Stop, delete, and clean up config files."""
super().destroy()
if self.ini.exists():
self.ini.unlink()
def disable_ipv6(self):
"""Disable IPv6 inside the container via sysctl."""
self.bash("""\
sysctl -w net.ipv6.conf.all.disable_ipv6=1
sysctl -w net.ipv6.conf.default.disable_ipv6=1
mkdir -p /etc/sysctl.d
printf 'net.ipv6.conf.all.disable_ipv6=1\\n
net.ipv6.conf.default.disable_ipv6=1\\n'
> /etc/sysctl.d/99-disable-ipv6.conf
""")
def configure_hosts(self, ip):
"""Set hostname and /etc/hosts inside the container."""
self.bash(f"""\
echo '{self.name}' > /etc/hostname
hostname {self.name}
echo '{ip} {self.name} {self.domain}' >> /etc/hosts
""")
def publish_as_relay_image(self):
"""Publish this container as a reusable relay image.
Stops the container, publishes it as 'localchat-relay',
then restarts it.
"""
if self.incus.find_relay_image():
return
print(f" Publishing {self.name!r} as '{RELAY_IMAGE_ALIAS}' image ...")
self.incus.run(
["publish", self.name, f"--alias={RELAY_IMAGE_ALIAS}", "--force"]
)
self.wait_ready()
print(f" Relay image '{RELAY_IMAGE_ALIAS}' ready.")
def deployed_version(self):
"""Read /etc/chatmail-version, or None if absent."""
output = self.bash("cat /etc/chatmail-version", check=False)
return output.strip() if output else None
def deployed_domain(self):
"""Read the domain deployed on the container (postfix myhostname)."""
output = self.bash(
"postconf -h myhostname 2>/dev/null",
check=False,
)
return output.strip() if output else None
def verify_ssh(self, ssh_config):
"""Verify SSH connectivity to this container."""
cmd = f"ssh -F {ssh_config} -o ConnectTimeout=10 root@{self.domain} hostname"
return shell(cmd, timeout=15).returncode == 0
def configure_dns(self, dns_ip):
"""Point this container's resolver at *dns_ip*.
Disables systemd-resolved to free port 53 and writes
a static /etc/resolv.conf. Also configures unbound
(if present) to forward .localchat queries.
"""
self.bash(f"""\
systemctl disable --now systemd-resolved 2>/dev/null || true
rm -f /etc/resolv.conf
echo 'nameserver {dns_ip}' > /etc/resolv.conf
mkdir -p /etc/unbound/unbound.conf.d
printf 'server:\\n domain-insecure: "localchat"\\n\\n
forward-zone:\\n name: "localchat"\\n
forward-addr: {dns_ip}\\n'
> /etc/unbound/unbound.conf.d/localchat-forward.conf
systemctl restart unbound 2>/dev/null || true
""")
def write_ini(self, disable_ipv6=False):
"""Generate a chatmail.ini config file in lxconfigs/."""
from chatmaild.config import write_initial_config
overrides = {
"max_user_send_per_minute": 600,
"max_user_send_burst_size": 100,
"mtail_address": "127.0.0.1",
}
if disable_ipv6:
overrides["disable_ipv6"] = "True"
write_initial_config(self.ini, self.domain, overrides)
return self.ini
class DNSContainer(Container):
"""Specialised container handle for the PowerDNS name server."""
def __init__(self, incus):
super().__init__(incus, DNS_CONTAINER_NAME, domain=DNS_DOMAIN)
def pdnsutil(self, *args, check=True):
"""Run ``pdnsutil <args>`` inside the DNS container."""
return self.run_cmd("pdnsutil", *args, check=check)
def replace_rrset(self, zone, name, rtype, ttl, rdata):
"""Shortcut for ``pdnsutil replace-rrset``."""
self.pdnsutil("replace-rrset", zone, name, rtype, ttl, rdata)
def restart_services(self):
"""Restart pdns and pdns-recursor."""
self.bash("""\
systemctl restart pdns
systemctl restart pdns-recursor || true
""")
def ensure(self):
"""Create the DNS container with PowerDNS if needed.
Calls ``super().ensure()`` to create/start the container
and set up SSH, then installs PowerDNS and configures
the Incus bridge to use this container as DNS.
"""
super().ensure()
self._install_powerdns()
self.incus.run(
["network", "set", "incusbr0", "dns.mode=none"],
check=False,
)
self.incus.run(
["network", "set", "incusbr0", f"raw.dnsmasq=dhcp-option=6,{self.ipv4}"],
check=False,
)
def _install_powerdns(self):
"""Install and configure PowerDNS if not already present."""
if self.run_cmd("which", "pdns_server", check=False) is not None:
return
self.bash("""\
systemctl disable --now systemd-resolved 2>/dev/null || true
rm -f /etc/resolv.conf
echo 'nameserver 9.9.9.9' > /etc/resolv.conf
apt-get -o DPkg::Lock::Timeout=60 update
DEBIAN_FRONTEND=noninteractive apt-get install -y \
pdns-server pdns-backend-sqlite3 sqlite3 pdns-recursor dnsutils
systemctl stop pdns pdns-recursor || true
mkdir -p /var/lib/powerdns
sqlite3 /var/lib/powerdns/pdns.sqlite3 \
</usr/share/doc/pdns-backend-sqlite3/schema.sqlite3.sql
chown -R pdns:pdns /var/lib/powerdns
""")
self.push_file_content(
"/etc/powerdns/pdns.conf",
"""\
launch=gsqlite3
gsqlite3-database=/var/lib/powerdns/pdns.sqlite3
local-address=127.0.0.1
local-port=5353
""",
)
self.push_file_content(
"/etc/powerdns/recursor.conf",
"""\
local-address=0.0.0.0
local-port=53
forward-zones=localchat=127.0.0.1:5353
forward-zones-recurse=.=9.9.9.9;149.112.112.112
allow-from=0.0.0.0/0
dont-query=
dnssec=off
""",
)
self.bash("""\
systemctl start pdns
systemctl start pdns-recursor
echo 'nameserver 127.0.0.1' > /etc/resolv.conf
""")
def reset_dns_records(self, dns_ip, domains):
"""Create DNS zones with initial A records via pdnsutil.
Only sets SOA, NS, and A records as the minimal set
needed for SSH connectivity. Full records (MX, TXT, SRV,
CNAME, DKIM) are added later by ``cmdeploy dns``.
Args:
dns_ip: IP of the DNS container
domains: list of dicts with 'name', 'domain', 'ip'
"""
for d in domains:
domain = d["domain"]
ip = d["ip"]
print(f" {domain} -> {ip}")
# Delete and recreate zone fresh (removes stale records)
self.pdnsutil("delete-zone", domain, check=False)
self.pdnsutil("create-zone", domain, f"ns.{domain}")
serial = str(int(time.time()))
soa = f"ns.{domain} hostmaster.{domain} {serial} 3600 900 604800 300"
self.replace_rrset(domain, ".", "SOA", "3600", soa)
self.replace_rrset(domain, ".", "NS", "3600", f"ns.{domain}.")
self.replace_rrset(domain, ".", "A", "3600", ip)
self.replace_rrset(domain, "ns", "A", "3600", dns_ip)
# AAAA (domain -> container IPv6, if available)
ipv6 = d.get("ipv6")
if ipv6:
self.replace_rrset(domain, ".", "AAAA", "3600", ipv6)
print(f" zone reset: SOA, NS, A, AAAA ({ip}, {ipv6})")
else:
# Remove any stale AAAA record
self.pdnsutil("delete-rrset", domain, ".", "AAAA", check=False)
print(f" zone reset: SOA, NS, A ({ip}, IPv4-only)")
self.restart_services()
def set_dns_records(self, text):
"""Add or overwrite DNS records from standard BIND format.
Uses ``cmdeploy.dns.parse_zone_records`` to parse.
Zones are created automatically from the record names.
"""
from ..dns import parse_zone_records
zones_seen = set()
for name, ttl, rtype, rdata in parse_zone_records(text):
# Derive zone from name: find top-level .localchat domain
name_parts = name.split(".")
zone = name # fallback
for i in range(len(name_parts) - 1):
if name_parts[i + 1 :] == ["localchat"]:
zone = ".".join(name_parts[i:])
break
# Create zone if first time seeing it
if zone not in zones_seen:
self.pdnsutil(
"create-zone",
zone,
f"ns.{zone}",
check=False,
)
zones_seen.add(zone)
# Figure out the record name relative to zone
if name == zone:
relative = "."
elif name.endswith(f".{zone}"):
relative = name[: -(len(zone) + 1)]
else:
relative = name
self.replace_rrset(zone, relative, rtype, ttl, rdata)
if zones_seen:
self.restart_services()

View File

@@ -58,8 +58,8 @@ def get_dkim_entry(mail_domain, pre_command, dkim_selector):
dkim_value = '" "'.join(re.findall(".{1,255}", dkim_value_raw))
web_dkim_value = "".join(re.findall(".{1,255}", dkim_value_raw))
return (
f'{dkim_selector}._domainkey.{mail_domain}. TXT "{dkim_value}"',
f'{dkim_selector}._domainkey.{mail_domain}. TXT "{web_dkim_value}"',
f'{dkim_selector}._domainkey.{mail_domain}. 3600 IN TXT "{dkim_value}"',
f'{dkim_selector}._domainkey.{mail_domain}. 3600 IN TXT "{web_dkim_value}"',
)
@@ -94,9 +94,11 @@ def check_zonefile(zonefile, verbose=True):
if not zf_line.strip() or zf_line.startswith(";"):
continue
print(f"dns-checking {zf_line!r}") if verbose else log_progress("")
zf_domain, zf_typ, zf_value = zf_line.split(maxsplit=2)
zf_domain = zf_domain.rstrip(".")
zf_value = zf_value.strip()
parts = zf_line.split(None, 4)
zf_domain = parts[0].rstrip(".")
# parts[1]=TTL, parts[2]=IN, parts[3]=type, parts[4]=rdata
zf_typ = parts[3]
zf_value = parts[4].strip()
query_value = query_dns(zf_typ, zf_domain)
if zf_value != query_value:
assert zf_typ in ("A", "AAAA", "CNAME", "CAA", "SRV", "MX", "TXT"), zf_line

View File

@@ -12,13 +12,27 @@ def openssl_selfsigned_args(domain, cert_path, key_path, days=36500):
``www.<domain>`` and ``mta-sts.<domain>``.
"""
return [
"openssl", "req", "-x509",
"-newkey", "ec", "-pkeyopt", "ec_paramgen_curve:P-256",
"-noenc", "-days", str(days),
"-keyout", str(key_path),
"-out", str(cert_path),
"-subj", f"/CN={domain}",
"-addext", "extendedKeyUsage=serverAuth,clientAuth",
"openssl",
"req",
"-x509",
"-newkey",
"ec",
"-pkeyopt",
"ec_paramgen_curve:P-256",
"-noenc",
"-days",
str(days),
"-keyout",
str(key_path),
"-out",
str(cert_path),
"-subj",
f"/CN={domain}",
# Mark as end-entity cert so it cannot be used as a CA to sign others.
"-addext",
"basicConstraints=critical,CA:FALSE",
"-addext",
"extendedKeyUsage=serverAuth,clientAuth",
"-addext",
f"subjectAltName=DNS:{domain},DNS:www.{domain},DNS:mta-sts.{domain}",
]
@@ -40,7 +54,9 @@ class SelfSignedTlsDeployer(Deployer):
def configure(self):
args = openssl_selfsigned_args(
self.mail_domain, self.cert_path, self.key_path,
self.mail_domain,
self.cert_path,
self.key_path,
)
cmd = shlex.join(args)
server.shell(

View File

@@ -49,8 +49,13 @@ class SSHExec:
RemoteError = execnet.RemoteError
FuncError = FuncError
def __init__(self, host, verbose=False, python="python3", timeout=60):
self.gateway = execnet.makegateway(f"ssh=root@{host}//python={python}")
def __init__(
self, host, verbose=False, python="python3", timeout=60, ssh_config=None
):
spec = f"ssh=root@{host}//python={python}"
if ssh_config:
spec += f"//ssh_config={ssh_config}"
self.gateway = execnet.makegateway(spec)
self._remote_cmdloop_channel = bootstrap_remote(self.gateway, remote)
self.timeout = timeout
self.verbose = verbose
@@ -113,3 +118,46 @@ class LocalExec:
res = self(call, kwargs, log_callback=remote.rshell.log_progress)
print_stderr()
return res
# pyinfra exposes a ``ssh_config_file`` data key that *should* let
# paramiko parse an SSH config file directly. In practice it silently
# fails to connect (zero hosts / zero operations), so we resolve the
# hostname and identity-file ourselves and pass them via
# ``--data ssh_hostname`` / ``--data ssh_key`` instead.
# Execnet uses ssh natively (and not paramiko) and doesn't have this problem.
def _get_from_ssh_config(host, ssh_config_path, key):
"""Internal helper to parse a value for a specific key from ssh-config."""
current_hosts = []
found_value = None
with open(ssh_config_path) as f:
for raw_line in f:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(None, 1)
if not parts:
continue
directive = parts[0].lower()
if directive == "host":
if host in current_hosts and found_value:
return found_value
current_hosts = parts[1].split()
found_value = None
elif directive == key.lower():
found_value = parts[1]
if host in current_hosts and found_value:
return found_value
return None
def resolve_host_from_ssh_config(host, ssh_config_path):
"""Resolve a host alias to its IP from an ssh-config file."""
return _get_from_ssh_config(host, ssh_config_path, "Hostname") or host
def resolve_key_from_ssh_config(host, ssh_config_path):
"""Resolve a host alias to its IdentityFile from an ssh-config file."""
return _get_from_ssh_config(host, ssh_config_path, "IdentityFile")

View File

@@ -1,17 +1,18 @@
; Required DNS entries for chatmail servers
zftest.testrun.org. A 135.181.204.127
zftest.testrun.org. AAAA 2a01:4f9:c012:52f4::1
zftest.testrun.org. MX 10 zftest.testrun.org.
_mta-sts.zftest.testrun.org. TXT "v=STSv1; id=202403211706"
mta-sts.zftest.testrun.org. CNAME zftest.testrun.org.
www.zftest.testrun.org. CNAME zftest.testrun.org.
opendkim._domainkey.zftest.testrun.org. TXT "v=DKIM1;k=rsa;p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoYt82CVUyz2ouaqjX2kB+5J80knAyoOU3MGU5aWppmwUwwTvj/oSTSpkc5JMtVTRmKKr8NUDWAL1Yw7dfGqqPHdHfwwjS3BIvDzYx+hzgtz62RnfNgV+/2MAoNpfX7cAFIHdRzEHNtwugc3RDLquqPoupAE3Y2YRw2T5zG5fILh4vwIcJZL5Uq6B92j8wwJqOex" "33n+vm1NKQ9rxo/UsHAmZlJzpooXcG/4igTBxJyJlamVSRR6N7Nul1v//YJb7J6v2o0iPHW6uE0StzKaPPNC2IVosSRFbD9H2oqppltptFSNPlI0E+t0JBWHem6YK7xcugiO3ImMCaaU8g6Jt/wIDAQAB;s=email;t=s"
; Required DNS entries
zftest.testrun.org. 3600 IN A 135.181.204.127
zftest.testrun.org. 3600 IN AAAA 2a01:4f9:c012:52f4::1
zftest.testrun.org. 3600 IN MX 10 zftest.testrun.org.
_mta-sts.zftest.testrun.org. 3600 IN TXT "v=STSv1; id=202403211706"
mta-sts.zftest.testrun.org. 3600 IN CNAME zftest.testrun.org.
www.zftest.testrun.org. 3600 IN CNAME zftest.testrun.org.
opendkim._domainkey.zftest.testrun.org. 3600 IN TXT "v=DKIM1;k=rsa;p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoYt82CVUyz2ouaqjX2kB+5J80knAyoOU3MGU5aWppmwUwwTvj/oSTSpkc5JMtVTRmKKr8NUDWAL1Yw7dfGqqPHdHfwwjS3BIvDzYx+hzgtz62RnfNgV+/2MAoNpfX7cAFIHdRzEHNtwugc3RDLquqPoupAE3Y2YRw2T5zG5fILh4vwIcJZL5Uq6B92j8wwJqOex" "33n+vm1NKQ9rxo/UsHAmZlJzpooXcG/4igTBxJyJlamVSRR6N7Nul1v//YJb7J6v2o0iPHW6uE0StzKaPPNC2IVosSRFbD9H2oqppltptFSNPlI0E+t0JBWHem6YK7xcugiO3ImMCaaU8g6Jt/wIDAQAB;s=email;t=s"
; Recommended DNS entries
_submission._tcp.zftest.testrun.org. SRV 0 1 587 zftest.testrun.org.
_submissions._tcp.zftest.testrun.org. SRV 0 1 465 zftest.testrun.org.
_imap._tcp.zftest.testrun.org. SRV 0 1 143 zftest.testrun.org.
_imaps._tcp.zftest.testrun.org. SRV 0 1 993 zftest.testrun.org.
zftest.testrun.org. CAA 0 issue "letsencrypt.org;accounturi=https://acme-v02.api.letsencrypt.org/acme/acct/1371472956"
zftest.testrun.org. TXT "v=spf1 a:zftest.testrun.org ~all"
_dmarc.zftest.testrun.org. TXT "v=DMARC1;p=reject;adkim=s;aspf=s"
_adsp._domainkey.zftest.testrun.org. TXT "dkim=discardable"
zftest.testrun.org. 3600 IN TXT "v=spf1 a ~all"
_dmarc.zftest.testrun.org. 3600 IN TXT "v=DMARC1;p=reject;adkim=s;aspf=s"
zftest.testrun.org. 3600 IN CAA 0 issue "letsencrypt.org;accounturi=https://acme-v02.api.letsencrypt.org/acme/acct/1371472956"
_adsp._domainkey.zftest.testrun.org. 3600 IN TXT "dkim=discardable"
_submission._tcp.zftest.testrun.org. 3600 IN SRV 0 1 587 zftest.testrun.org.
_submissions._tcp.zftest.testrun.org. 3600 IN SRV 0 1 465 zftest.testrun.org.
_imap._tcp.zftest.testrun.org. 3600 IN SRV 0 1 143 zftest.testrun.org.
_imaps._tcp.zftest.testrun.org. 3600 IN SRV 0 1 993 zftest.testrun.org.

View File

@@ -1,4 +1,3 @@
import time
def test_tls_imap(benchmark, imap):
def imap_connect():
imap.connect()

View File

@@ -20,7 +20,7 @@ def test_fastcgi_working(maildomain, chatmail_config):
@pytest.mark.filterwarnings("ignore::urllib3.exceptions.InsecureRequestWarning")
def test_newemail_configure(maildomain, rpc, chatmail_config):
def test_newemail_configure(maildomain, maildomain_ip, rpc, chatmail_config):
"""Test configuring accounts by scanning a QR code works."""
url = f"DCACCOUNT:https://{maildomain}/new"
for i in range(3):
@@ -30,12 +30,15 @@ def test_newemail_configure(maildomain, rpc, chatmail_config):
# set_config_from_qr, so fetch credentials via requests instead
res = requests.post(f"https://{maildomain}/new", verify=False)
data = res.json()
rpc.add_or_update_transport(account_id, {
"addr": data["email"],
"password": data["password"],
"imapServer": maildomain,
"smtpServer": maildomain,
"certificateChecks": "acceptInvalidCertificates",
})
rpc.add_or_update_transport(
account_id,
{
"addr": data["email"],
"password": data["password"],
"imapServer": maildomain_ip,
"smtpServer": maildomain_ip,
"certificateChecks": "acceptInvalidCertificates",
},
)
else:
rpc.add_transport_from_qr(account_id, url)

View File

@@ -1,4 +1,5 @@
import datetime
import os
import smtplib
import socket
import subprocess
@@ -13,7 +14,8 @@ from cmdeploy.cmdeploy import get_sshexec
class TestSSHExecutor:
@pytest.fixture(scope="class")
def sshexec(self, sshdomain):
return get_sshexec(sshdomain)
ssh_config = os.environ.get("CHATMAIL_SSH_CONFIG")
return get_sshexec(sshdomain, ssh_config=ssh_config)
def test_ls(self, sshexec):
out = sshexec(call=remote.rdns.shell, kwargs=dict(command="ls"))
@@ -132,11 +134,10 @@ def test_authenticated_from(cmsetup, maildata):
@pytest.mark.parametrize("from_addr", ["fake@example.org", "fake@testrun.org"])
def test_reject_missing_dkim(cmsetup, maildata, from_addr):
domain = cmsetup.maildomain
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
try:
sock.connect((domain, 25))
except socket.timeout:
sock = socket.create_connection((domain, 25), timeout=10)
sock.close()
except (socket.timeout, OSError):
pytest.skip(f"port 25 not reachable for {domain}")
recipient = cmsetup.gen_users(1)[0]

View File

@@ -1,4 +1,5 @@
import ipaddress
import os
import re
import time
@@ -6,8 +7,8 @@ import imap_tools
import pytest
import requests
from cmdeploy.remote import rshell
from cmdeploy.cmdeploy import get_sshexec
from cmdeploy.remote import rshell
@pytest.fixture
@@ -92,7 +93,9 @@ class TestEndToEndDeltaChat:
lp.sec(f"filling remote inbox for {user}")
fn = f"7743102289.M843172P2484002.c20,S={quota},W=2398:2,"
path = chatmail_config.mailboxes_dir.joinpath(user, "cur", fn)
sshexec = get_sshexec(sshdomain)
sshexec = get_sshexec(
sshdomain, ssh_config=os.environ.get("CHATMAIL_SSH_CONFIG")
)
sshexec(call=rshell.write_numbytes, kwargs=dict(path=str(path), num=120))
res = sshexec(call=rshell.dovecot_recalc_quota, kwargs=dict(user=user))
assert res["percent"] >= 100

View File

@@ -9,6 +9,9 @@ def test_status_cmd(chatmail_config, capsys, request):
if os.getenv("CHATMAIL_SSH"):
command.append("--ssh-host")
command.append(os.getenv("CHATMAIL_SSH"))
if os.getenv("CHATMAIL_SSH_CONFIG"):
command.append("--ssh-config")
command.append(os.getenv("CHATMAIL_SSH_CONFIG"))
assert main(command) == 0
status_out = capsys.readouterr()
print(status_out.out)

View File

@@ -2,7 +2,9 @@ import imaplib
import itertools
import os
import random
import re
import smtplib
import socket
import ssl
import subprocess
import time
@@ -20,6 +22,64 @@ def pytest_addoption(parser):
)
def _parse_ssh_config_hosts(path):
"""Parse an OpenSSH config file and return a dict of hostname -> IP."""
mapping = {}
current_names = []
for ln in Path(path).read_text().splitlines():
line = ln.strip()
m = re.match(r"^Host\s+(.+)", line)
if m:
current_names = m.group(1).split()
continue
m = re.match(r"^Hostname\s+(\S+)", line)
if m and current_names:
ip = m.group(1)
for name in current_names:
mapping[name] = ip
current_names = []
return mapping
_original_getaddrinfo = socket.getaddrinfo
def _make_patched_getaddrinfo(host_map):
"""Return a getaddrinfo that resolves hosts in host_map to their IPs."""
def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if host in host_map:
ip = host_map[host]
return _original_getaddrinfo(ip, port, family, type, proto, flags)
return _original_getaddrinfo(host, port, family, type, proto, flags)
return patched_getaddrinfo
@pytest.fixture(autouse=True, scope="session")
def _setup_localchat_dns():
"""Monkey-patch socket.getaddrinfo to resolve .localchat via ssh-config."""
ssh_config = os.environ.get("CHATMAIL_SSH_CONFIG")
if not ssh_config or not Path(ssh_config).exists():
yield {}
return
host_map = _parse_ssh_config_hosts(ssh_config)
if not host_map:
yield {}
return
socket.getaddrinfo = _make_patched_getaddrinfo(host_map)
try:
yield host_map
finally:
socket.getaddrinfo = _original_getaddrinfo
@pytest.fixture(scope="session")
def ssh_config_host_map(_setup_localchat_dns):
"""Return the host-name → IP map parsed from ssh-config."""
return _setup_localchat_dns
def pytest_configure(config):
config._benchresults = {}
config.addinivalue_line(
@@ -35,6 +95,11 @@ def pytest_runtest_setup(item):
def _get_chatmail_config():
ini = os.environ.get("CHATMAIL_INI")
if ini:
path = Path(ini).resolve()
if path.exists():
return read_config(path), path
current = Path().resolve()
while 1:
path = current.joinpath("chatmail.ini").resolve()
@@ -65,6 +130,12 @@ def sshdomain(maildomain):
return os.environ.get("CHATMAIL_SSH", maildomain)
@pytest.fixture(scope="session")
def maildomain_ip(maildomain, ssh_config_host_map):
"""Return the IP for maildomain from ssh-config, or maildomain itself."""
return ssh_config_host_map.get(maildomain, maildomain)
@pytest.fixture
def maildomain2():
domain = os.environ.get("CHATMAIL_DOMAIN2")
@@ -306,23 +377,32 @@ from deltachat_rpc_client import DeltaChat, Rpc
class ChatmailACFactory:
"""RPC-based account factory for chatmail testing."""
def __init__(self, rpc, maildomain, gencreds, chatmail_config):
def __init__(
self,
rpc,
maildomain,
maildomain_ip,
gencreds,
chatmail_config,
ssh_config_host_map,
):
self.dc = DeltaChat(rpc)
self.rpc = rpc
self._maildomain = maildomain
self._maildomain_ip = maildomain_ip
self.gencreds = gencreds
self.chatmail_config = chatmail_config
self._ssh_config_host_map = ssh_config_host_map
def _make_transport(self, domain):
"""Build a transport config dict for the given domain."""
addr, password = self.gencreds(domain)
server = self._ssh_config_host_map.get(domain, domain)
transport = {
"addr": addr,
"password": password,
# Setting server explicitly skips requesting autoconfig XML,
# see https://datatracker.ietf.org/doc/draft-ietf-mailmaint-autoconfig/
"imapServer": domain,
"smtpServer": domain,
"imapServer": server,
"smtpServer": server,
}
if self.chatmail_config.tls_cert_mode == "self":
transport["certificateChecks"] = "acceptInvalidCertificates"
@@ -376,13 +456,17 @@ def rpc(tmp_path_factory):
@pytest.fixture
def cmfactory(rpc, gencreds, maildomain, chatmail_config):
def cmfactory(
rpc, gencreds, maildomain, maildomain_ip, chatmail_config, ssh_config_host_map
):
"""Return a ChatmailACFactory for creating online Delta Chat accounts."""
return ChatmailACFactory(
rpc=rpc,
maildomain=maildomain,
maildomain_ip=maildomain_ip,
gencreds=gencreds,
chatmail_config=chatmail_config,
ssh_config_host_map=ssh_config_host_map,
)
@@ -394,14 +478,21 @@ def remote(sshdomain):
class Remote:
def __init__(self, sshdomain):
self.sshdomain = sshdomain
self.ssh_config = os.environ.get("CHATMAIL_SSH_CONFIG")
def iter_output(self, logcmd="", ready=None):
getjournal = "journalctl -f" if not logcmd else logcmd
print(self.sshdomain)
match self.sshdomain:
case "@local": command = []
case "localhost": command = []
case _: command = ["ssh", f"root@{self.sshdomain}"]
case "@local":
command = []
case "localhost":
command = []
case _:
command = ["ssh"]
if self.ssh_config:
command.extend(["-F", self.ssh_config])
command.append(f"root@{self.sshdomain}")
[command.append(arg) for arg in getjournal.split()]
self.popen = subprocess.Popen(
command,

View File

@@ -23,7 +23,10 @@ class TestCmdline:
run = parser.parse_args(["run"])
assert init and run
def test_init_not_overwrite(self, capsys):
def test_init_not_overwrite(self, tmp_path, capsys, monkeypatch):
monkeypatch.delenv("CHATMAIL_INI", raising=False)
monkeypatch.chdir(tmp_path)
assert main(["init", "chat.example.org"]) == 0
capsys.readouterr()

View File

@@ -3,7 +3,7 @@ from copy import deepcopy
import pytest
from cmdeploy import remote
from cmdeploy.dns import check_full_zone, check_initial_remote_data
from cmdeploy.dns import check_full_zone, check_initial_remote_data, parse_zone_records
@pytest.fixture
@@ -126,17 +126,11 @@ class TestPerformInitialChecks:
def parse_zonefile_into_dict(zonefile, mockdns_base, only_required=False):
for zf_line in zonefile.split("\n"):
if zf_line.startswith("#"):
if "Recommended" in zf_line and only_required:
return
continue
if not zf_line.strip():
continue
zf_domain, zf_typ, zf_value = zf_line.split(maxsplit=2)
zf_domain = zf_domain.rstrip(".")
zf_value = zf_value.strip()
mockdns_base.setdefault(zf_typ, {})[zf_domain] = zf_value
if only_required:
# Only take records before the "; Recommended" section
zonefile = zonefile.split("; Recommended")[0]
for name, ttl, rtype, rdata in parse_zone_records(zonefile):
mockdns_base.setdefault(rtype, {})[name] = rdata
class MockSSHExec:

View File

@@ -0,0 +1,174 @@
"""Tests for cmdeploy lxc-* subcommands."""
import shutil
import subprocess
import sys
import pytest
from cmdeploy.lxc import cli
from cmdeploy.lxc.incus import Incus
pytestmark = pytest.mark.skipif(
not shutil.which("incus") or not shutil.which("lxc"),
reason="incus/lxc not installed",
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def ix():
return Incus()
@pytest.fixture(scope="session")
def lxc_setup():
ix = Incus()
ix.get_dns_container().ensure()
return ix.list_managed()
@pytest.fixture(scope="session")
def relay_container(lxc_setup):
test_names = {f"{n}-localchat" for n in cli.RELAY_NAMES}
relays = [c for c in lxc_setup if c["name"] in test_names and c.get("ip")]
if not relays:
pytest.skip("no test relay containers running")
return relays[0]
@pytest.fixture
def cmdeploy():
def run(*args):
return subprocess.run(
[sys.executable, "-m", "cmdeploy.cmdeploy", *args],
capture_output=True,
text=True,
check=False,
)
return run
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
"subcmd, expected, absent",
[
(None, ["lxc-start", "lxc-stop", "lxc-test", "lxc-status"], ["lxc-destroy"]),
("lxc-start", ["--ipv4-only", "--run"], ["--config"]),
("lxc-stop", ["--destroy", "--destroy-all"], ["--config"]),
("lxc-test", ["--one"], ["--config"]),
("lxc-status", [], ["--config"]),
("run", ["--ssh-config"], ["--lxc"]),
("dns", ["--ssh-config"], []),
("test", ["--ssh-config"], []),
("status", ["--ssh-config"], []),
],
)
def test_help_options(cmdeploy, subcmd, expected, absent):
args = [subcmd, "--help"] if subcmd else ["--help"]
result = cmdeploy(*args)
output = result.stdout + result.stderr
assert result.returncode == 0
for flag in expected:
assert flag in output
for flag in absent:
assert flag not in output
class TestSSHConfig:
def test_lxconfigs(self, ix, lxc_setup):
d = ix.lxconfigs_dir
assert d.name == "lxconfigs"
assert d.exists()
path = ix.ssh_config_path
assert path.name == "ssh-config"
assert path.parent.name == "lxconfigs"
def test_write_ssh_config(self, ix, lxc_setup):
path = ix.write_ssh_config()
assert path.exists()
text = path.read_text()
for c in lxc_setup:
if c.get("ip"):
assert c["name"] in text
assert f"Hostname {c['ip']}" in text
assert "User root" in text
assert "IdentityFile" in text
assert "StrictHostKeyChecking accept-new" in text
def test_dns(ix, relay_container):
def dig(qname, qtype):
ct = ix.get_dns_container()
return ct.bash(f"dig @127.0.0.1 {qname} {qtype} +short").strip()
domain = relay_container["domain"]
assert dig(domain, "A") == relay_container["ip"]
assert domain in dig(domain, "MX")
assert "587" in dig(f"_submission._tcp.{domain}", "SRV")
class TestLxcStatus:
def test_cli_lxc_status_help(self, cmdeploy):
result = cmdeploy("lxc-status", "--help")
assert result.returncode == 0
assert "status" in result.stdout.lower()
def test_shows_containers(self, lxc_setup, capsys):
class QuietOut:
def red(self, msg, **kw):
pass
def green(self, msg, **kw):
pass
ret = cli.lxc_status_cmd(None, QuietOut())
assert ret == 0
captured = capsys.readouterr().out
assert "ns-localchat" in captured
assert "running" in captured
def test_deploy_freshness(self, ix, monkeypatch):
ct = ix.get_container("x")
monkeypatch.setattr(
"cmdeploy.lxc.incus.RelayContainer.deployed_version",
lambda _self: "abc123def456",
)
monkeypatch.setattr(
"cmdeploy.lxc.incus.RelayContainer.deployed_domain",
lambda _self: ct.domain,
)
monkeypatch.setattr(
"cmdeploy.lxc.cli.get_version_string",
lambda: "abc123def456",
)
assert "IN-SYNC" in cli._deploy_status(ct, "abc123def456", ix)
assert "STALE" in cli._deploy_status(ct, "other_hash_here", ix)
# Hash matches but local has uncommitted changes
monkeypatch.setattr(
"cmdeploy.lxc.cli.get_version_string",
lambda: "abc123def456\ndiff --git a/foo",
)
assert "DIRTY" in cli._deploy_status(ct, "abc123def456", ix)
monkeypatch.setattr(
"cmdeploy.lxc.incus.RelayContainer.deployed_version",
lambda _self: None,
)
assert "NOT DEPLOYED" in cli._deploy_status(ct, "abc123", ix)

View File

@@ -0,0 +1,63 @@
"""Shared utility functions for cmdeploy."""
import subprocess
import textwrap
from pathlib import Path
def _project_root():
"""Return the project root directory."""
return Path(__file__).resolve().parent.parent.parent.parent
def collapse(text):
"""Dedent, join lines, and strip a (triple-quoted) string.
Handy for writing shell commands across multiple lines::
cmd = collapse(f\"""
cmdeploy run
--config {ct.ini}
--ssh-host {ct.domain}
\""")
"""
return textwrap.dedent(text).replace("\n", " ").strip()
def shell(cmd, check=False, **kwargs):
"""Run a shell command string with sensible defaults.
*cmd* is passed through :func:`collapse` first, so callers
can use triple-quoted f-strings freely.
Captures stdout/stderr by default; pass ``capture_output=False``
to stream output to the terminal instead.
"""
if "capture_output" not in kwargs and "stdout" not in kwargs:
kwargs["capture_output"] = True
return subprocess.run(collapse(cmd), shell=True, text=True, check=check, **kwargs)
def get_git_hash():
"""Return the local HEAD commit hash, or None."""
result = shell(
"git rev-parse HEAD",
cwd=str(_project_root()),
)
if result.returncode == 0:
return result.stdout.strip()
return None
def get_version_string():
"""Return ``git_hash\\ngit_diff`` for the local working tree.
Used by :class:`~cmdeploy.deployers.GithashDeployer` to write
``/etc/chatmail-version`` and by ``lxc-status`` to compare
the deployed state against the local checkout.
"""
git_hash = get_git_hash() or "unknown"
try:
git_diff = shell("git diff", cwd=str(_project_root())).stdout
except Exception:
git_diff = ""
return git_hash + "\n" + git_diff