feat: per-relay image caching, static IPs, and parallel deploy

Switch from a single localchat-relay image to per-relay cached
images (localchat-test0, localchat-test1) and add a DNS image
(localchat-ns).  Assign static IPs via a fixed incusbr0 bridge
subnet (10.200.200.0/24) so containers always get deterministic
addresses.

Container launch is split into 'incus init' + device-override +
'incus start' to set the static IP before boot.

Deploy runs in parallel via _run_cmdeploy_parallel(), which
captures output per-relay and shows progress lines.  Tests now
run in both directions (test0↔test1, test1↔test0).

publish_image() returns bool (True if published, False if cached)
so lxc-test can report cache hits.
This commit is contained in:
holger krekel
2026-03-07 14:40:00 +01:00
parent 6e52bfe8c4
commit 4b79606d49
2 changed files with 269 additions and 62 deletions

View File

@@ -2,10 +2,16 @@
import os
import subprocess
import threading
import time
from contextlib import contextmanager
from ..util import collapse, get_git_hash, get_version_string, shell
from ..util import (
collapse,
get_git_hash,
get_version_string,
shell,
)
from .incus import Incus, RelayContainer
RELAY_NAMES = ("test0", "test1")
@@ -40,6 +46,9 @@ def lxc_start_cmd(args, out):
out.green("Ensuring DNS container (ns-localchat) ...")
dns_ct = ix.get_dns_container()
dns_ct.ensure()
if not ix.find_dns_image():
with _section(out, "LXC: publishing DNS image"):
dns_ct.publish_as_dns_image()
print(f" DNS container IP: {dns_ct.ipv4}")
names = args.names if args.names else RELAY_NAMES
@@ -150,6 +159,9 @@ def lxc_stop_cmd(args, out):
if destroy:
out.green(f"Destroying container {ct.name!r} ...")
ct.destroy()
if hasattr(ct, "image_alias"):
out.green(f" Deleting cached image {ct.image_alias!r} ...")
ix.run(["image", "delete", ct.image_alias], check=False)
else:
out.green(f"Stopping container {ct.name!r} ...")
ct.stop(force=True)
@@ -194,10 +206,10 @@ def lxc_test_cmd(args, out):
local_hash = get_git_hash()
# Per-relay: start, deploy, then snapshot the first relay as a
# reusable image so the second relay launches pre-deployed.
# Per-relay: start containers, then deploy in parallel.
ipv4_only_flags = {RELAY_NAMES[0]: False, RELAY_NAMES[1]: True}
# Phase 1 — start all containers (sequential, fast)
for ct in map(ix.get_container, relay_names):
name = ct.sname
ipv4_only = ipv4_only_flags.get(name, False)
@@ -211,20 +223,34 @@ def lxc_test_cmd(args, out):
if ret:
return ret
# Phase 2 — deploy all relays in parallel
to_deploy = []
for ct in map(ix.get_container, relay_names):
status = _deploy_status(ct, local_hash, ix)
if "IN-SYNC" in status:
_section_line(out, f"cmdeploy run: {name}{status}, skipping")
_section_line(
out, f"cmdeploy run: {ct.sname}{status}, skipping"
)
else:
with _section(out, f"cmdeploy run: {name} ({ct.domain})"):
ret = _run_cmdeploy("run", ct, ix, extra=["--skip-dns-check"])
if ret:
out.red(f"Deploy to {name} failed (exit {ret})")
return ret
to_deploy.append(ct)
# Snapshot the first relay so subsequent ones launch pre-deployed
if not ix.find_relay_image():
with _section(out, "LXC: publishing relay image"):
ct.publish_as_relay_image()
if to_deploy:
with _section(out, "cmdeploy run (parallel)"):
ret = _run_cmdeploy_parallel(
"run", to_deploy, ix, out, extra=["--skip-dns-check"]
)
if ret:
return ret
# Phase 3 — publish images (sequential, fast)
for ct in map(ix.get_container, relay_names):
if ct.publish_image():
_section_line(out, f"LXC: published {ct.sname} image")
else:
_section_line(
out,
f"LXC: publish {ct.sname} image — skipped, cached",
)
for ct in map(ix.get_container, relay_names):
with _section(out, f"cmdeploy dns: {ct.sname} ({ct.domain})"):
@@ -241,16 +267,23 @@ def lxc_test_cmd(args, out):
print(f" Loading {ct.zone} into PowerDNS ...")
dns_ct.set_dns_records(zone_data)
with _section(out, "cmdeploy test"):
first = ix.get_container(relay_names[0])
# Run tests in both directions when two relays are available.
test_pairs = [(0, 1), (1, 0)] if len(relay_names) > 1 else [(0,)]
for pair in test_pairs:
first = ix.get_container(relay_names[pair[0]])
label = first.sname
env = None
if len(relay_names) > 1:
if len(pair) > 1:
second = ix.get_container(relay_names[pair[1]])
label = f"{first.sname} \u2194 {second.sname}"
env = os.environ.copy()
env["CHATMAIL_DOMAIN2"] = ix.get_container(relay_names[1]).domain
ret = _run_cmdeploy("test", first, ix, **({"env": env} if env else {}))
if ret:
out.red(f"Tests failed (exit {ret})")
return ret
env["CHATMAIL_DOMAIN2"] = second.domain
with _section(out, f"cmdeploy test: {label}"):
ret = _run_cmdeploy("test", first, ix, **({"env": env} if env else {}))
if ret:
out.red(f"Tests failed (exit {ret})")
return ret
elapsed = time.time() - t_total
_section_line(out, f"lxc-test complete ({elapsed:.1f}s)")
@@ -446,22 +479,92 @@ def _add_name_args(parser, help_text=None):
)
def _build_cmdeploy_cmd(subcmd, ct, ix, extra=None):
    """Assemble the ``cmdeploy <subcmd>`` command string for one relay.

    The command names the relay's ini file (``ct.ini``), the SSH config
    path of *ix* and the relay domain as SSH host.  *extra* is an optional
    sequence of additional arguments, joined with spaces and appended last.
    The assembled text is passed through ``collapse`` before being returned.
    """
    trailing = " ".join(extra) if extra else ""
    pieces = (
        f"cmdeploy {subcmd}",
        f"--config {ct.ini}",
        f"--ssh-config {ix.ssh_config_path}",
        f"--ssh-host {ct.domain}",
        trailing,
    )
    # One argument group per line, newline-terminated, as input for collapse().
    return collapse("\n".join(pieces) + "\n")
