lxc: add Out class with --verbose, section timing, and coloured shell output

Move the Out output-printer class to cmdeploy/util.py so it is shared
across CLI modules.  All print/shell calls in lxc/cli.py, lxc/incus.py,
and dns.py now route through Out instead of bare print().

Key additions:
- Out.section() / Out.section_line(): coloured section headers scaled
  to the current terminal width (or $_CMDEPLOY_WIDTH for sub-processes).
- Out.shell(): merges stdout/stderr, prefixes each output line, and
  prints a red error line with the exit code on failure.
- Out.new_prefixed_out(): indented sub-printer that shares section_timings.
- 'cmdeploy -v / -vv' exposes the verbosity levels.
- Tests for Out added to test_util.py.
This commit is contained in:
holger krekel
2026-03-08 18:07:45 +01:00
parent 010e9de08e
commit a102ed7d61
9 changed files with 355 additions and 168 deletions

View File

@@ -85,13 +85,13 @@ def mockout():
captured_green = []
captured_plain = []
def red(self, msg):
def red(self, msg, **kw):
self.captured_red.append(msg)
def green(self, msg):
def green(self, msg, **kw):
self.captured_green.append(msg)
def __call__(self, msg):
def print(self, msg="", **kw):
self.captured_plain.append(msg)
return MockOut()

View File

