From a102ed7d61aae75571e91a61ad7c9a92d8ea7db2 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sun, 8 Mar 2026 18:07:45 +0100 Subject: [PATCH] lxc: add Out class with --verbose, section timing, and coloured shell output Move the Out output-printer class to cmdeploy/util.py so it is shared across CLI modules. All print/shell calls in lxc/cli.py, lxc/incus.py, and dns.py now route through Out instead of bare print(). Key additions: - Out.section() / Out.section_line(): coloured section headers scaled to the current terminal width (or $_CMDEPLOY_WIDTH for sub-processes). - Out.shell(): merges stdout/stderr, prefixes each output line, and prints a red error line with the exit code on failure. - Out.new_prefixed_out(): indented sub-printer that shares section_timings. - 'cmdeploy -v / -vv' exposes the verbosity levels. - Tests for Out added to test_util.py. --- chatmaild/src/chatmaild/tests/plugin.py | 6 +- cmdeploy/src/cmdeploy/cmdeploy.py | 62 +++----- cmdeploy/src/cmdeploy/deployers.py | 4 +- cmdeploy/src/cmdeploy/dns.py | 15 +- cmdeploy/src/cmdeploy/lxc/cli.py | 171 +++++++++++------------ cmdeploy/src/cmdeploy/lxc/incus.py | 88 +++++++++--- cmdeploy/src/cmdeploy/tests/test_lxc.py | 9 +- cmdeploy/src/cmdeploy/tests/test_util.py | 69 ++++++++- cmdeploy/src/cmdeploy/util.py | 99 +++++++++++++ 9 files changed, 355 insertions(+), 168 deletions(-) diff --git a/chatmaild/src/chatmaild/tests/plugin.py b/chatmaild/src/chatmaild/tests/plugin.py index b57418a3..165de3e0 100644 --- a/chatmaild/src/chatmaild/tests/plugin.py +++ b/chatmaild/src/chatmaild/tests/plugin.py @@ -85,13 +85,13 @@ def mockout(): captured_green = [] captured_plain = [] - def red(self, msg): + def red(self, msg, **kw): self.captured_red.append(msg) - def green(self, msg): + def green(self, msg, **kw): self.captured_green.append(msg) - def __call__(self, msg): + def print(self, msg="", **kw): self.captured_plain.append(msg) return MockOut() diff --git a/cmdeploy/src/cmdeploy/cmdeploy.py b/cmdeploy/src/cmdeploy/cmdeploy.py index d9b432cf..29d14046 100644 --- a/cmdeploy/src/cmdeploy/cmdeploy.py +++ b/cmdeploy/src/cmdeploy/cmdeploy.py @@ -15,9 +15,8 @@ from pathlib import Path import pyinfra from chatmaild.config import read_config, write_initial_config from packaging import version -from termcolor import colored -from . import dns, remote +from . import dns, remote # noqa: F401 from .lxc.cli import ( # noqa: F401 lxc_start_cmd, lxc_start_cmd_options, @@ -35,6 +34,7 @@ from .sshexec import ( resolve_host_from_ssh_config, resolve_key_from_ssh_config, ) +from .util import Out from .www import main as webdev_main # @@ -124,6 +124,8 @@ def run_cmd(args, out): if not args.dns_check_disabled: env["CHATMAIL_ADDR_V4"] = remote_data.get("A") or "" env["CHATMAIL_ADDR_V6"] = remote_data.get("AAAA") or "" + env["DEBIAN_FRONTEND"] = "noninteractive" + env["TERM"] = "linux" deploy_path = importlib.resources.files(__package__).joinpath("run.py").resolve() pyinf = "pyinfra --dry" if args.dry_run else "pyinfra" @@ -151,7 +153,10 @@ def run_cmd(args, out): return 1 try: - out.check_call(cmd, env=env) + ret = out.shell(cmd, env=env) + if ret: + out.red("Deploy failed") + return 1 if args.website_only: out.green("Website deployment completed.") elif ( @@ -267,7 +272,7 @@ def test_cmd(args, out): pytest_args.extend(["--ssh-host", args.ssh_host]) if args.ssh_config: pytest_args.extend(["--ssh-config", str(Path(args.ssh_config).resolve())]) - ret = out.run_ret(pytest_args, env=env) + ret = out.shell(" ".join(pytest_args), env=env) return ret @@ -304,8 +309,8 @@ def fmt_cmd(args, out): format_args.extend(sources) check_args.extend(sources) - out.check_call(" ".join(format_args), quiet=not args.verbose) - out.check_call(" ".join(check_args), quiet=not args.verbose) + out.shell(" ".join(format_args), quiet=not args.verbose) + out.shell(" ".join(check_args), quiet=not args.verbose) def bench_cmd(args, out): @@ -326,32 +331,6 @@ def webdev_cmd(args, out): # -class Out: - """Convenience output printer providing coloring.""" - - def red(self, msg, file=sys.stderr): - print(colored(msg, "red"), file=file) - - def green(self, msg, file=sys.stderr): - print(colored(msg, "green"), file=file) - - def __call__(self, msg, red=False, green=False, file=sys.stdout): - color = "red" if red else ("green" if green else None) - print(colored(msg, color), file=file) - - def check_call(self, arg, env=None, quiet=False): - if not quiet: - self(f"[$ {arg}]", file=sys.stderr) - return subprocess.check_call(arg, shell=True, env=env) - - def run_ret(self, args, env=None, quiet=False): - if not quiet: - cmdstring = " ".join(args) - self(f"[$ {cmdstring}]", file=sys.stderr) - proc = subprocess.run(args, env=env, check=False) - return proc.returncode - - def add_ssh_host_option(parser): parser.add_argument( "--ssh-host", @@ -381,15 +360,6 @@ def add_config_option(parser): help="path to the chatmail.ini file", ) - parser.add_argument( - "--verbose", - "-v", - dest="verbose", - action="store_true", - default=False, - help="provide verbose logging", - ) - def add_subcommand(subparsers, func, add_config=True): name = func.__name__ @@ -415,6 +385,14 @@ def get_parser(): parser = argparse.ArgumentParser(description=description.strip()) parser.set_defaults(func=None, inipath=None) + parser.add_argument( + "-v", + "--verbose", + dest="verbose", + action="count", + default=0, + help="increase verbosity (can be repeated: -v, -vv)", + ) subparsers = parser.add_subparsers(title="subcommands") # find all subcommands in the module namespace @@ -447,7 +425,7 @@ def main(args=None): if args.func is None: return parser.parse_args(["-h"]) - out = Out() + out = Out(sepchar="\u2501", verbosity=args.verbose) kwargs = {} if args.inipath is not None and args.func.__name__ not in ("init_cmd", "fmt_cmd"): diff --git a/cmdeploy/src/cmdeploy/deployers.py b/cmdeploy/src/cmdeploy/deployers.py index 2bee1784..db952e8b 100644 --- a/cmdeploy/src/cmdeploy/deployers.py +++ b/cmdeploy/src/cmdeploy/deployers.py @@ -17,9 +17,6 @@ from pyinfra.facts.files import Sha256File from pyinfra.facts.systemd import SystemdEnabled from pyinfra.operations import apt, files, pip, server, systemd -from cmdeploy.cmdeploy import Out -from cmdeploy.util import get_version_string - from .acmetool import AcmetoolDeployer from .basedeploy import ( Deployer, @@ -37,6 +34,7 @@ from .nginx.deployer import NginxDeployer from .opendkim.deployer import OpendkimDeployer from .postfix.deployer import PostfixDeployer from .selfsigned.deployer import SelfSignedTlsDeployer +from .util import Out, get_version_string from .www import build_webpages, find_merge_conflict, get_paths diff --git a/cmdeploy/src/cmdeploy/dns.py b/cmdeploy/src/cmdeploy/dns.py index b519e71c..adfdb3f4 100644 --- a/cmdeploy/src/cmdeploy/dns.py +++ b/cmdeploy/src/cmdeploy/dns.py @@ -91,18 +91,19 @@ def check_full_zone(sshexec, remote_data, out, zonefile) -> int: if required_diff: out.red("Please set required DNS entries at your DNS provider:\n") for line in required_diff: - out(line) - out("") + out.print(line) + out.print() returncode = 1 if remote_data.get("dkim_entry") in required_diff: - out( - "If the DKIM entry above does not work with your DNS provider, you can try this one:\n" + out.print( + "If the DKIM entry above does not work with your DNS provider," + " you can try this one:\n" ) - out(remote_data.get("web_dkim_entry") + "\n") + out.print(remote_data.get("web_dkim_entry") + "\n") if recommended_diff: - out("WARNING: these recommended DNS entries are not set:\n") + out.print("WARNING: these recommended DNS entries are not set:\n") for line in recommended_diff: - out(line) + out.print(line) if not (recommended_diff or required_diff): out.green("Great! All your DNS entries are verified and correct.") diff --git a/cmdeploy/src/cmdeploy/lxc/cli.py b/cmdeploy/src/cmdeploy/lxc/cli.py index df536e02..c4569d15 100644 --- a/cmdeploy/src/cmdeploy/lxc/cli.py +++ b/cmdeploy/src/cmdeploy/lxc/cli.py @@ -1,11 +1,9 @@ """lxc-start/stop/status/test subcommands for testing with local containers.""" import os -import subprocess import time -from contextlib import contextmanager -from ..util import collapse, get_git_hash, get_version_string, shell +from ..util import get_git_hash, get_version_string, shell from .incus import Incus, RelayContainer RELAY_NAMES = ("test0", "test1") @@ -36,11 +34,20 @@ def lxc_start_cmd_options(parser): def lxc_start_cmd(args, out): """Create/Ensure and start LXC relay and DNS containers.""" - ix = Incus() + + with out.section("Preparing container setup"): + _lxc_start_cmd(args, out) + + +def _lxc_start_cmd(args, out): + ix = Incus(out) + sub = out.new_prefixed_out() + out.green("Ensuring base image ...") + ix.ensure_base_image() out.green("Ensuring DNS container (ns-localchat) ...") dns_ct = ix.get_dns_container() dns_ct.ensure() - print(f" DNS container IP: {dns_ct.ipv4}") + sub.print(f"DNS container IP: {dns_ct.ipv4}") names = args.names if args.names else RELAY_NAMES relays = list(ix.get_container(n) for n in names) @@ -49,12 +56,12 @@ def lxc_start_cmd(args, out): ct.ensure() ip = ct.ipv4 - print(" Configuring container hostname ...") + sub.print("Configuring container hostname ...") ct.configure_hosts(ip) - print(f" Writing {ct.ini.name} ...") + sub.print(f"Writing {ct.ini.name} ...") ct.write_ini(disable_ipv6=args.ipv4_only) - print(f" Config: {ct.ini}") + sub.print(f"Config: {ct.ini}") if args.ipv4_only: ct.disable_ipv6() ipv6 = None @@ -65,10 +72,10 @@ def lxc_start_cmd(args, out): check=False, ) ipv6 = output.strip() if output else None - print(f" {_format_addrs(ip, ipv6)}") + sub.print(f"{_format_addrs(ip, ipv6)}") - out.green(f" Container {ct.name!r} ready: {ct.domain} -> {ip}") - print() + sub.green(f"Container {ct.name!r} ready: {ct.domain} -> {ip}") + out.print() # Reset DNS zones only for the containers we just started started_cnames = {ct.name for ct in relays} @@ -76,42 +83,42 @@ def lxc_start_cmd(args, out): started = [c for c in managed if c["name"] in started_cnames] if started: - print( + out.print( f"Resetting DNS zones for {len(started)} domain(s) (A + AAAA records) ..." ) dns_ct.reset_dns_records(dns_ct.ipv4, started) for ct in relays: if ct.name in started_cnames: - print(f" Configuring DNS in {ct.name} ...") + sub.print(f"Configuring DNS in {ct.name} ...") ct.configure_dns(dns_ct.ipv4) # Generate the unified SSH config out.green("Writing ssh-config ...") ssh_cfg = ix.write_ssh_config() - print(f" {ssh_cfg}") + sub.print(f"{ssh_cfg}") # Verify SSH via the generated config for ct in relays: - print(f" Verifying SSH to {ct.name} via ssh-config ...") + sub.print(f"Verifying SSH to {ct.name} via ssh-config ...") if ct.verify_ssh(ssh_cfg): - print(f" SSH OK: ssh -F lxconfigs/ssh-config {ct.domain}") + sub.print(f"SSH OK: ssh -F lxconfigs/ssh-config {ct.domain}") else: - out.red(f" WARNING: SSH verification failed for {ct.name}") + sub.red(f"WARNING: SSH verification failed for {ct.name}") # Print integration suggestions ssh_cfg = ix.ssh_config_path if not ix.check_ssh_include(): - out.green( - "\n (Optional) To use containers from any SSH client, add to ~/.ssh/config:" + sub.green( + "\n(Optional) To use containers from any SSH client, add to ~/.ssh/config:" ) - out.green(f" Include {ssh_cfg}") + sub.green(f" Include {ssh_cfg}") # Optionally run cmdeploy run on each relay if args.run: for ct in relays: - with _section(out, f"cmdeploy run: {ct.sname} ({ct.domain})"): - ret = _run_cmdeploy("run", ct, ix, extra=["--skip-dns-check"]) + with out.section(f"cmdeploy run: {ct.sname} ({ct.domain})"): + ret = _run_cmdeploy("run", ct, ix, out, extra=["--skip-dns-check"]) if ret: out.red(f"Deploy to {ct.sname} failed (exit {ret})") return ret @@ -142,7 +149,7 @@ def lxc_stop_cmd_options(parser): def lxc_stop_cmd(args, out): """Stop (and optionally destroy) local LXC relay containers.""" - ix = Incus() + ix = Incus(out) names = args.names or RELAY_NAMES destroy = args.destroy or args.destroy_all @@ -186,7 +193,7 @@ def lxc_test_cmd(args, out): All commands run directly on the host using ``--ssh-config lxconfigs/ssh-config`` for SSH access. """ - ix = Incus() + ix = Incus(out) t_total = time.time() relay_names = list(RELAY_NAMES) if args.one: @@ -201,59 +208,64 @@ def lxc_test_cmd(args, out): for ct in map(ix.get_container, relay_names): name = ct.sname ipv4_only = ipv4_only_flags.get(name, False) - label = "IPv4-only" if ipv4_only else "dual-stack" - - with _section(out, f"LXC: lxc-start {name} ({label})"): - args.names = [name] - args.ipv4_only = ipv4_only - args.run = False - ret = lxc_start_cmd(args, out) + v_flag = " -" + "v" * out.verbosity if out.verbosity > 0 else "" + start_cmd = f"cmdeploy{v_flag} lxc-start {name}" + if ipv4_only: + start_cmd += " --ipv4-only" + with out.section(f"cmdeploy lxc-start: {name}"): + ret = out.shell(start_cmd, cwd=str(ix.project_root)) if ret: return ret status = _deploy_status(ct, local_hash, ix) - if "IN-SYNC" in status: - _section_line(out, f"cmdeploy run: {name} — {status}, skipping") - else: - with _section(out, f"cmdeploy run: {name} ({ct.domain})"): - ret = _run_cmdeploy("run", ct, ix, extra=["--skip-dns-check"]) + with out.section(f"cmdeploy run: {name}"): + if "IN-SYNC" in status: + out.print(f"{name} is {status}, skipping") + else: + ret = _run_cmdeploy("run", ct, ix, out, extra=["--skip-dns-check"]) if ret: out.red(f"Deploy to {name} failed (exit {ret})") return ret # Snapshot the first relay so subsequent ones launch pre-deployed if not ix.find_relay_image(): - with _section(out, "LXC: publishing relay image"): + with out.section("lxc-test: caching relay image"): ct.publish_as_relay_image() for ct in map(ix.get_container, relay_names): - with _section(out, f"cmdeploy dns: {ct.sname} ({ct.domain})"): - ret = _run_cmdeploy("dns", ct, ix, extra=["--zonefile", str(ct.zone)]) + with out.section(f"cmdeploy dns: {ct.sname} ({ct.domain})"): + ret = _run_cmdeploy("dns", ct, ix, out, extra=["--zonefile", str(ct.zone)]) if ret: out.red(f"DNS for {ct.sname} failed (exit {ret})") return ret - with _section(out, "LXC: PowerDNS zone update"): + with out.section(f"lxc-test: loading DNS zones {' & '.join(relay_names)}"): dns_ct = ix.get_dns_container() for ct in map(ix.get_container, relay_names): if ct.zone.exists(): zone_data = ct.zone.read_text() - print(f" Loading {ct.zone} into PowerDNS ...") + out.print(f"Loading {ct.zone} into PowerDNS ...") dns_ct.set_dns_records(zone_data) - with _section(out, "cmdeploy test"): + with out.section("cmdeploy test"): first = ix.get_container(relay_names[0]) env = None if len(relay_names) > 1: env = os.environ.copy() env["CHATMAIL_DOMAIN2"] = ix.get_container(relay_names[1]).domain - ret = _run_cmdeploy("test", first, ix, **({"env": env} if env else {})) + ret = _run_cmdeploy("test", first, ix, out, **({"env": env} if env else {})) if ret: out.red(f"Tests failed (exit {ret})") return ret elapsed = time.time() - t_total - _section_line(out, f"lxc-test complete ({elapsed:.1f}s)") + out.section_line(f"lxc-test complete ({elapsed:.1f}s)") + if out.section_timings: + out.print("Section timings:") + for name, secs in out.section_timings: + out.print(f" {name:.<50s} {secs:5.1f}s") + out.print(f" {'total':.<50s} {elapsed:5.1f}s") + out.section_timings.clear() return 0 @@ -268,7 +280,7 @@ def lxc_status_cmd_options(parser): def lxc_status_cmd(args, out): """Show status of local LXC chatmail containers.""" - ix = Incus() + ix = Incus(out) containers = ix.list_managed() if not containers: out.red("No LXC containers found. Run 'cmdeploy lxc-start' first.") @@ -281,23 +293,24 @@ def lxc_status_cmd(args, out): data = ix.run_json(["storage", "show", "default"], check=False) if data: storage_path = data.get("config", {}).get("source") + msg = "Container status" if storage_path: - out.green(f"Containers: ({storage_path})") - else: - out.green("Containers:") + msg += f": {storage_path}" + out.section_line(msg) dns_ip = None for c in containers: - _print_container_status(c, ix, local_hash) + _print_container_status(out, c, ix, local_hash) if c["name"] == ix.get_dns_container().name: dns_ip = c["ip"] + out.section_line("Host ssh and DNS configuration") _print_ssh_status(out, ix) _print_dns_forwarding_status(out, dns_ip) return 0 -def _print_container_status(c, ix, local_hash): +def _print_container_status(out, c, ix, local_hash): """Print name/status, domain/IPs, and RAM for one container.""" cname = c["name"] is_running = c.get("status") == "Running" @@ -310,16 +323,16 @@ def _print_container_status(c, ix, local_hash): tag = "running" else: tag = f"running {_deploy_status(ct, local_hash, ix)}" - print(f" {cname:20s} {tag}") + out.print(f"{cname:20s} {tag}") # Second line: domain, IPv4, IPv6 domain = c.get("domain", "") ip = c.get("ip") or "?" ipv6 = c.get("ipv6") - print(f" {domain:20s} {_format_addrs(ip, ipv6)}") + out.print(f"{domain:20s} {_format_addrs(ip, ipv6)}") # Third line: RAM (RSS), config - indent = " " * 21 + detail_out = out.new_prefixed_out(" " * 21) try: used, total = ct.rss_mib() except Exception: @@ -332,41 +345,42 @@ def _print_container_status(c, ix, local_hash): else: detail = ram_str - print(f" {indent}{detail}") - print() + detail_out.print(detail) + out.print() def _print_ssh_status(out, ix): """Print SSH integration status.""" - print() ssh_cfg = ix.ssh_config_path if ix.check_ssh_include(): out.green("SSH: ~/.ssh/config includes lxconfigs/ssh-config ✓") else: out.red("SSH: ~/.ssh/config does NOT include lxconfigs/ssh-config") - print(" Add to ~/.ssh/config:") - print(f" Include {ssh_cfg}") + sub = out.new_prefixed_out() + sub.print("Add to ~/.ssh/config:") + sub.print(f" Include {ssh_cfg}") def _print_dns_forwarding_status(out, dns_ip): """Print host DNS forwarding status for .localchat.""" + sub = out.new_prefixed_out() if not dns_ip: out.red("DNS: ns-localchat container not found") return try: - rv = shell("resolvectl status incusbr0", timeout=5) + rv = shell("resolvectl status incusbr0") dns_ok = dns_ip in rv.stdout and "localchat" in rv.stdout - except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + except Exception: dns_ok = None if dns_ok is True: out.green(f"DNS: .localchat forwarding to {dns_ip} ✓") elif dns_ok is False: out.red("DNS: .localchat forwarding NOT configured") - print(" Run:") - print(f" sudo resolvectl dns incusbr0 {dns_ip}") - print(" sudo resolvectl domain incusbr0 ~localchat") + sub.print("Run:") + sub.print(f" sudo resolvectl dns incusbr0 {dns_ip}") + sub.print(" sudo resolvectl domain incusbr0 ~localchat") else: - print(" DNS: .localchat forwarding status UNKNOWN") + sub.print("DNS: .localchat forwarding status UNKNOWN") # ------------------------------------------------------------------- @@ -381,26 +395,6 @@ def _format_addrs(ip, ipv6=None): return ", ".join(parts) -SECTION_WIDTH = 72 - - -@contextmanager -def _section(out, title): - bar = "\u2501" * (SECTION_WIDTH - len(title) - 5) - out.green(f"\u2501\u2501\u2501 {title} {bar}") - t0 = time.time() - yield - elapsed = time.time() - t0 - print(f"{'':>{SECTION_WIDTH - 10}}({elapsed:.1f}s)") - print() - - -def _section_line(out, title): - bar = "\u2501" * (SECTION_WIDTH - len(title) - 5) - out.green(f"\u2501\u2501\u2501 {title} {bar}") - print() - - def _deploy_status(ct, local_hash, ix): """Return a human-readable deploy status string. @@ -446,15 +440,16 @@ def _add_name_args(parser, help_text=None): ) -def _run_cmdeploy(subcmd, ct, ix, extra=None, **kwargs): +def _run_cmdeploy(subcmd, ct, ix, out, extra=None, **kwargs): """Run ``cmdeploy `` with standard --config/--ssh flags. *ct* is a Container (uses ``ct.ini`` and ``ct.domain``). Returns the subprocess exit code. """ extra_str = " ".join(extra) if extra else "" + v_flag = " -" + "v" * out.verbosity if out.verbosity > 0 else "" cmd = f"""\ - cmdeploy {subcmd} + cmdeploy{v_flag} {subcmd} --config {ct.ini} --ssh-config {ix.ssh_config_path} --ssh-host {ct.domain} @@ -462,6 +457,4 @@ def _run_cmdeploy(subcmd, ct, ix, extra=None, **kwargs): """ if "cwd" not in kwargs: kwargs["cwd"] = str(ix.project_root) - cmd = collapse(cmd) - print(f" [$ {cmd}]") - return shell(cmd, capture_output=False, **kwargs).returncode + return out.shell(cmd, **kwargs) diff --git a/cmdeploy/src/cmdeploy/lxc/incus.py b/cmdeploy/src/cmdeploy/lxc/incus.py index 391c8b51..61abe0f1 100644 --- a/cmdeploy/src/cmdeploy/lxc/incus.py +++ b/cmdeploy/src/cmdeploy/lxc/incus.py @@ -47,7 +47,8 @@ class Incus: all modules share a single entry point for Incus interactions. """ - def __init__(self): + def __init__(self, out): + self.out = out self.project_root = Path(__file__).resolve().parent.parent.parent.parent.parent self.lxconfigs_dir = self.project_root / "lxconfigs" self.lxconfigs_dir.mkdir(exist_ok=True) @@ -98,15 +99,58 @@ class Incus: return f"Include {self.ssh_config_path}" in lines def run(self, args, check=True, capture=True, input=None): - """Run an incus command.""" + """Run an incus command. + + When *capture* is True and *verbosity* >= 1, output is streamed + to the terminal line-by-line while also being captured for + later return via result.stdout. + """ cmd = ["incus"] + list(args) - kwargs = dict(check=check, text=True, input=input) - if capture: - kwargs["capture_output"] = True - else: - kwargs["stdout"] = None - kwargs["stderr"] = None - return subprocess.run(cmd, **kwargs) # noqa: PLW1510 + sub = self.out.new_prefixed_out(" ") + + if not capture: + # Simple case: let subprocess handle streams (no capture) + if self.out.verbosity >= 1: + sub.print(f"$ {' '.join(cmd)}") + return subprocess.run( + cmd, text=True, input=input, check=check, stdout=None, stderr=None + ) + + # Capture case: we may need to stream while capturing + if sub.verbosity >= 1: + cmd_lines = " ".join(cmd).splitlines() + sub.print(f"$ {cmd_lines.pop(0)}") + if sub.verbosity >= 2: + for line in cmd_lines: + sub.print(f" {line}") + + proc = subprocess.Popen( + cmd, + text=True, + stdin=subprocess.PIPE if input else None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + + stdout_lines = [] + if input: + proc.stdin.write(input) + proc.stdin.close() + + for line in proc.stdout: + stdout_lines.append(line) + if sub.verbosity >= 2: + sub.print(f" > {line.rstrip()}") + + ret = proc.wait() + stdout = "".join(stdout_lines) + if check and ret != 0: + for line in stdout.splitlines(): + if sub.verbosity < 1: # and we haven't printed it yet + sub.red(line) + raise subprocess.CalledProcessError(ret, cmd, output=stdout) + + return subprocess.CompletedProcess(cmd, ret, stdout=stdout) def run_json(self, args, check=True): """Run an incus command with ``--format=json``. @@ -186,9 +230,10 @@ class Incus: Returns the image alias. """ if self._find_image(BASE_IMAGE_ALIAS): + self.out.print(f" Base image '{BASE_IMAGE_ALIAS}' already cached.") return BASE_IMAGE_ALIAS - print(" Building base image (one-time setup) ...") + self.out.print(" Building base image (one-time setup) ...") self.run(["delete", BASE_SETUP_NAME, "--force"], check=False) self.run(["image", "delete", BASE_IMAGE_ALIAS], check=False) @@ -214,7 +259,7 @@ class Incus: self.run(["stop", BASE_SETUP_NAME]) self.run(["publish", BASE_SETUP_NAME, f"--alias={BASE_IMAGE_ALIAS}"]) self.run(["delete", BASE_SETUP_NAME, "--force"]) - print(f" Base image '{BASE_IMAGE_ALIAS}' ready.") + self.out.print(f" Base image '{BASE_IMAGE_ALIAS}' ready.") return BASE_IMAGE_ALIAS def get_container(self, name): @@ -239,6 +284,7 @@ class Container: def __init__(self, incus, name, domain=None, memory="100MiB"): self.incus = incus + self.out = incus.out self.name = name self.domain = domain or f"{name}{DOMAIN_SUFFIX}" self.memory = memory @@ -251,7 +297,8 @@ class Container: *script* is dedented and stripped so callers can use triple-quoted strings. When *check* is False, returns *None* on non-zero exit instead of raising. """ - cmd = ["exec", self.name, "--", "bash", "-ec", textwrap.dedent(script).strip()] + script = textwrap.dedent(script).strip() + cmd = ["exec", self.name, "--", "bash", "-ec", script] return self.incus.run_output(cmd, check=check) def run_cmd(self, *args, check=True): @@ -276,7 +323,7 @@ class Container: def launch(self): """Launch from the best available image, return the alias used.""" image = self.incus.find_relay_image() or self.incus.ensure_base_image() - print(f" Launching from '{image}' image ...") + self.out.print(f" Launching from '{image}' image ...") cfg = [] cfg += ("-c", f"{LABEL_KEY}=true") cfg += ("-c", f"user.localchat-domain={self.domain}") @@ -407,17 +454,18 @@ class RelayContainer(Container): def publish_as_relay_image(self): """Publish this container as a reusable relay image. - Stops the container, publishes it as 'localchat-relay', - then restarts it. + Stops the container, 'publishes' it as 'localchat-relay', then restarts it. """ if self.incus.find_relay_image(): return - print(f" Publishing {self.name!r} as '{RELAY_IMAGE_ALIAS}' image ...") + self.out.print( + f" Locally caching {self.name!r} as '{RELAY_IMAGE_ALIAS}' image ..." + ) self.incus.run( ["publish", self.name, f"--alias={RELAY_IMAGE_ALIAS}", "--force"] ) self.wait_ready() - print(f" Relay image '{RELAY_IMAGE_ALIAS}' ready.") + self.out.print(f" Relay image '{RELAY_IMAGE_ALIAS}' ready.") def deployed_version(self): """Read /etc/chatmail-version, or None if absent.""" @@ -611,7 +659,7 @@ class DNSContainer(Container): for d in domains: domain = d["domain"] ip = d["ip"] - print(f" {domain} -> {ip}") + self.out.print(f" {domain} -> {ip}") # Delete and recreate zone fresh (removes stale records) self.pdnsutil("delete-zone", domain, check=False) @@ -628,11 +676,11 @@ class DNSContainer(Container): ipv6 = d.get("ipv6") if ipv6: self.replace_rrset(domain, ".", "AAAA", "3600", ipv6) - print(f" zone reset: SOA, NS, A, AAAA ({ip}, {ipv6})") + self.out.print(f" zone reset: SOA, NS, A, AAAA ({ip}, {ipv6})") else: # Remove any stale AAAA record self.pdnsutil("delete-rrset", domain, ".", "AAAA", check=False) - print(f" zone reset: SOA, NS, A ({ip}, IPv4-only)") + self.out.print(f" zone reset: SOA, NS, A ({ip}, IPv4-only)") self.restart_services() diff --git a/cmdeploy/src/cmdeploy/tests/test_lxc.py b/cmdeploy/src/cmdeploy/tests/test_lxc.py index cf156b61..c9f40249 100644 --- a/cmdeploy/src/cmdeploy/tests/test_lxc.py +++ b/cmdeploy/src/cmdeploy/tests/test_lxc.py @@ -8,6 +8,7 @@ import pytest from cmdeploy.lxc import cli from cmdeploy.lxc.incus import Incus +from cmdeploy.util import Out pytestmark = pytest.mark.skipif( not shutil.which("incus"), @@ -22,12 +23,14 @@ pytestmark = pytest.mark.skipif( @pytest.fixture def ix(): - return Incus() + out = Out() + return Incus(out) @pytest.fixture(scope="session") def lxc_setup(): - ix = Incus() + out = Out() + ix = Incus(out) ix.get_dns_container().ensure() return ix.list_managed() @@ -126,7 +129,7 @@ class TestLxcStatus: assert "status" in result.stdout.lower() def test_shows_containers(self, lxc_setup, capsys): - class QuietOut: + class QuietOut(Out): def red(self, msg, **kw): pass diff --git a/cmdeploy/src/cmdeploy/tests/test_util.py b/cmdeploy/src/cmdeploy/tests/test_util.py index e2fa45d9..b1f22ef2 100644 --- a/cmdeploy/src/cmdeploy/tests/test_util.py +++ b/cmdeploy/src/cmdeploy/tests/test_util.py @@ -1,4 +1,71 @@ -from cmdeploy.util import collapse, get_git_hash, get_version_string, shell +import sys + +from cmdeploy.util import Out, collapse, get_git_hash, get_version_string, shell + + +class TestOut: + def test_prefix_default(self, capsys): + out = Out() + out.print("hello") + assert capsys.readouterr().out == "hello\n" + + def test_prefix_custom(self, capsys): + out = Out(prefix=">> ") + out.print("hello") + assert capsys.readouterr().out == ">> hello\n" + + def test_prefix_print_file(self): + import io + + buf = io.StringIO() + out = Out(prefix=":: ") + out.print("msg", file=buf) + assert ":: msg" in buf.getvalue() + + def test_new_prefixed_out(self, capsys): + parent = Out(prefix="A") + child = parent.new_prefixed_out("B") + child.print("x") + assert capsys.readouterr().out == "ABx\n" + # shares section_timings + assert child.section_timings is parent.section_timings + + def test_section_no_auto_indent(self, capsys): + out = Out(prefix="") + with out.section("test"): + out.print("inside") + captured = capsys.readouterr().out + # "inside" should NOT be indented by section() + lines = captured.strip().splitlines() + inside_line = [l for l in lines if "inside" in l][0] + assert inside_line == "inside" + + def test_section_records_timing(self): + out = Out() + with out.section("s1"): + pass + assert len(out.section_timings) == 1 + assert out.section_timings[0][0] == "s1" + + def test_shell_failure_shows_output(self): + """When a shell command fails, its output and exit code are shown.""" + import subprocess + + result = subprocess.run( + [ + sys.executable, + "-c", + "from cmdeploy.util import Out; Out(prefix='').shell(" + "\"echo 'boom on stderr' >&2; exit 42\")", + ], + capture_output=True, + text=True, + check=False, + ) + # the command's stderr is merged into stdout by Popen + assert "boom on stderr" in result.stdout + # Out.red() prints the failure notice to stderr + assert "exit code 42" in result.stderr def test_collapse(): diff --git a/cmdeploy/src/cmdeploy/util.py b/cmdeploy/src/cmdeploy/util.py index ed70236d..3832202b 100644 --- a/cmdeploy/src/cmdeploy/util.py +++ b/cmdeploy/src/cmdeploy/util.py @@ -1,9 +1,108 @@ """Shared utility functions for cmdeploy.""" +import os +import shutil import subprocess +import sys import textwrap +import time +from contextlib import contextmanager from pathlib import Path +from termcolor import colored + + +class Out: + """Convenience output printer providing coloring and section formatting.""" + + def __init__(self, sepchar="\u2501", prefix="", verbosity=0): + self.section_timings = [] + self.prefix = prefix + self.sepchar = sepchar + self.verbosity = verbosity + env_width = os.environ.get("_CMDEPLOY_WIDTH") + if env_width: + self.section_width = int(env_width) + else: + self.section_width = shutil.get_terminal_size((80, 24)).columns + + def new_prefixed_out(self, newprefix=" "): + """Return a new Out with an extended prefix, + sharing section_timings with the parent. + """ + out = Out( + sepchar=self.sepchar, + prefix=self.prefix + newprefix, + verbosity=self.verbosity, + ) + out.section_timings = self.section_timings + return out + + def red(self, msg, file=sys.stderr): + print(colored(self.prefix + msg, "red"), file=file, flush=True) + + def green(self, msg, file=sys.stderr): + print(colored(self.prefix + msg, "green"), file=file, flush=True) + + def print(self, msg="", **kwargs): + """Print to stdout with automatic flush.""" + if msg: + msg = self.prefix + msg + print(msg, flush=True, **kwargs) + + def _format_header(self, title): + """Return a formatted section header string.""" + width = self.section_width - len(self.prefix) + bar = self.sepchar * (width - len(title) - 5) + return f"{self.sepchar * 3} {title} {bar}" + + @contextmanager + def section(self, title): + """Context manager that prints a section header and records elapsed time.""" + self.green(self._format_header(title)) + t0 = time.time() + yield + elapsed = time.time() - t0 + self.section_timings.append((title, elapsed)) + + def section_line(self, title): + """Print a section header without timing.""" + self.green(self._format_header(title)) + + def shell(self, cmd, quiet=False, **kwargs): + """Print *cmd*, run it, and re-print its output with the current prefix. + + *cmd* is passed through :func:`collapse`, so callers + can use triple-quoted f-strings freely. + Stdout and stderr are merged, read line-by-line, + and each line is printed with ``self.prefix`` prepended. + When the command exits non-zero, a red error line is printed. + """ + cmd = collapse(cmd) + if not quiet: + self.print(f"$ {cmd}") + indent = self.prefix + " " + env = kwargs.pop("env", None) + if env is None: + env = os.environ.copy() + env["_CMDEPLOY_WIDTH"] = str(self.section_width - len(indent)) + proc = subprocess.Popen( + cmd, + shell=True, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + **kwargs, + ) + for line in proc.stdout: + sys.stdout.write(indent + line) + sys.stdout.flush() + ret = proc.wait() + if ret: + self.red(f"command failed with exit code {ret}: {cmd}") + return ret + def _project_root(): """Return the project root directory."""