mirror of
https://github.com/chatmail/relay.git
synced 2026-05-10 16:04:37 +00:00
refactor(cmdeploy): replace globals() subcommand scan with explicit SUBCOMMANDS list
The get_parser() loop that scanned globals() for *_cmd names was fragile and forced # noqa: F401 on all lxc imports (ruff couldn't see they were used dynamically). Replace it with an explicit SUBCOMMANDS list of (cmd_func, options_func, needs_config) tuples. This makes the full set of subcommands visible at a glance, their registration order defined, and the imports unconditionally used (no more noqa suppressions). Also add --ssh-config option to run/dns/status/test so all SSH resolution can go through a config file, used by lxc-test for completely local setups.
This commit is contained in:
@@ -15,10 +15,27 @@ from pathlib import Path
|
||||
import pyinfra
|
||||
from chatmaild.config import read_config, write_initial_config
|
||||
from packaging import version
|
||||
from termcolor import colored
|
||||
|
||||
from . import dns, remote
|
||||
from .sshexec import LocalExec, SSHExec
|
||||
from .lxc.cli import (
|
||||
lxc_start_cmd,
|
||||
lxc_start_cmd_options,
|
||||
lxc_status_cmd,
|
||||
lxc_status_cmd_options,
|
||||
lxc_stop_cmd,
|
||||
lxc_stop_cmd_options,
|
||||
lxc_test_cmd,
|
||||
lxc_test_cmd_options,
|
||||
)
|
||||
from .lxc.incus import DNSConfigurationError
|
||||
from .sshexec import (
|
||||
LocalExec,
|
||||
SSHExec,
|
||||
resolve_host_from_ssh_config,
|
||||
resolve_key_from_ssh_config,
|
||||
)
|
||||
from .util import Out
|
||||
from .www import main as webdev_main
|
||||
|
||||
#
|
||||
# cmdeploy sub commands and options
|
||||
@@ -82,18 +99,21 @@ def run_cmd_options(parser):
|
||||
help="disable checks nslookup for dns",
|
||||
)
|
||||
add_ssh_host_option(parser)
|
||||
add_ssh_config_option(parser)
|
||||
|
||||
|
||||
def run_cmd(args, out):
|
||||
"""Deploy chatmail services on the remote server."""
|
||||
|
||||
ssh_host = args.ssh_host if args.ssh_host else args.config.mail_domain
|
||||
sshexec = get_sshexec(ssh_host)
|
||||
sshexec = get_sshexec(ssh_host, ssh_config=args.ssh_config)
|
||||
require_iroh = args.config.enable_iroh_relay
|
||||
strict_tls = args.config.tls_cert_mode == "acme"
|
||||
if not args.dns_check_disabled:
|
||||
remote_data = dns.get_initial_remote_data(sshexec, args.config.mail_domain)
|
||||
if not dns.check_initial_remote_data(remote_data, strict_tls=strict_tls, print=out.red):
|
||||
if not dns.check_initial_remote_data(
|
||||
remote_data, strict_tls=strict_tls, print=out.red
|
||||
):
|
||||
return 1
|
||||
|
||||
env = os.environ.copy()
|
||||
@@ -104,10 +124,24 @@ def run_cmd(args, out):
|
||||
if not args.dns_check_disabled:
|
||||
env["CHATMAIL_ADDR_V4"] = remote_data.get("A") or ""
|
||||
env["CHATMAIL_ADDR_V6"] = remote_data.get("AAAA") or ""
|
||||
env["DEBIAN_FRONTEND"] = "noninteractive"
|
||||
env["TERM"] = "linux"
|
||||
deploy_path = importlib.resources.files(__package__).joinpath("run.py").resolve()
|
||||
pyinf = "pyinfra --dry" if args.dry_run else "pyinfra"
|
||||
|
||||
cmd = f"{pyinf} --ssh-user root {ssh_host} {deploy_path} -y"
|
||||
ssh_config = args.ssh_config
|
||||
if ssh_config:
|
||||
ssh_config = str(Path(ssh_config).resolve())
|
||||
|
||||
# Use pyinfra's native SSH data keys to configure the connection directly
|
||||
# rather than relying on paramiko config parsing (see also sshexec.py)
|
||||
ip = resolve_host_from_ssh_config(ssh_host, ssh_config)
|
||||
key = resolve_key_from_ssh_config(ssh_host, ssh_config)
|
||||
data_args = f"--data ssh_hostname={ip} --data ssh_known_hosts_file=/dev/null"
|
||||
if key:
|
||||
data_args += f" --data ssh_key={key}"
|
||||
cmd = f"{pyinf} --ssh-user root {ssh_host} {deploy_path} -y {data_args}"
|
||||
if ssh_host in ["localhost", "@docker"]:
|
||||
if ssh_host == "@docker":
|
||||
env["CHATMAIL_NOPORTCHECK"] = "True"
|
||||
@@ -119,10 +153,17 @@ def run_cmd(args, out):
|
||||
return 1
|
||||
|
||||
try:
|
||||
out.check_call(cmd, env=env)
|
||||
ret = out.shell(cmd, env=env)
|
||||
if ret:
|
||||
out.red("Deploy failed")
|
||||
return 1
|
||||
if args.website_only:
|
||||
out.green("Website deployment completed.")
|
||||
elif not args.dns_check_disabled and strict_tls and not remote_data["acme_account_url"]:
|
||||
elif (
|
||||
not args.dns_check_disabled
|
||||
and strict_tls
|
||||
and not remote_data["acme_account_url"]
|
||||
):
|
||||
out.red("Deploy completed but letsencrypt not configured")
|
||||
out.red("Run 'cmdeploy run' again")
|
||||
else:
|
||||
@@ -139,15 +180,16 @@ def dns_cmd_options(parser):
|
||||
dest="zonefile",
|
||||
type=pathlib.Path,
|
||||
default=None,
|
||||
help="write out a zonefile",
|
||||
help="write DNS records in standard BIND format to the given file",
|
||||
)
|
||||
add_ssh_host_option(parser)
|
||||
add_ssh_config_option(parser)
|
||||
|
||||
|
||||
def dns_cmd(args, out):
|
||||
"""Check DNS entries and optionally generate dns zone file."""
|
||||
ssh_host = args.ssh_host if args.ssh_host else args.config.mail_domain
|
||||
sshexec = get_sshexec(ssh_host, verbose=args.verbose)
|
||||
sshexec = get_sshexec(ssh_host, verbose=args.verbose, ssh_config=args.ssh_config)
|
||||
tls_cert_mode = args.config.tls_cert_mode
|
||||
strict_tls = tls_cert_mode == "acme"
|
||||
remote_data = dns.get_initial_remote_data(sshexec, args.config.mail_domain)
|
||||
@@ -178,13 +220,14 @@ def dns_cmd(args, out):
|
||||
|
||||
def status_cmd_options(parser):
|
||||
add_ssh_host_option(parser)
|
||||
add_ssh_config_option(parser)
|
||||
|
||||
|
||||
def status_cmd(args, out):
|
||||
"""Display status for online chatmail instance."""
|
||||
|
||||
ssh_host = args.ssh_host if args.ssh_host else args.config.mail_domain
|
||||
sshexec = get_sshexec(ssh_host, verbose=args.verbose)
|
||||
sshexec = get_sshexec(ssh_host, verbose=args.verbose, ssh_config=args.ssh_config)
|
||||
|
||||
out.green(f"chatmail domain: {args.config.mail_domain}")
|
||||
if args.config.privacy_mail:
|
||||
@@ -204,14 +247,14 @@ def test_cmd_options(parser):
|
||||
help="also run slow tests",
|
||||
)
|
||||
add_ssh_host_option(parser)
|
||||
add_ssh_config_option(parser)
|
||||
|
||||
|
||||
def test_cmd(args, out):
|
||||
"""Run local and online tests for chatmail deployment."""
|
||||
|
||||
env = os.environ.copy()
|
||||
if args.ssh_host:
|
||||
env["CHATMAIL_SSH"] = args.ssh_host
|
||||
env["CHATMAIL_INI"] = str(args.inipath.resolve())
|
||||
|
||||
pytest_path = shutil.which("pytest")
|
||||
pytest_args = [
|
||||
@@ -225,7 +268,11 @@ def test_cmd(args, out):
|
||||
]
|
||||
if args.slow:
|
||||
pytest_args.append("--slow")
|
||||
ret = out.run_ret(pytest_args, env=env)
|
||||
if args.ssh_host:
|
||||
pytest_args.extend(["--ssh-host", args.ssh_host])
|
||||
if args.ssh_config:
|
||||
pytest_args.extend(["--ssh-config", str(Path(args.ssh_config).resolve())])
|
||||
ret = out.shell(" ".join(pytest_args), env=env)
|
||||
return ret
|
||||
|
||||
|
||||
@@ -262,8 +309,8 @@ def fmt_cmd(args, out):
|
||||
format_args.extend(sources)
|
||||
check_args.extend(sources)
|
||||
|
||||
out.check_call(" ".join(format_args), quiet=not args.verbose)
|
||||
out.check_call(" ".join(check_args), quiet=not args.verbose)
|
||||
out.shell(" ".join(format_args), quiet=not args.verbose)
|
||||
out.shell(" ".join(check_args), quiet=not args.verbose)
|
||||
|
||||
|
||||
def bench_cmd(args, out):
|
||||
@@ -276,9 +323,7 @@ def bench_cmd(args, out):
|
||||
|
||||
def webdev_cmd(args, out):
|
||||
"""Run local web development loop for static web pages."""
|
||||
from .www import main
|
||||
|
||||
main()
|
||||
webdev_main()
|
||||
|
||||
|
||||
#
|
||||
@@ -286,32 +331,6 @@ def webdev_cmd(args, out):
|
||||
#
|
||||
|
||||
|
||||
class Out:
|
||||
"""Convenience output printer providing coloring."""
|
||||
|
||||
def red(self, msg, file=sys.stderr):
|
||||
print(colored(msg, "red"), file=file)
|
||||
|
||||
def green(self, msg, file=sys.stderr):
|
||||
print(colored(msg, "green"), file=file)
|
||||
|
||||
def __call__(self, msg, red=False, green=False, file=sys.stdout):
|
||||
color = "red" if red else ("green" if green else None)
|
||||
print(colored(msg, color), file=file)
|
||||
|
||||
def check_call(self, arg, env=None, quiet=False):
|
||||
if not quiet:
|
||||
self(f"[$ {arg}]", file=sys.stderr)
|
||||
return subprocess.check_call(arg, shell=True, env=env)
|
||||
|
||||
def run_ret(self, args, env=None, quiet=False):
|
||||
if not quiet:
|
||||
cmdstring = " ".join(args)
|
||||
self(f"[$ {cmdstring}]", file=sys.stderr)
|
||||
proc = subprocess.run(args, env=env, check=False)
|
||||
return proc.returncode
|
||||
|
||||
|
||||
def add_ssh_host_option(parser):
|
||||
parser.add_argument(
|
||||
"--ssh-host",
|
||||
@@ -321,6 +340,16 @@ def add_ssh_host_option(parser):
|
||||
)
|
||||
|
||||
|
||||
def add_ssh_config_option(parser):
|
||||
parser.add_argument(
|
||||
"--ssh-config",
|
||||
dest="ssh_config",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Path to an SSH config file (e.g. lxconfigs/ssh-config).",
|
||||
)
|
||||
|
||||
|
||||
def add_config_option(parser):
|
||||
parser.add_argument(
|
||||
"--config",
|
||||
@@ -330,25 +359,26 @@ def add_config_option(parser):
|
||||
type=Path,
|
||||
help="path to the chatmail.ini file",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
"-v",
|
||||
dest="verbose",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="provide verbose logging",
|
||||
)
|
||||
|
||||
|
||||
def add_subcommand(subparsers, func):
|
||||
def add_subcommand(subparsers, func, add_config=True):
|
||||
name = func.__name__
|
||||
assert name.endswith("_cmd")
|
||||
name = name[:-4]
|
||||
name = name[:-4].replace("_", "-")
|
||||
doc = func.__doc__.strip()
|
||||
help = doc.split("\n")[0].strip(".")
|
||||
p = subparsers.add_parser(name, description=doc, help=help)
|
||||
p.set_defaults(func=func)
|
||||
add_config_option(p)
|
||||
if add_config:
|
||||
add_config_option(p)
|
||||
p.add_argument(
|
||||
"-v",
|
||||
"--verbose",
|
||||
dest="verbose",
|
||||
action="count",
|
||||
default=0,
|
||||
help="increase verbosity (can be repeated: -v, -vv)",
|
||||
)
|
||||
return p
|
||||
|
||||
|
||||
@@ -357,45 +387,60 @@ Setup your chatmail server configuration and
|
||||
deploy it via SSH to your remote location.
|
||||
"""
|
||||
|
||||
# Explicit subcommand registry: (cmd_func, options_func_or_None, needs_config).
|
||||
# LXC commands don't need a chatmail.ini (no config); all others do.
|
||||
SUBCOMMANDS = [
|
||||
(init_cmd, init_cmd_options, True),
|
||||
(run_cmd, run_cmd_options, True),
|
||||
(dns_cmd, dns_cmd_options, True),
|
||||
(status_cmd, status_cmd_options, True),
|
||||
(test_cmd, test_cmd_options, True),
|
||||
(fmt_cmd, fmt_cmd_options, True),
|
||||
(bench_cmd, None, True),
|
||||
(webdev_cmd, None, True),
|
||||
(lxc_start_cmd, lxc_start_cmd_options, False),
|
||||
(lxc_stop_cmd, lxc_stop_cmd_options, False),
|
||||
(lxc_status_cmd, lxc_status_cmd_options, False),
|
||||
(lxc_test_cmd, lxc_test_cmd_options, False),
|
||||
]
|
||||
|
||||
|
||||
def get_parser():
|
||||
"""Return an ArgumentParser for the 'cmdeploy' CLI"""
|
||||
|
||||
parser = argparse.ArgumentParser(description=description.strip())
|
||||
parser.set_defaults(func=None, inipath=None)
|
||||
subparsers = parser.add_subparsers(title="subcommands")
|
||||
|
||||
# find all subcommands in the module namespace
|
||||
glob = globals()
|
||||
for name, func in glob.items():
|
||||
if name.endswith("_cmd"):
|
||||
subparser = add_subcommand(subparsers, func)
|
||||
addopts = glob.get(name + "_options")
|
||||
if addopts is not None:
|
||||
addopts(subparser)
|
||||
for func, addopts, needs_config in SUBCOMMANDS:
|
||||
subparser = add_subcommand(subparsers, func, add_config=needs_config)
|
||||
if addopts is not None:
|
||||
addopts(subparser)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def get_sshexec(ssh_host: str, verbose=True):
|
||||
def get_sshexec(ssh_host: str, verbose=True, ssh_config=None):
|
||||
if ssh_host in ["localhost", "@local"]:
|
||||
return LocalExec(verbose, docker=False)
|
||||
elif ssh_host == "@docker":
|
||||
return LocalExec(verbose, docker=True)
|
||||
if verbose:
|
||||
print(f"[ssh] login to {ssh_host}")
|
||||
return SSHExec(ssh_host, verbose=verbose)
|
||||
return SSHExec(ssh_host, verbose=verbose, ssh_config=ssh_config)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
"""Provide main entry point for 'cmdeploy' CLI invocation."""
|
||||
parser = get_parser()
|
||||
args = parser.parse_args(args=args)
|
||||
if not hasattr(args, "func"):
|
||||
if args.func is None:
|
||||
return parser.parse_args(["-h"])
|
||||
|
||||
out = Out()
|
||||
out = Out(verbosity=args.verbose)
|
||||
kwargs = {}
|
||||
if args.func.__name__ not in ("init_cmd", "fmt_cmd"):
|
||||
|
||||
if args.inipath is not None and args.func.__name__ not in ("init_cmd", "fmt_cmd"):
|
||||
if not args.inipath.exists():
|
||||
out.red(f"expecting {args.inipath} to exist, run init first?")
|
||||
raise SystemExit(1)
|
||||
@@ -410,6 +455,9 @@ def main(args=None):
|
||||
if res is None:
|
||||
res = 0
|
||||
return res
|
||||
except DNSConfigurationError as exc:
|
||||
out.red(str(exc))
|
||||
return 1
|
||||
except KeyboardInterrupt:
|
||||
out.red("KeyboardInterrupt")
|
||||
sys.exit(130)
|
||||
|
||||
@@ -23,7 +23,10 @@ class TestCmdline:
|
||||
run = parser.parse_args(["run"])
|
||||
assert init and run
|
||||
|
||||
def test_init_not_overwrite(self, capsys):
|
||||
def test_init_not_overwrite(self, tmp_path, capsys, monkeypatch):
|
||||
monkeypatch.delenv("CHATMAIL_INI", raising=False)
|
||||
monkeypatch.chdir(tmp_path)
|
||||
|
||||
assert main(["init", "chat.example.org"]) == 0
|
||||
capsys.readouterr()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user