def _run_cmdeploy(subcmd, ct, ix, extra=None, **kwargs):
"""Run ``cmdeploy <subcmd>`` with standard --config/--ssh flags.
*ct* is a Container (uses ``ct.ini`` and ``ct.domain``).
Returns the subprocess exit code.
"""
extra_str = " ".join(extra) if extra else ""
cmd = f"""\
cmdeploy {subcmd}
--config {ct.ini}
--ssh-config {ix.ssh_config_path}
--ssh-host {ct.domain}
{extra_str}
"""
cmd = _build_cmdeploy_cmd(subcmd, ct, ix, extra=extra)
if "cwd" not in kwargs:
kwargs["cwd"] = str(ix.project_root)
cmd = collapse(cmd)
print(f" [$ {cmd}]")
return shell(cmd, capture_output=False, **kwargs).returncode
# Number of tail lines to print on failure.
_FAIL_CONTEXT_LINES = 40
def _run_cmdeploy_parallel(subcmd, containers, ix, out, extra=None):
"""Run ``cmdeploy <subcmd>`` for every container in parallel.
Output is captured and filtered: only lines containing
``"Start operation"`` are printed (prefixed with the relay
short-name). On failure the last *_FAIL_CONTEXT_LINES*
lines of that process's output are shown.
"""
procs = [] # list of (container, Popen, collected_lines)
cwd = str(ix.project_root)
for ct in containers:
cmd = _build_cmdeploy_cmd(subcmd, ct, ix, extra=extra)
print(f" [{ct.sname}] $ {cmd}")
proc = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=cwd,
)
procs.append((ct, proc, []))
def _reader(ct, proc, lines):
prefix = f" [{ct.sname}]"
for raw in proc.stdout:
line = raw.rstrip("\n")
lines.append(line)
if "Starting operation" in line:
print(f"{prefix} {line}")
threads = []
for ct, proc, lines in procs:
t = threading.Thread(
target=_reader, args=(ct, proc, lines), daemon=True,
)
t.start()
threads.append(t)
for t in threads:
t.join()
for _, proc, _ in procs:
proc.wait()
# Check results
first_failure = 0
for ct, proc, lines in procs:
if proc.returncode:
out.red(
f"Deploy to {ct.sname} failed "
f"(exit {proc.returncode})"
)
tail = lines[-_FAIL_CONTEXT_LINES:]
for tl in tail:
print(f" [{ct.sname}] {tl}")
if not first_failure:
first_failure = proc.returncode
return first_failure

