diff --git a/chatmaild/src/chatmaild/tests/plugin.py b/chatmaild/src/chatmaild/tests/plugin.py index b57418a3..165de3e0 100644 --- a/chatmaild/src/chatmaild/tests/plugin.py +++ b/chatmaild/src/chatmaild/tests/plugin.py @@ -85,13 +85,13 @@ def mockout(): captured_green = [] captured_plain = [] - def red(self, msg): + def red(self, msg, **kw): self.captured_red.append(msg) - def green(self, msg): + def green(self, msg, **kw): self.captured_green.append(msg) - def __call__(self, msg): + def print(self, msg="", **kw): self.captured_plain.append(msg) return MockOut() diff --git a/cmdeploy/src/cmdeploy/tests/test_util.py b/cmdeploy/src/cmdeploy/tests/test_util.py new file mode 100644 index 00000000..b1f22ef2 --- /dev/null +++ b/cmdeploy/src/cmdeploy/tests/test_util.py @@ -0,0 +1,120 @@ +import sys + +from cmdeploy.util import Out, collapse, get_git_hash, get_version_string, shell + + +class TestOut: + def test_prefix_default(self, capsys): + out = Out() + out.print("hello") + assert capsys.readouterr().out == "hello\n" + + def test_prefix_custom(self, capsys): + out = Out(prefix=">> ") + out.print("hello") + assert capsys.readouterr().out == ">> hello\n" + + def test_prefix_print_file(self): + import io + + buf = io.StringIO() + out = Out(prefix=":: ") + out.print("msg", file=buf) + assert ":: msg" in buf.getvalue() + + def test_new_prefixed_out(self, capsys): + parent = Out(prefix="A") + child = parent.new_prefixed_out("B") + child.print("x") + assert capsys.readouterr().out == "ABx\n" + # shares section_timings + assert child.section_timings is parent.section_timings + + def test_section_no_auto_indent(self, capsys): + out = Out(prefix="") + with out.section("test"): + out.print("inside") + captured = capsys.readouterr().out + # "inside" should NOT be indented by section() + lines = captured.strip().splitlines() + inside_line = [l for l in lines if "inside" in l][0] + assert inside_line == "inside" + + def test_section_records_timing(self): + out = Out() + with out.section("s1"): + pass + assert len(out.section_timings) == 1 + assert out.section_timings[0][0] == "s1" + + def test_shell_failure_shows_output(self): + """When a shell command fails, its output and exit code are shown.""" + import subprocess + + result = subprocess.run( + [ + sys.executable, + "-c", + "from cmdeploy.util import Out; Out(prefix='').shell(" + "\"echo 'boom on stderr' >&2; exit 42\")", + ], + capture_output=True, + text=True, + check=False, + ) + # the command's stderr is merged into stdout by Popen + assert "boom on stderr" in result.stdout + # Out.red() prints the failure notice to stderr + assert "exit code 42" in result.stderr + + +def test_collapse(): + text = """ + line 1 + line 2 + """ + assert collapse(text) == "line 1 line 2" + assert collapse(" single line ") == "single line" + + +def test_git_helpers_no_git(tmp_path): + # Not a git repo + assert get_git_hash(root=tmp_path) is None + assert get_version_string(root=tmp_path) == "unknown" + + +def test_git_helpers_empty_repo(tmp_path): + shell("git init", cwd=tmp_path, check=True) + # No commits yet + assert get_git_hash(root=tmp_path) is None + assert get_version_string(root=tmp_path) == "unknown" + + +def test_git_helpers_with_commits_and_diffs(tmp_path): + shell("git init", cwd=tmp_path, check=True) + shell("git config user.email you@example.com", cwd=tmp_path, check=True) + shell("git config user.name 'Your Name'", cwd=tmp_path, check=True) + + # First commit + path = tmp_path / "file.txt" + path.write_text("content") + shell("git add file.txt", cwd=tmp_path, check=True) + shell("git commit -m initial", cwd=tmp_path, check=True) + + git_hash = get_git_hash(root=tmp_path) + assert len(git_hash) >= 7 # usually 40, but git is git + assert get_version_string(root=tmp_path) == git_hash + + # Create a diff + path.write_text("new content") + v = get_version_string(root=tmp_path) + assert v.startswith(git_hash + "\n") + assert "new content" in v + assert not v.endswith("\n") + + # Commit again -> no diff + shell("git add file.txt", cwd=tmp_path, check=True) + shell("git commit -m second", cwd=tmp_path, check=True) + new_hash = get_git_hash(root=tmp_path) + assert new_hash != git_hash + assert get_version_string(root=tmp_path) == new_hash diff --git a/cmdeploy/src/cmdeploy/util.py b/cmdeploy/src/cmdeploy/util.py new file mode 100644 index 00000000..4a5178b7 --- /dev/null +++ b/cmdeploy/src/cmdeploy/util.py @@ -0,0 +1,169 @@ +"""Shared utility functions for cmdeploy.""" + +import os +import shutil +import subprocess +import sys +import textwrap +import time +from contextlib import contextmanager +from pathlib import Path + +from termcolor import colored + + +class Out: + """Convenience output printer providing coloring and section formatting.""" + + def __init__(self, prefix="", verbosity=0): + self.section_timings = [] + self.prefix = prefix + self.sepchar = "\u2501" + self.verbosity = verbosity + env_width = os.environ.get("_CMDEPLOY_WIDTH") + if env_width: + self.section_width = int(env_width) + else: + self.section_width = shutil.get_terminal_size((80, 24)).columns + + def new_prefixed_out(self, newprefix=" "): + """Return a new Out with an extended prefix, + sharing section_timings with the parent. + """ + out = Out( + prefix=self.prefix + newprefix, + verbosity=self.verbosity, + ) + out.section_timings = self.section_timings + return out + + def red(self, msg, file=sys.stderr): + print(colored(self.prefix + msg, "red"), file=file, flush=True) + + def green(self, msg, file=sys.stderr): + print(colored(self.prefix + msg, "green"), file=file, flush=True) + + def print(self, msg="", **kwargs): + """Print to stdout with automatic flush.""" + if msg: + msg = self.prefix + msg + print(msg, flush=True, **kwargs) + + def _format_header(self, title): + """Return a formatted section header string.""" + width = self.section_width - len(self.prefix) + bar = self.sepchar * (width - len(title) - 5) + return f"{self.sepchar * 3} {title} {bar}" + + @contextmanager + def section(self, title): + """Context manager that prints a section header and records elapsed time.""" + self.green(self._format_header(title)) + t0 = time.time() + yield + elapsed = time.time() - t0 + self.section_timings.append((title, elapsed)) + + def section_line(self, title): + """Print a section header without timing.""" + self.green(self._format_header(title)) + + def shell(self, cmd, quiet=False, **kwargs): + """Print *cmd*, run it, and re-print its output with the current prefix. + + *cmd* is passed through :func:`collapse`, so callers + can use triple-quoted f-strings freely. + Stdout and stderr are merged, read line-by-line, + and each line is printed with ``self.prefix`` prepended. + When the command exits non-zero, a red error line is printed. + """ + cmd = collapse(cmd) + if not quiet: + self.print(f"$ {cmd}") + indent = self.prefix + " " + env = kwargs.pop("env", None) + if env is None: + env = os.environ.copy() + env["_CMDEPLOY_WIDTH"] = str(self.section_width - len(indent)) + proc = subprocess.Popen( + cmd, + shell=True, + text=True, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + **kwargs, + ) + for line in proc.stdout: + sys.stdout.write(indent + line) + sys.stdout.flush() + ret = proc.wait() + if ret: + self.red(f"command failed with exit code {ret}: {cmd}") + return ret + + +def _project_root(): + """Return the project root directory.""" + return Path(__file__).resolve().parent.parent.parent.parent + + +def collapse(text): + """Dedent, join lines, and strip a (triple-quoted) string. + + Handy for writing shell commands across multiple lines:: + + cmd = collapse(f\""" + cmdeploy run + --config {ct.ini} + --ssh-host {ct.domain} + \""") + """ + return textwrap.dedent(text).replace("\n", " ").strip() + + +def shell(cmd, check=False, **kwargs): + """Run a shell command string with sensible defaults. + + *cmd* is passed through :func:`collapse` first, so callers + can use triple-quoted f-strings freely. + Captures stdout/stderr by default; pass ``capture_output=False`` + to stream output to the terminal instead. + """ + if "capture_output" not in kwargs and "stdout" not in kwargs: + kwargs["capture_output"] = True + kwargs.setdefault("stdin", subprocess.DEVNULL) + return subprocess.run(collapse(cmd), shell=True, text=True, check=check, **kwargs) + + +def get_git_hash(root=None): + """Return the local HEAD commit hash, or None.""" + if root is None: + root = _project_root() + result = shell( + "git rev-parse HEAD", + cwd=str(root), + ) + if result.returncode == 0: + return result.stdout.strip() + return None + + +def get_version_string(root=None): + """Return ``git_hash\\ngit_diff`` for the local working tree. + + Used by :class:`~cmdeploy.deployers.GithashDeployer` to write + ``/etc/chatmail-version`` and by ``lxc-status`` to compare + the deployed state against the local checkout. + """ + if root is None: + root = _project_root() + git_hash = get_git_hash(root=root) or "unknown" + try: + git_diff = shell("git diff", cwd=str(root)).stdout.strip() + except Exception: + git_diff = "" + if git_diff: + return f"{git_hash}\n{git_diff}" + return git_hash