@@ -15,9 +15,8 @@ from pathlib import Path
import pyinfra
from chatmaild.config import read_config, write_initial_config
from packaging import version
from termcolor import colored
from . import dns, remote
from . import dns, remote # noqa: F401
from .lxc.cli import ( # noqa: F401
lxc_start_cmd,
lxc_start_cmd_options,
@@ -35,6 +34,7 @@ from .sshexec import (
resolve_host_from_ssh_config,
resolve_key_from_ssh_config,
)
from .util import Out
from .www import main as webdev_main
#
@@ -124,6 +124,8 @@ def run_cmd(args, out):
if not args.dns_check_disabled:
env["CHATMAIL_ADDR_V4"] = remote_data.get("A") or ""
env["CHATMAIL_ADDR_V6"] = remote_data.get("AAAA") or ""
env["DEBIAN_FRONTEND"] = "noninteractive"
env["TERM"] = "linux"
deploy_path = importlib.resources.files(__package__).joinpath("run.py").resolve()
pyinf = "pyinfra --dry" if args.dry_run else "pyinfra"
@@ -151,7 +153,10 @@ def run_cmd(args, out):
return 1
try:
out.check_call(cmd, env=env)
ret = out.shell(cmd, env=env)
if ret:
out.red("Deploy failed")
return 1
if args.website_only:
out.green("Website deployment completed.")
elif (
@@ -267,7 +272,7 @@ def test_cmd(args, out):
pytest_args.extend(["--ssh-host", args.ssh_host])
if args.ssh_config:
pytest_args.extend(["--ssh-config", str(Path(args.ssh_config).resolve())])
ret = out.run_ret(pytest_args, env=env)
ret = out.shell(" ".join(pytest_args), env=env)
return ret
@@ -304,8 +309,8 @@ def fmt_cmd(args, out):
format_args.extend(sources)
check_args.extend(sources)
out.check_call(" ".join(format_args), quiet=not args.verbose)
out.check_call(" ".join(check_args), quiet=not args.verbose)
out.shell(" ".join(format_args), quiet=not args.verbose)
out.shell(" ".join(check_args), quiet=not args.verbose)
def bench_cmd(args, out):
@@ -326,32 +331,6 @@ def webdev_cmd(args, out):
#
class Out:
"""Convenience output printer providing coloring."""
def red(self, msg, file=sys.stderr):
print(colored(msg, "red"), file=file)
def green(self, msg, file=sys.stderr):
print(colored(msg, "green"), file=file)
def __call__(self, msg, red=False, green=False, file=sys.stdout):
color = "red" if red else ("green" if green else None)
print(colored(msg, color), file=file)
def check_call(self, arg, env=None, quiet=False):
if not quiet:
self(f"[$ {arg}]", file=sys.stderr)
return subprocess.check_call(arg, shell=True, env=env)
def run_ret(self, args, env=None, quiet=False):
if not quiet:
cmdstring = " ".join(args)
self(f"[$ {cmdstring}]", file=sys.stderr)
proc = subprocess.run(args, env=env, check=False)
return proc.returncode
def add_ssh_host_option(parser):
parser.add_argument(
"--ssh-host",
@@ -381,15 +360,6 @@ def add_config_option(parser):
help="path to the chatmail.ini file",
)
parser.add_argument(
"--verbose",
"-v",
dest="verbose",
action="store_true",
default=False,
help="provide verbose logging",
)
def add_subcommand(subparsers, func, add_config=True):
name = func.__name__
@@ -415,6 +385,14 @@ def get_parser():
parser = argparse.ArgumentParser(description=description.strip())
parser.set_defaults(func=None, inipath=None)
parser.add_argument(
"-v",
"--verbose",
dest="verbose",
action="count",
default=0,
help="increase verbosity (can be repeated: -v, -vv)",
)
subparsers = parser.add_subparsers(title="subcommands")
# find all subcommands in the module namespace
@@ -447,7 +425,7 @@ def main(args=None):
if args.func is None:
return parser.parse_args(["-h"])
out = Out()
out = Out(sepchar="\u2501", verbosity=args.verbose)
kwargs = {}
if args.inipath is not None and args.func.__name__ not in ("init_cmd", "fmt_cmd"):

View File

@@ -17,9 +17,6 @@ from pyinfra.facts.files import Sha256File
from pyinfra.facts.systemd import SystemdEnabled
from pyinfra.operations import apt, files, pip, server, systemd
from cmdeploy.cmdeploy import Out
from cmdeploy.util import get_version_string
from .acmetool import AcmetoolDeployer
from .basedeploy import (
Deployer,
@@ -37,6 +34,7 @@ from .nginx.deployer import NginxDeployer
from .opendkim.deployer import OpendkimDeployer
from .postfix.deployer import PostfixDeployer
from .selfsigned.deployer import SelfSignedTlsDeployer
from .util import Out, get_version_string
from .www import build_webpages, find_merge_conflict, get_paths

View File

@@ -91,18 +91,19 @@ def check_full_zone(sshexec, remote_data, out, zonefile) -> int:
if required_diff:
out.red("Please set required DNS entries at your DNS provider:\n")
for line in required_diff:
out(line)
out("")
out.print(line)
out.print()
returncode = 1
if remote_data.get("dkim_entry") in required_diff:
out(
"If the DKIM entry above does not work with your DNS provider, you can try this one:\n"
out.print(
"If the DKIM entry above does not work with your DNS provider,"
" you can try this one:\n"
)
out(remote_data.get("web_dkim_entry") + "\n")
out.print(remote_data.get("web_dkim_entry") + "\n")
if recommended_diff:
out("WARNING: these recommended DNS entries are not set:\n")
out.print("WARNING: these recommended DNS entries are not set:\n")
for line in recommended_diff:
out(line)
out.print(line)
if not (recommended_diff or required_diff):
out.green("Great! All your DNS entries are verified and correct.")

View File

@@ -1,11 +1,9 @@
"""lxc-start/stop/status/test subcommands for testing with local containers."""
import os
import subprocess
import time
from contextlib import contextmanager
from ..util import collapse, get_git_hash, get_version_string, shell
from ..util import get_git_hash, get_version_string, shell
from .incus import Incus, RelayContainer
RELAY_NAMES = ("test0", "test1")
@@ -36,11 +34,20 @@ def lxc_start_cmd_options(parser):
def lxc_start_cmd(args, out):
"""Create/Ensure and start LXC relay and DNS containers."""
ix = Incus()
with out.section("Preparing container setup"):
_lxc_start_cmd(args, out)
def _lxc_start_cmd(args, out):
ix = Incus(out)
sub = out.new_prefixed_out()
out.green("Ensuring base image ...")
ix.ensure_base_image()
out.green("Ensuring DNS container (ns-localchat) ...")
dns_ct = ix.get_dns_container()
dns_ct.ensure()
print(f" DNS container IP: {dns_ct.ipv4}")
sub.print(f"DNS container IP: {dns_ct.ipv4}")
names = args.names if args.names else RELAY_NAMES
relays = list(ix.get_container(n) for n in names)
@@ -49,12 +56,12 @@ def lxc_start_cmd(args, out):
ct.ensure()
ip = ct.ipv4
print(" Configuring container hostname ...")
sub.print("Configuring container hostname ...")
ct.configure_hosts(ip)
print(f" Writing {ct.ini.name} ...")
sub.print(f"Writing {ct.ini.name} ...")
ct.write_ini(disable_ipv6=args.ipv4_only)
print(f" Config: {ct.ini}")
sub.print(f"Config: {ct.ini}")
if args.ipv4_only:
ct.disable_ipv6()
ipv6 = None
@@ -65,10 +72,10 @@ def lxc_start_cmd(args, out):
check=False,
)
ipv6 = output.strip() if output else None
print(f" {_format_addrs(ip, ipv6)}")
sub.print(f"{_format_addrs(ip, ipv6)}")
out.green(f" Container {ct.name!r} ready: {ct.domain} -> {ip}")
print()
sub.green(f"Container {ct.name!r} ready: {ct.domain} -> {ip}")
out.print()
# Reset DNS zones only for the containers we just started
started_cnames = {ct.name for ct in relays}
@@ -76,42 +83,42 @@ def lxc_start_cmd(args, out):
started = [c for c in managed if c["name"] in started_cnames]
if started:
print(
out.print(
f"Resetting DNS zones for {len(started)} domain(s) (A + AAAA records) ..."
)
dns_ct.reset_dns_records(dns_ct.ipv4, started)
for ct in relays:
if ct.name in started_cnames:
print(f" Configuring DNS in {ct.name} ...")
sub.print(f"Configuring DNS in {ct.name} ...")
ct.configure_dns(dns_ct.ipv4)
# Generate the unified SSH config
out.green("Writing ssh-config ...")
ssh_cfg = ix.write_ssh_config()
print(f" {ssh_cfg}")
sub.print(f"{ssh_cfg}")
# Verify SSH via the generated config
for ct in relays:
print(f" Verifying SSH to {ct.name} via ssh-config ...")
sub.print(f"Verifying SSH to {ct.name} via ssh-config ...")
if ct.verify_ssh(ssh_cfg):
print(f" SSH OK: ssh -F lxconfigs/ssh-config {ct.domain}")
sub.print(f"SSH OK: ssh -F lxconfigs/ssh-config {ct.domain}")
else:
out.red(f" WARNING: SSH verification failed for {ct.name}")
sub.red(f"WARNING: SSH verification failed for {ct.name}")
# Print integration suggestions
ssh_cfg = ix.ssh_config_path
if not ix.check_ssh_include():
out.green(
"\n (Optional) To use containers from any SSH client, add to ~/.ssh/config:"
sub.green(
"\n(Optional) To use containers from any SSH client, add to ~/.ssh/config:"
)
out.green(f" Include {ssh_cfg}")
sub.green(f" Include {ssh_cfg}")
# Optionally run cmdeploy run on each relay
if args.run:
for ct in relays:
with _section(out, f"cmdeploy run: {ct.sname} ({ct.domain})"):
ret = _run_cmdeploy("run", ct, ix, extra=["--skip-dns-check"])
with out.section(f"cmdeploy run: {ct.sname} ({ct.domain})"):
ret = _run_cmdeploy("run", ct, ix, out, extra=["--skip-dns-check"])
if ret:
out.red(f"Deploy to {ct.sname} failed (exit {ret})")
return ret
@@ -142,7 +149,7 @@ def lxc_stop_cmd_options(parser):
def lxc_stop_cmd(args, out):
"""Stop (and optionally destroy) local LXC relay containers."""
ix = Incus()
ix = Incus(out)
names = args.names or RELAY_NAMES
destroy = args.destroy or args.destroy_all
@@ -186,7 +193,7 @@ def lxc_test_cmd(args, out):
All commands run directly on the host using
``--ssh-config lxconfigs/ssh-config`` for SSH access.
"""
ix = Incus()
ix = Incus(out)
t_total = time.time()
relay_names = list(RELAY_NAMES)
if args.one:
@@ -201,59 +208,64 @@ def lxc_test_cmd(args, out):
for ct in map(ix.get_container, relay_names):
name = ct.sname
ipv4_only = ipv4_only_flags.get(name, False)
label = "IPv4-only" if ipv4_only else "dual-stack"
with _section(out, f"LXC: lxc-start {name} ({label})"):
args.names = [name]
args.ipv4_only = ipv4_only
args.run = False
ret = lxc_start_cmd(args, out)
v_flag = " -" + "v" * out.verbosity if out.verbosity > 0 else ""
start_cmd = f"cmdeploy{v_flag} lxc-start {name}"
if ipv4_only:
start_cmd += " --ipv4-only"
with out.section(f"cmdeploy lxc-start: {name}"):
ret = out.shell(start_cmd, cwd=str(ix.project_root))
if ret:
return ret
status = _deploy_status(ct, local_hash, ix)
if "IN-SYNC" in status:
_section_line(out, f"cmdeploy run: {name}{status}, skipping")
else:
with _section(out, f"cmdeploy run: {name} ({ct.domain})"):
ret = _run_cmdeploy("run", ct, ix, extra=["--skip-dns-check"])
with out.section(f"cmdeploy run: {name}"):
if "IN-SYNC" in status:
out.print(f"{name} is {status}, skipping")
else:
ret = _run_cmdeploy("run", ct, ix, out, extra=["--skip-dns-check"])
if ret:
out.red(f"Deploy to {name} failed (exit {ret})")
return ret
# Snapshot the first relay so subsequent ones launch pre-deployed
if not ix.find_relay_image():
with _section(out, "LXC: publishing relay image"):
with out.section("lxc-test: caching relay image"):
ct.publish_as_relay_image()
for ct in map(ix.get_container, relay_names):
with _section(out, f"cmdeploy dns: {ct.sname} ({ct.domain})"):
ret = _run_cmdeploy("dns", ct, ix, extra=["--zonefile", str(ct.zone)])
with out.section(f"cmdeploy dns: {ct.sname} ({ct.domain})"):
ret = _run_cmdeploy("dns", ct, ix, out, extra=["--zonefile", str(ct.zone)])
if ret:
out.red(f"DNS for {ct.sname} failed (exit {ret})")
return ret
with _section(out, "LXC: PowerDNS zone update"):
with out.section(f"lxc-test: loading DNS zones {' & '.join(relay_names)}"):
dns_ct = ix.get_dns_container()
for ct in map(ix.get_container, relay_names):
if ct.zone.exists():
zone_data = ct.zone.read_text()
print(f" Loading {ct.zone} into PowerDNS ...")
out.print(f"Loading {ct.zone} into PowerDNS ...")
dns_ct.set_dns_records(zone_data)
with _section(out, "cmdeploy test"):
with out.section("cmdeploy test"):
first = ix.get_container(relay_names[0])
env = None
if len(relay_names) > 1:
env = os.environ.copy()
env["CHATMAIL_DOMAIN2"] = ix.get_container(relay_names[1]).domain
ret = _run_cmdeploy("test", first, ix, **({"env": env} if env else {}))
ret = _run_cmdeploy("test", first, ix, out, **({"env": env} if env else {}))
if ret:
out.red(f"Tests failed (exit {ret})")
return ret
elapsed = time.time() - t_total
_section_line(out, f"lxc-test complete ({elapsed:.1f}s)")
out.section_line(f"lxc-test complete ({elapsed:.1f}s)")
if out.section_timings:
out.print("Section timings:")
for name, secs in out.section_timings:
out.print(f" {name:.<50s} {secs:5.1f}s")
out.print(f" {'total':.<50s} {elapsed:5.1f}s")
out.section_timings.clear()
return 0
@@ -268,7 +280,7 @@ def lxc_status_cmd_options(parser):
def lxc_status_cmd(args, out):
"""Show status of local LXC chatmail containers."""
ix = Incus()
ix = Incus(out)
containers = ix.list_managed()
if not containers:
out.red("No LXC containers found. Run 'cmdeploy lxc-start' first.")
@@ -281,23 +293,24 @@ def lxc_status_cmd(args, out):
data = ix.run_json(["storage", "show", "default"], check=False)
if data:
storage_path = data.get("config", {}).get("source")
msg = "Container status"
if storage_path:
out.green(f"Containers: ({storage_path})")
else:
out.green("Containers:")
msg += f": {storage_path}"
out.section_line(msg)
dns_ip = None
for c in containers:
_print_container_status(c, ix, local_hash)
_print_container_status(out, c, ix, local_hash)
if c["name"] == ix.get_dns_container().name:
dns_ip = c["ip"]
out.section_line("Host ssh and DNS configuration")
_print_ssh_status(out, ix)
_print_dns_forwarding_status(out, dns_ip)
return 0
def _print_container_status(c, ix, local_hash):
def _print_container_status(out, c, ix, local_hash):
"""Print name/status, domain/IPs, and RAM for one container."""
cname = c["name"]
is_running = c.get("status") == "Running"
@@ -310,16 +323,16 @@ def _print_container_status(c, ix, local_hash):
tag = "running"
else:
tag = f"running {_deploy_status(ct, local_hash, ix)}"
print(f" {cname:20s} {tag}")
out.print(f"{cname:20s} {tag}")
# Second line: domain, IPv4, IPv6
domain = c.get("domain", "")
ip = c.get("ip") or "?"
ipv6 = c.get("ipv6")
print(f" {domain:20s} {_format_addrs(ip, ipv6)}")
out.print(f"{domain:20s} {_format_addrs(ip, ipv6)}")
# Third line: RAM (RSS), config
indent = " " * 21
detail_out = out.new_prefixed_out(" " * 21)
try:
used, total = ct.rss_mib()
except Exception:
@@ -332,41 +345,42 @@ def _print_container_status(c, ix, local_hash):
else:
detail = ram_str
print(f" {indent}{detail}")
print()
detail_out.print(detail)
out.print()
def _print_ssh_status(out, ix):
"""Print SSH integration status."""
print()
ssh_cfg = ix.ssh_config_path
if ix.check_ssh_include():
out.green("SSH: ~/.ssh/config includes lxconfigs/ssh-config ✓")
else:
out.red("SSH: ~/.ssh/config does NOT include lxconfigs/ssh-config")
print(" Add to ~/.ssh/config:")
print(f" Include {ssh_cfg}")
sub = out.new_prefixed_out()
sub.print("Add to ~/.ssh/config:")
sub.print(f" Include {ssh_cfg}")
def _print_dns_forwarding_status(out, dns_ip):
"""Print host DNS forwarding status for .localchat."""
sub = out.new_prefixed_out()
if not dns_ip:
out.red("DNS: ns-localchat container not found")
return
try:
rv = shell("resolvectl status incusbr0", timeout=5)
rv = shell("resolvectl status incusbr0")
dns_ok = dns_ip in rv.stdout and "localchat" in rv.stdout
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
except Exception:
dns_ok = None
if dns_ok is True:
out.green(f"DNS: .localchat forwarding to {dns_ip}")
elif dns_ok is False:
out.red("DNS: .localchat forwarding NOT configured")
print(" Run:")
print(f" sudo resolvectl dns incusbr0 {dns_ip}")
print(" sudo resolvectl domain incusbr0 ~localchat")
sub.print("Run:")
sub.print(f" sudo resolvectl dns incusbr0 {dns_ip}")
sub.print(" sudo resolvectl domain incusbr0 ~localchat")
else:
print(" DNS: .localchat forwarding status UNKNOWN")
sub.print("DNS: .localchat forwarding status UNKNOWN")
# -------------------------------------------------------------------
@@ -381,26 +395,6 @@ def _format_addrs(ip, ipv6=None):
return ", ".join(parts)
SECTION_WIDTH = 72
@contextmanager
def _section(out, title):
bar = "\u2501" * (SECTION_WIDTH - len(title) - 5)
out.green(f"\u2501\u2501\u2501 {title} {bar}")
t0 = time.time()
yield
elapsed = time.time() - t0
print(f"{'':>{SECTION_WIDTH - 10}}({elapsed:.1f}s)")
print()
def _section_line(out, title):
bar = "\u2501" * (SECTION_WIDTH - len(title) - 5)
out.green(f"\u2501\u2501\u2501 {title} {bar}")
print()
def _deploy_status(ct, local_hash, ix):
"""Return a human-readable deploy status string.
@@ -446,15 +440,16 @@ def _add_name_args(parser, help_text=None):
)
def _run_cmdeploy(subcmd, ct, ix, extra=None, **kwargs):
def _run_cmdeploy(subcmd, ct, ix, out, extra=None, **kwargs):
"""Run ``cmdeploy <subcmd>`` with standard --config/--ssh flags.
*ct* is a Container (uses ``ct.ini`` and ``ct.domain``).
Returns the subprocess exit code.
"""
extra_str = " ".join(extra) if extra else ""
v_flag = " -" + "v" * out.verbosity if out.verbosity > 0 else ""
cmd = f"""\
cmdeploy {subcmd}
cmdeploy{v_flag} {subcmd}
--config {ct.ini}
--ssh-config {ix.ssh_config_path}
--ssh-host {ct.domain}
@@ -462,6 +457,4 @@ def _run_cmdeploy(subcmd, ct, ix, extra=None, **kwargs):
"""
if "cwd" not in kwargs:
kwargs["cwd"] = str(ix.project_root)
cmd = collapse(cmd)
print(f" [$ {cmd}]")
return shell(cmd, capture_output=False, **kwargs).returncode
return out.shell(cmd, **kwargs)

View File

@@ -47,7 +47,8 @@ class Incus:
all modules share a single entry point for Incus interactions.
"""
def __init__(self):
def __init__(self, out):
self.out = out
self.project_root = Path(__file__).resolve().parent.parent.parent.parent.parent
self.lxconfigs_dir = self.project_root / "lxconfigs"
self.lxconfigs_dir.mkdir(exist_ok=True)
@@ -98,15 +99,58 @@ class Incus:
return f"Include {self.ssh_config_path}" in lines
def run(self, args, check=True, capture=True, input=None):
"""Run an incus command."""
"""Run an incus command.
When *capture* is True and *verbosity* >= 1, output is streamed
to the terminal line-by-line while also being captured for
later return via result.stdout.
"""
cmd = ["incus"] + list(args)
kwargs = dict(check=check, text=True, input=input)
if capture:
kwargs["capture_output"] = True
else:
kwargs["stdout"] = None
kwargs["stderr"] = None
return subprocess.run(cmd, **kwargs) # noqa: PLW1510
sub = self.out.new_prefixed_out(" ")
if not capture:
# Simple case: let subprocess handle streams (no capture)
if self.out.verbosity >= 1:
sub.print(f"$ {' '.join(cmd)}")
return subprocess.run(
cmd, text=True, input=input, check=check, stdout=None, stderr=None
)
# Capture case: we may need to stream while capturing
if sub.verbosity >= 1:
cmd_lines = " ".join(cmd).splitlines()
sub.print(f"$ {cmd_lines.pop(0)}")
if sub.verbosity >= 2:
for line in cmd_lines:
sub.print(f" {line}")
proc = subprocess.Popen(
cmd,
text=True,
stdin=subprocess.PIPE if input else None,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
stdout_lines = []
if input:
proc.stdin.write(input)
proc.stdin.close()
for line in proc.stdout:
stdout_lines.append(line)
if sub.verbosity >= 2:
sub.print(f" > {line.rstrip()}")
ret = proc.wait()
stdout = "".join(stdout_lines)
if check and ret != 0:
for line in stdout.splitlines():
if sub.verbosity < 1: # and we haven't printed it yet
sub.red(line)
raise subprocess.CalledProcessError(ret, cmd, output=stdout)
return subprocess.CompletedProcess(cmd, ret, stdout=stdout)
def run_json(self, args, check=True):
"""Run an incus command with ``--format=json``.
@@ -186,9 +230,10 @@ class Incus:
Returns the image alias.
"""
if self._find_image(BASE_IMAGE_ALIAS):
self.out.print(f" Base image '{BASE_IMAGE_ALIAS}' already cached.")
return BASE_IMAGE_ALIAS
print(" Building base image (one-time setup) ...")
self.out.print(" Building base image (one-time setup) ...")
self.run(["delete", BASE_SETUP_NAME, "--force"], check=False)
self.run(["image", "delete", BASE_IMAGE_ALIAS], check=False)
@@ -214,7 +259,7 @@ class Incus:
self.run(["stop", BASE_SETUP_NAME])
self.run(["publish", BASE_SETUP_NAME, f"--alias={BASE_IMAGE_ALIAS}"])
self.run(["delete", BASE_SETUP_NAME, "--force"])
print(f" Base image '{BASE_IMAGE_ALIAS}' ready.")
self.out.print(f" Base image '{BASE_IMAGE_ALIAS}' ready.")
return BASE_IMAGE_ALIAS
def get_container(self, name):
@@ -239,6 +284,7 @@ class Container:
def __init__(self, incus, name, domain=None, memory="100MiB"):
self.incus = incus
self.out = incus.out
self.name = name
self.domain = domain or f"{name}{DOMAIN_SUFFIX}"
self.memory = memory
@@ -251,7 +297,8 @@ class Container:
*script* is dedented and stripped so callers can use triple-quoted strings.
When *check* is False, returns *None* on non-zero exit instead of raising.
"""
cmd = ["exec", self.name, "--", "bash", "-ec", textwrap.dedent(script).strip()]
script = textwrap.dedent(script).strip()
cmd = ["exec", self.name, "--", "bash", "-ec", script]
return self.incus.run_output(cmd, check=check)
def run_cmd(self, *args, check=True):
@@ -276,7 +323,7 @@ class Container:
def launch(self):
"""Launch from the best available image, return the alias used."""
image = self.incus.find_relay_image() or self.incus.ensure_base_image()
print(f" Launching from '{image}' image ...")
self.out.print(f" Launching from '{image}' image ...")
cfg = []
cfg += ("-c", f"{LABEL_KEY}=true")
cfg += ("-c", f"user.localchat-domain={self.domain}")
@@ -407,17 +454,18 @@ class RelayContainer(Container):
def publish_as_relay_image(self):
"""Publish this container as a reusable relay image.
Stops the container, publishes it as 'localchat-relay',
then restarts it.
Stops the container, 'publishes' it as 'localchat-relay', then restarts it.
"""
if self.incus.find_relay_image():
return
print(f" Publishing {self.name!r} as '{RELAY_IMAGE_ALIAS}' image ...")
self.out.print(
f" Locally caching {self.name!r} as '{RELAY_IMAGE_ALIAS}' image ..."
)
self.incus.run(
["publish", self.name, f"--alias={RELAY_IMAGE_ALIAS}", "--force"]
)
self.wait_ready()
print(f" Relay image '{RELAY_IMAGE_ALIAS}' ready.")
self.out.print(f" Relay image '{RELAY_IMAGE_ALIAS}' ready.")
def deployed_version(self):
"""Read /etc/chatmail-version, or None if absent."""
@@ -611,7 +659,7 @@ class DNSContainer(Container):
for d in domains:
domain = d["domain"]
ip = d["ip"]
print(f" {domain} -> {ip}")
self.out.print(f" {domain} -> {ip}")
# Delete and recreate zone fresh (removes stale records)
self.pdnsutil("delete-zone", domain, check=False)
@@ -628,11 +676,11 @@ class DNSContainer(Container):
ipv6 = d.get("ipv6")
if ipv6:
self.replace_rrset(domain, ".", "AAAA", "3600", ipv6)
print(f" zone reset: SOA, NS, A, AAAA ({ip}, {ipv6})")
self.out.print(f" zone reset: SOA, NS, A, AAAA ({ip}, {ipv6})")
else:
# Remove any stale AAAA record
self.pdnsutil("delete-rrset", domain, ".", "AAAA", check=False)
print(f" zone reset: SOA, NS, A ({ip}, IPv4-only)")
self.out.print(f" zone reset: SOA, NS, A ({ip}, IPv4-only)")
self.restart_services()

View File

@@ -8,6 +8,7 @@ import pytest
from cmdeploy.lxc import cli
from cmdeploy.lxc.incus import Incus
from cmdeploy.util import Out
pytestmark = pytest.mark.skipif(
not shutil.which("incus"),
@@ -22,12 +23,14 @@ pytestmark = pytest.mark.skipif(
@pytest.fixture
def ix():
return Incus()
out = Out()
return Incus(out)
@pytest.fixture(scope="session")
def lxc_setup():
ix = Incus()
out = Out()
ix = Incus(out)
ix.get_dns_container().ensure()
return ix.list_managed()
@@ -126,7 +129,7 @@ class TestLxcStatus:
assert "status" in result.stdout.lower()
def test_shows_containers(self, lxc_setup, capsys):
class QuietOut:
class QuietOut(Out):
def red(self, msg, **kw):
pass

View File

@@ -1,4 +1,71 @@
from cmdeploy.util import collapse, get_git_hash, get_version_string, shell
import sys
from cmdeploy.util import Out, collapse, get_git_hash, get_version_string, shell
class TestOut:
def test_prefix_default(self, capsys):
out = Out()
out.print("hello")
assert capsys.readouterr().out == "hello\n"
def test_prefix_custom(self, capsys):
out = Out(prefix=">> ")
out.print("hello")
assert capsys.readouterr().out == ">> hello\n"
def test_prefix_print_file(self):
import io
buf = io.StringIO()
out = Out(prefix=":: ")
out.print("msg", file=buf)
assert ":: msg" in buf.getvalue()
def test_new_prefixed_out(self, capsys):
parent = Out(prefix="A")
child = parent.new_prefixed_out("B")
child.print("x")
assert capsys.readouterr().out == "ABx\n"
# shares section_timings
assert child.section_timings is parent.section_timings
def test_section_no_auto_indent(self, capsys):
out = Out(prefix="")
with out.section("test"):
out.print("inside")
captured = capsys.readouterr().out
# "inside" should NOT be indented by section()
lines = captured.strip().splitlines()
inside_line = [l for l in lines if "inside" in l][0]
assert inside_line == "inside"
def test_section_records_timing(self):
out = Out()
with out.section("s1"):
pass
assert len(out.section_timings) == 1
assert out.section_timings[0][0] == "s1"
def test_shell_failure_shows_output(self):
"""When a shell command fails, its output and exit code are shown."""
import subprocess
result = subprocess.run(
[
sys.executable,
"-c",
"from cmdeploy.util import Out; Out(prefix='').shell("
"\"echo 'boom on stderr' >&2; exit 42\")",
],
capture_output=True,
text=True,
check=False,
)
# the command's stderr is merged into stdout by Popen
assert "boom on stderr" in result.stdout
# Out.red() prints the failure notice to stderr
assert "exit code 42" in result.stderr
def test_collapse():

View File

@@ -1,9 +1,108 @@
"""Shared utility functions for cmdeploy."""
import os
import shutil
import subprocess
import sys
import textwrap
import time
from contextlib import contextmanager
from pathlib import Path
from termcolor import colored
class Out:
"""Convenience output printer providing coloring and section formatting."""
def __init__(self, sepchar="\u2501", prefix="", verbosity=0):
self.section_timings = []
self.prefix = prefix
self.sepchar = sepchar
self.verbosity = verbosity
env_width = os.environ.get("_CMDEPLOY_WIDTH")
if env_width:
self.section_width = int(env_width)
else:
self.section_width = shutil.get_terminal_size((80, 24)).columns
def new_prefixed_out(self, newprefix=" "):
"""Return a new Out with an extended prefix,
sharing section_timings with the parent.
"""
out = Out(
sepchar=self.sepchar,
prefix=self.prefix + newprefix,
verbosity=self.verbosity,
)
out.section_timings = self.section_timings
return out
def red(self, msg, file=sys.stderr):
print(colored(self.prefix + msg, "red"), file=file, flush=True)
def green(self, msg, file=sys.stderr):
print(colored(self.prefix + msg, "green"), file=file, flush=True)
def print(self, msg="", **kwargs):
"""Print to stdout with automatic flush."""
if msg:
msg = self.prefix + msg
print(msg, flush=True, **kwargs)
def _format_header(self, title):
"""Return a formatted section header string."""
width = self.section_width - len(self.prefix)
bar = self.sepchar * (width - len(title) - 5)
return f"{self.sepchar * 3} {title} {bar}"
@contextmanager
def section(self, title):
"""Context manager that prints a section header and records elapsed time."""
self.green(self._format_header(title))
t0 = time.time()
yield
elapsed = time.time() - t0
self.section_timings.append((title, elapsed))
def section_line(self, title):
"""Print a section header without timing."""
self.green(self._format_header(title))
def shell(self, cmd, quiet=False, **kwargs):
"""Print *cmd*, run it, and re-print its output with the current prefix.
*cmd* is passed through :func:`collapse`, so callers
can use triple-quoted f-strings freely.
Stdout and stderr are merged, read line-by-line,
and each line is printed with ``self.prefix`` prepended.
When the command exits non-zero, a red error line is printed.
"""
cmd = collapse(cmd)
if not quiet:
self.print(f"$ {cmd}")
indent = self.prefix + " "
env = kwargs.pop("env", None)
if env is None:
env = os.environ.copy()
env["_CMDEPLOY_WIDTH"] = str(self.section_width - len(indent))
proc = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env,
**kwargs,
)
for line in proc.stdout:
sys.stdout.write(indent + line)
sys.stdout.flush()
ret = proc.wait()
if ret:
self.red(f"command failed with exit code {ret}: {cmd}")
return ret
def _project_root():
"""Return the project root directory."""