View File

@@ -14,11 +14,19 @@ DOMAIN_SUFFIX = ".localchat"
UPSTREAM_IMAGE = "images:debian/12"
BASE_IMAGE_ALIAS = "localchat-base"
BASE_SETUP_NAME = "localchat-base-setup"
RELAY_IMAGE_ALIAS = "localchat-relay"
DNS_IMAGE_ALIAS = "localchat-ns"
DNS_CONTAINER_NAME = "ns-localchat"
DNS_DOMAIN = "ns.localchat"
BRIDGE_IPV4 = "10.200.200.1/24"
DNS_IP = "10.200.200.2"
RELAY_IPS = {
"test0": "10.200.200.10",
"test1": "10.200.200.11",
"test2": "10.200.200.12",
}
def _extract_ip(net_data, family="inet"):
"""Extract the first global-scope IP of *family* from network state data.
@@ -139,14 +147,16 @@ class Incus:
return alias
return None
def find_relay_image(self):
"""Return the relay image alias if it exists, else None."""
return self._find_image(RELAY_IMAGE_ALIAS)
def find_dns_image(self):
    """Look up the cached DNS image; return its alias, or None if absent."""
    alias = self._find_image(DNS_IMAGE_ALIAS)
    return alias
def delete_images(self):
"""Delete the cached base and relay images."""
for alias in (RELAY_IMAGE_ALIAS, BASE_IMAGE_ALIAS):
"""Delete all cached localchat images."""
for alias in (DNS_IMAGE_ALIAS, BASE_IMAGE_ALIAS):
self.run(["image", "delete", alias], check=False)
for name in RELAY_IPS:
self.run(["image", "delete", f"localchat-{name}"], check=False)
def list_managed(self):
"""Return list of dicts with name, ip, ipv6, domain, status, memory_usage."""
@@ -188,14 +198,25 @@ class Incus:
self.run(["delete", BASE_SETUP_NAME, "--force"], check=False)
self.run(["image", "delete", BASE_IMAGE_ALIAS], check=False)
self.run(["launch", UPSTREAM_IMAGE, BASE_SETUP_NAME])
self.run(
["launch", UPSTREAM_IMAGE, BASE_SETUP_NAME, "-c", "limits.memory=512MiB"]
)
ct = Container(self, BASE_SETUP_NAME)
ct = Container(self, BASE_SETUP_NAME, memory="512MiB")
ct.wait_ready()
key_path = self.ssh_key_path
pub_key = key_path.with_suffix(".pub").read_text().strip()
ct.bash(f"""\
print(" ── apt-get install (base image) ──")
ct.bash(
f"""\
systemctl disable --now systemd-resolved 2>/dev/null || true
rm -f /etc/resolv.conf
echo 'nameserver 9.9.9.9' > /etc/resolv.conf
while fuser /var/lib/apt/lists/lock >/dev/null 2>&1 ; do
echo "Waiting for other apt-get instance to finish..."
sleep 5
done
apt-get -o DPkg::Lock::Timeout=60 update
DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server python3
systemctl enable ssh
@@ -204,7 +225,10 @@ class Incus:
chmod 700 /root/.ssh
echo '{pub_key}' > /root/.ssh/authorized_keys
chmod 600 /root/.ssh/authorized_keys
""")
""",
capture=False,
)
print(" ── base image install done ──")
self.run(["stop", BASE_SETUP_NAME])
self.run(["publish", BASE_SETUP_NAME, f"--alias={BASE_IMAGE_ALIAS}"])
@@ -212,6 +236,28 @@ class Incus:
print(f" Base image '{BASE_IMAGE_ALIAS}' ready.")
return BASE_IMAGE_ALIAS
def ensure_bridge(self):
    """Make sure the ``incusbr0`` bridge exists with our fixed IPv4 subnet.

    Returns immediately when the bridge already reports ``ipv4.address``
    equal to ``BRIDGE_IPV4``.  Otherwise the bridge is created if it could
    not be queried, and its address and related settings are (re)applied.
    """
    network = "incusbr0"
    bridge = self.run_json(["network", "show", network], check=False)
    current = bridge.get("config", {}).get("ipv4.address") if bridge else None
    if current == BRIDGE_IPV4:
        return

    print(f" Configuring incusbr0 with static subnet {BRIDGE_IPV4} ...")
    if not bridge:
        # Best effort: creation errors are not checked here.
        self.run(["network", "create", network], check=False)

    settings = [
        f"ipv4.address={BRIDGE_IPV4}",
        "ipv4.nat=true",
        "ipv6.address=none",
        "dns.mode=none",
    ]
    self.run(["network", "set", network, *settings])
def get_container(self, name):
"""Return a container handle for the given name.
@@ -237,21 +283,25 @@ class Container:
so callers don't repeat the name everywhere.
"""
def __init__(self, incus, name, domain=None, memory="100MiB"):
def __init__(self, incus, name, domain=None, memory="200MiB", ipv4=None):
self.incus = incus
self.name = name
self.domain = domain or f"{name}{DOMAIN_SUFFIX}"
self.memory = memory
self.ipv4 = None
self.ipv4 = ipv4
self.ipv6 = None
def bash(self, script, check=True):
def bash(self, script, check=True, capture=True):
"""Returns stdout from executing ``bash -ec <script>`` inside this container.
*script* is dedented and stripped so callers can use triple-quoted strings.
When *check* is False, returns *None* on non-zero exit instead of raising.
When *capture* is False, output streams to the terminal and None is returned.
"""
cmd = ["exec", self.name, "--", "bash", "-ec", textwrap.dedent(script).strip()]
if not capture:
self.incus.run(cmd, check=check, capture=False)
return None
return self.incus.run_output(cmd, check=check)
def run_cmd(self, *args, check=True):
@@ -273,15 +323,28 @@ class Container:
cmd.append("--force")
self.incus.run(cmd, check=False)
def launch(self):
"""Launch from the best available image, return the alias used."""
image = self.incus.find_relay_image() or self.incus.ensure_base_image()
print(f" Launching from '{image}' image ...")
def launch(self, image=None):
"""Launch from the specified image, or the base image if None."""
self.incus.ensure_bridge()
if image is None:
image = self.incus.ensure_base_image()
cfg = []
cfg += ("-c", f"{LABEL_KEY}=true")
cfg += ("-c", f"user.localchat-domain={self.domain}")
cfg += ("-c", f"limits.memory={self.memory}")
self.incus.run(["launch", image, self.name, *cfg])
self.incus.run(["init", image, self.name, *cfg])
if self.ipv4:
self.incus.run(
[
"config",
"device",
"override",
self.name,
"eth0",
f"ipv4.address={self.ipv4}",
]
)
self.incus.run(["start", self.name])
return image
def ensure(self):
@@ -294,12 +357,19 @@ class Container:
data = self.incus.run_json(["list", self.name], check=False) or []
existing = [c for c in data if c["name"] == self.name]
image = None
if existing:
if existing[0]["status"] != "Running":
status = existing[0]["status"]
if status != "Running":
print(f" Starting stopped {self.name} container ...")
self.start()
else:
print(f" {self.name} already running")
else:
self.launch()
image = self.launch()
self.wait_ready()
if image:
print(f" Ensured {self.name} (launched from {image!r} image)")
return self
def destroy(self):
@@ -366,14 +436,21 @@ class RelayContainer(Container):
f"{name}-localchat",
domain=f"_{name}{DOMAIN_SUFFIX}",
memory="500MiB",
ipv4=RELAY_IPS.get(name),
)
self.sname = name
self.image_alias = f"localchat-{name}"
self.ini = incus.lxconfigs_dir / f"chatmail-{name}.ini"
self.zone = incus.lxconfigs_dir / f"{name}.zone"
def launch(self):
"""Launch (from a potentially cached image) and clear inherited chatmail-version."""
image = super().launch()
"""Launch from a cached per-relay image if available, else from base."""
cached = self.incus._find_image(self.image_alias)
if cached:
print(f" Using cached image {cached!r}")
else:
print(" No cached image, building from base")
image = super().launch(image=cached)
self.bash("rm -f /etc/chatmail-version")
return image
@@ -403,20 +480,23 @@ class RelayContainer(Container):
echo '{ip} {self.name} {self.domain}' >> /etc/hosts
""")
def publish_as_relay_image(self):
"""Publish this container as a reusable relay image.
def publish_image(self):
"""Publish this container as a reusable per-relay image.
Stops the container, publishes it as 'localchat-relay',
then restarts it.
Returns True if an image was published,
False if a cached image already existed.
"""
if self.incus.find_relay_image():
return
print(f" Publishing {self.name!r} as '{RELAY_IMAGE_ALIAS}' image ...")
if self.incus._find_image(self.image_alias):
return False
self.bash("apt-get clean && rm -rf /var/lib/apt/lists/*")
print(f" Publishing {self.name!r} as {self.image_alias!r} image ...")
self.incus.run(
["publish", self.name, f"--alias={RELAY_IMAGE_ALIAS}", "--force"]
["publish", self.name, f"--alias={self.image_alias}", "--force"],
capture=False,
)
self.wait_ready()
print(f" Relay image '{RELAY_IMAGE_ALIAS}' ready.")
print(f" Image {self.image_alias!r} ready.")
return True
def deployed_version(self):
"""Read /etc/chatmail-version, or None if absent."""
@@ -472,7 +552,31 @@ class DNSContainer(Container):
"""Specialised container handle for the PowerDNS name server."""
def __init__(self, incus):
super().__init__(incus, DNS_CONTAINER_NAME, domain=DNS_DOMAIN)
super().__init__(
incus, DNS_CONTAINER_NAME, domain=DNS_DOMAIN, memory="256MiB", ipv4=DNS_IP
)
def launch(self):
    """Launch the DNS container, preferring the cached DNS image.

    When no cached image is found, ``None`` is handed to the parent
    ``launch`` as *image*.  Returns whatever the parent ``launch`` returns.
    """
    cached = self.incus._find_image(DNS_IMAGE_ALIAS)
    if not cached:
        print(" No cached image, building from base")
    else:
        print(f" Using cached image {cached!r}")
    return super().launch(image=cached)
def publish_as_dns_image(self):
    """Publish this container as the reusable DNS image.

    Returns True if an image was published, and False if an image with
    alias ``DNS_IMAGE_ALIAS`` already existed.  This matches the contract
    of ``RelayContainer.publish_image`` so callers can report cache hits;
    callers that ignore the return value are unaffected.
    """
    if self.incus._find_image(DNS_IMAGE_ALIAS):
        return False
    # Drop apt caches before publishing, presumably to keep the image small.
    self.bash("apt-get clean && rm -rf /var/lib/apt/lists/*")
    print(f" Publishing {self.name!r} as {DNS_IMAGE_ALIAS!r} image ...")
    self.incus.run(
        ["publish", self.name, f"--alias={DNS_IMAGE_ALIAS}", "--force"],
        capture=False,
    )
    self.wait_ready()
    print(f" DNS image {DNS_IMAGE_ALIAS!r} ready.")
    return True
def pdnsutil(self, *args, check=True):
"""Run ``pdnsutil <args>`` inside the DNS container."""