mirror of
https://github.com/chatmail/relay.git
synced 2026-05-10 16:04:37 +00:00
Compare commits
4 Commits
hetzner-po
...
hpk/cidebu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa1891fc54 | ||
|
|
37e02445ce | ||
|
|
2e5a1a3a67 | ||
|
|
be5b25b0ab |
@@ -1,15 +0,0 @@
|
|||||||
{chatmail_domain}. A {ipv4}
|
|
||||||
{chatmail_domain}. AAAA {ipv6}
|
|
||||||
{chatmail_domain}. MX 10 {chatmail_domain}.
|
|
||||||
_submission._tcp.{chatmail_domain}. SRV 0 1 587 {chatmail_domain}.
|
|
||||||
_submissions._tcp.{chatmail_domain}. SRV 0 1 465 {chatmail_domain}.
|
|
||||||
_imap._tcp.{chatmail_domain}. SRV 0 1 143 {chatmail_domain}.
|
|
||||||
_imaps._tcp.{chatmail_domain}. SRV 0 1 993 {chatmail_domain}.
|
|
||||||
{chatmail_domain}. CAA 128 issue "letsencrypt.org;accounturi={acme_account_url}"
|
|
||||||
{chatmail_domain}. TXT "v=spf1 a:{chatmail_domain} ~all"
|
|
||||||
_dmarc.{chatmail_domain}. TXT "v=DMARC1;p=reject;adkim=s;aspf=s"
|
|
||||||
_mta-sts.{chatmail_domain}. TXT "v=STSv1; id={sts_id}"
|
|
||||||
mta-sts.{chatmail_domain}. CNAME {chatmail_domain}.
|
|
||||||
www.{chatmail_domain}. CNAME {chatmail_domain}.
|
|
||||||
{dkim_entry}
|
|
||||||
_adsp._domainkey.{chatmail_domain}. TXT "dkim=discardable"
|
|
||||||
21
cmdeploy/src/cmdeploy/chatmail.zone.j2
Normal file
21
cmdeploy/src/cmdeploy/chatmail.zone.j2
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
{% if ipv4 %}
|
||||||
|
{{ chatmail_domain }}. A {{ ipv4 }}
|
||||||
|
{% endif %}
|
||||||
|
{% if ipv6 %}
|
||||||
|
{{ chatmail_domain }}. AAAA {{ ipv6 }}
|
||||||
|
{% endif %}
|
||||||
|
{{ chatmail_domain }}. MX 10 {{ chatmail_domain }}.
|
||||||
|
_submission._tcp.{{ chatmail_domain }}. SRV 0 1 587 {{ chatmail_domain }}.
|
||||||
|
_submissions._tcp.{{ chatmail_domain }}. SRV 0 1 465 {{ chatmail_domain }}.
|
||||||
|
_imap._tcp.{{ chatmail_domain }}. SRV 0 1 143 {{ chatmail_domain }}.
|
||||||
|
_imaps._tcp.{{ chatmail_domain }}. SRV 0 1 993 {{ chatmail_domain }}.
|
||||||
|
{% if acme_account_url %}
|
||||||
|
{{ chatmail_domain }}. CAA 128 issue "letsencrypt.org;accounturi={{ acme_account_url }}"
|
||||||
|
{% endif %}
|
||||||
|
{{ chatmail_domain }}. TXT "v=spf1 a:{{ chatmail_domain }} ~all"
|
||||||
|
_dmarc.{{ chatmail_domain }}. TXT "v=DMARC1;p=reject;adkim=s;aspf=s"
|
||||||
|
_mta-sts.{{ chatmail_domain }}. TXT "v=STSv1; id={{ sts_id }}"
|
||||||
|
mta-sts.{{ chatmail_domain }}. CNAME {{ chatmail_domain }}.
|
||||||
|
www.{{ chatmail_domain }}. CNAME {{ chatmail_domain }}.
|
||||||
|
{{ dkim_entry }}
|
||||||
|
_adsp._domainkey.{{ chatmail_domain }}. TXT "dkim=discardable"
|
||||||
@@ -15,8 +15,7 @@ from pathlib import Path
|
|||||||
from chatmaild.config import read_config, write_initial_config
|
from chatmaild.config import read_config, write_initial_config
|
||||||
from termcolor import colored
|
from termcolor import colored
|
||||||
|
|
||||||
from . import remote_funcs
|
from . import dns, remote_funcs
|
||||||
from .dns import show_dns
|
|
||||||
from .sshexec import SSHExec
|
from .sshexec import SSHExec
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -54,7 +53,10 @@ def run_cmd_options(parser):
|
|||||||
|
|
||||||
def run_cmd(args, out):
|
def run_cmd(args, out):
|
||||||
"""Deploy chatmail services on the remote server."""
|
"""Deploy chatmail services on the remote server."""
|
||||||
retcode, remote_data = show_dns(args, out)
|
|
||||||
|
remote_data = dns.get_initial_remote_data(args, out)
|
||||||
|
if not remote_data:
|
||||||
|
return 1
|
||||||
|
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
env["CHATMAIL_INI"] = args.inipath
|
env["CHATMAIL_INI"] = args.inipath
|
||||||
@@ -62,12 +64,12 @@ def run_cmd(args, out):
|
|||||||
pyinf = "pyinfra --dry" if args.dry_run else "pyinfra"
|
pyinf = "pyinfra --dry" if args.dry_run else "pyinfra"
|
||||||
cmd = f"{pyinf} --ssh-user root {args.config.mail_domain} {deploy_path}"
|
cmd = f"{pyinf} --ssh-user root {args.config.mail_domain} {deploy_path}"
|
||||||
|
|
||||||
out.check_call(cmd, env=env)
|
retcode = out.check_call(cmd, env=env)
|
||||||
if retcode == 0:
|
if retcode == 0:
|
||||||
out.green("Deploy completed, call `cmdeploy test` next.")
|
out.green("Deploy completed, call `cmdeploy dns` next.")
|
||||||
elif not remote_data["acme_account_url"]:
|
elif not remote_data["acme_account_url"]:
|
||||||
out.red("Deploy completed but letsencrypt not configured")
|
out.red("Deploy completed but letsencrypt not configured")
|
||||||
out.red("Run 'cmdeploy dns' or 'cmdeploy run' again")
|
out.red("Run 'cmdeploy run' again")
|
||||||
retcode = 0
|
retcode = 0
|
||||||
else:
|
else:
|
||||||
out.red("Deploy failed")
|
out.red("Deploy failed")
|
||||||
@@ -84,7 +86,10 @@ def dns_cmd_options(parser):
|
|||||||
|
|
||||||
def dns_cmd(args, out):
|
def dns_cmd(args, out):
|
||||||
"""Check DNS entries and optionally generate dns zone file."""
|
"""Check DNS entries and optionally generate dns zone file."""
|
||||||
retcode, remote_data = show_dns(args, out)
|
remote_data = dns.get_initial_remote_data(args, out)
|
||||||
|
if not remote_data:
|
||||||
|
return 1
|
||||||
|
retcode = dns.show_dns(args, out, remote_data)
|
||||||
return retcode
|
return retcode
|
||||||
|
|
||||||
|
|
||||||
@@ -278,9 +283,16 @@ def main(args=None):
|
|||||||
if not hasattr(args, "func"):
|
if not hasattr(args, "func"):
|
||||||
return parser.parse_args(["-h"])
|
return parser.parse_args(["-h"])
|
||||||
|
|
||||||
def get_sshexec(log=None):
|
ssh_exec_cache = []
|
||||||
print(f"[ssh] login to {args.config.mail_domain}")
|
|
||||||
return SSHExec(args.config.mail_domain, remote_funcs, log=log)
|
def get_sshexec():
|
||||||
|
if not ssh_exec_cache:
|
||||||
|
print(f"[ssh] login to {args.config.mail_domain}")
|
||||||
|
ssh_exec = SSHExec(
|
||||||
|
args.config.mail_domain, remote_funcs, verbose=args.verbose
|
||||||
|
)
|
||||||
|
ssh_exec_cache.append(ssh_exec)
|
||||||
|
return ssh_exec_cache[0]
|
||||||
|
|
||||||
args.get_sshexec = get_sshexec
|
args.get_sshexec = get_sshexec
|
||||||
|
|
||||||
|
|||||||
@@ -1,59 +1,74 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import importlib
|
import importlib
|
||||||
import sys
|
|
||||||
|
from jinja2 import Template
|
||||||
|
|
||||||
from . import remote_funcs
|
from . import remote_funcs
|
||||||
|
|
||||||
|
|
||||||
def show_dns(args, out) -> int:
|
def get_initial_remote_data(args, out):
|
||||||
|
sshexec = args.get_sshexec()
|
||||||
|
mail_domain = args.config.mail_domain
|
||||||
|
remote_data = sshexec.logged(
|
||||||
|
call=remote_funcs.perform_initial_checks, kwargs=dict(mail_domain=mail_domain)
|
||||||
|
)
|
||||||
|
|
||||||
|
if not remote_data["A"] and not remote_data["AAAA"]:
|
||||||
|
out.red("Missing A and/or AAAA DNS records for {mail_domain}!")
|
||||||
|
elif not remote_data["MTA_STS"]:
|
||||||
|
out.red("Missing MTA_STS record:")
|
||||||
|
out(f"{mail_domain}. CNAME {mail_domain}")
|
||||||
|
else:
|
||||||
|
return remote_data
|
||||||
|
|
||||||
|
|
||||||
|
def show_dns(args, out, remote_data) -> int:
|
||||||
"""Check existing DNS records, optionally write them to zone file
|
"""Check existing DNS records, optionally write them to zone file
|
||||||
and return (exitcode, remote_data) tuple."""
|
and return (exitcode, remote_data) tuple."""
|
||||||
template = importlib.resources.files(__package__).joinpath("chatmail.zone.f")
|
|
||||||
mail_domain = args.config.mail_domain
|
|
||||||
|
|
||||||
def log_progress(data):
|
sshexec = args.get_sshexec()
|
||||||
sys.stdout.write(".")
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
sshexec = args.get_sshexec(log=print if args.verbose else log_progress)
|
if not remote_data["acme_account_url"]:
|
||||||
print("Checking DNS entries ", end="\n" if args.verbose else "")
|
out.red("could not get letsencrypt account url, please run 'cmdeploy run'")
|
||||||
|
return 1
|
||||||
|
|
||||||
remote_data = sshexec(remote_funcs.perform_initial_checks, mail_domain=mail_domain)
|
if not remote_data["dkim_entry"]:
|
||||||
|
out.red("could not determine dkim_entry, please run 'cmdeploy run'")
|
||||||
|
return 1
|
||||||
|
|
||||||
assert remote_data["ipv4"] or remote_data["ipv6"]
|
sts_id = remote_data.get("sts_id")
|
||||||
|
if not sts_id:
|
||||||
|
sts_id = datetime.datetime.now().strftime("%Y%m%d%H%M")
|
||||||
|
|
||||||
with open(template, "r") as f:
|
template = importlib.resources.files(__package__).joinpath("chatmail.zone.j2")
|
||||||
zonefile = f.read().format(
|
content = template.read_text()
|
||||||
acme_account_url=remote_data["acme_account_url"],
|
zonefile = Template(content).render(
|
||||||
dkim_entry=remote_data["dkim_entry"],
|
acme_account_url=remote_data.get("acme_account_url"),
|
||||||
ipv6=remote_data["ipv6"],
|
dkim_entry=remote_data["dkim_entry"],
|
||||||
ipv4=remote_data["ipv4"],
|
ipv4=remote_data["A"],
|
||||||
sts_id=datetime.datetime.now().strftime("%Y%m%d%H%M"),
|
ipv6=remote_data["AAAA"],
|
||||||
chatmail_domain=args.config.mail_domain,
|
sts_id=sts_id,
|
||||||
)
|
chatmail_domain=args.config.mail_domain,
|
||||||
|
)
|
||||||
|
lines = [x.strip() for x in zonefile.split("\n") if x.strip()]
|
||||||
|
lines.append("")
|
||||||
|
zonefile = "\n".join(lines)
|
||||||
|
|
||||||
to_print = sshexec(remote_funcs.check_zonefile, zonefile=zonefile)
|
diff_records = sshexec.logged(
|
||||||
if not args.verbose:
|
remote_funcs.check_zonefile, kwargs=dict(zonefile=zonefile)
|
||||||
print()
|
)
|
||||||
|
|
||||||
if getattr(args, "zonefile", None):
|
if getattr(args, "zonefile", None):
|
||||||
with open(args.zonefile, "w+") as zf:
|
with open(args.zonefile, "w+") as zf:
|
||||||
zf.write(zonefile)
|
zf.write(zonefile)
|
||||||
out.green(f"DNS records successfully written to: {args.zonefile}")
|
out.green(f"DNS records successfully written to: {args.zonefile}")
|
||||||
return 0, remote_data
|
return -1
|
||||||
|
|
||||||
if to_print:
|
if diff_records:
|
||||||
to_print.insert(
|
out.red("Please set the following DNS entries at your DNS provider:\n")
|
||||||
0, "You should configure the following entries at your DNS provider:\n"
|
for line in diff_records:
|
||||||
)
|
out(line)
|
||||||
to_print.append(
|
return 1
|
||||||
"\nIf you already configured the DNS entries, "
|
|
||||||
"wait a bit until the DNS entries propagate to the Internet."
|
|
||||||
)
|
|
||||||
out.red("\n".join(to_print))
|
|
||||||
exit_code = 1
|
|
||||||
else:
|
else:
|
||||||
out.green("Great! All your DNS entries are verified and correct.")
|
out.green("Great! All your DNS entries are verified and correct.")
|
||||||
exit_code = 0
|
return 0
|
||||||
|
|
||||||
return exit_code, remote_data
|
|
||||||
|
|||||||
@@ -1,21 +1,21 @@
|
|||||||
"""
|
"""
|
||||||
Functions to be executed on an ssh-connected host.
|
Pure python functions which execute remotely in a system Python interpreter.
|
||||||
|
|
||||||
All functions of this module need to work with Python builtin types
|
All functions of this module
|
||||||
and standard library dependencies only.
|
|
||||||
|
|
||||||
When a remote function executes remotely, it runs in a system python interpreter
|
- need to get and and return Python builtin data types only,
|
||||||
without any installed dependencies.
|
|
||||||
|
|
||||||
|
- can only use standard library dependencies,
|
||||||
|
|
||||||
|
- can freely call each other.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import re
|
import re
|
||||||
import socket
|
|
||||||
from subprocess import CalledProcessError, check_output
|
from subprocess import CalledProcessError, check_output
|
||||||
|
|
||||||
|
|
||||||
def shell(command, fail_ok=False):
|
def shell(command, fail_ok=False):
|
||||||
log(f"$ {command}")
|
print(f"$ {command}")
|
||||||
try:
|
try:
|
||||||
return check_output(command, shell=True).decode().rstrip()
|
return check_output(command, shell=True).decode().rstrip()
|
||||||
except CalledProcessError:
|
except CalledProcessError:
|
||||||
@@ -30,63 +30,61 @@ def get_systemd_running():
|
|||||||
|
|
||||||
|
|
||||||
def perform_initial_checks(mail_domain):
|
def perform_initial_checks(mail_domain):
|
||||||
res = {}
|
"""Collecting initial DNS zone content."""
|
||||||
|
A = query_dns("A", mail_domain)
|
||||||
|
AAAA = query_dns("AAAA", mail_domain)
|
||||||
|
MTA_STS = query_dns("CNAME", f"mta-sts.{mail_domain}")
|
||||||
|
|
||||||
|
res = dict(A=A, AAAA=AAAA, MTA_STS=MTA_STS)
|
||||||
|
if not MTA_STS or (not A and not AAAA):
|
||||||
|
return res
|
||||||
|
|
||||||
res["acme_account_url"] = shell("acmetool account-url", fail_ok=True)
|
res["acme_account_url"] = shell("acmetool account-url", fail_ok=True)
|
||||||
if not shell("dig", fail_ok=True):
|
if not shell("dig", fail_ok=True):
|
||||||
shell("apt-get install -y dnsutils")
|
shell("apt-get install -y dnsutils")
|
||||||
shell(f"unbound-control flush_zone {mail_domain}", fail_ok=True)
|
shell(f"unbound-control flush_zone {mail_domain}", fail_ok=True)
|
||||||
|
|
||||||
res["dkim_entry"] = get_dkim_entry(mail_domain, dkim_selector="opendkim")
|
res["dkim_entry"] = get_dkim_entry(mail_domain, dkim_selector="opendkim")
|
||||||
res["ipv4"] = get_ip_address(socket.AF_INET)
|
|
||||||
res["ipv6"] = get_ip_address(socket.AF_INET6)
|
# parse out sts-id if exists, example: "v=STSv1; id=2090123"
|
||||||
|
parts = query_dns("TXT", f"_mta-sts.{mail_domain}").split("id=")
|
||||||
|
res["sts_id"] = parts[1].rstrip('"') if len(parts) == 2 else ""
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
||||||
def get_dkim_entry(mail_domain, dkim_selector):
|
def get_dkim_entry(mail_domain, dkim_selector):
|
||||||
dkim_pubkey = shell(
|
try:
|
||||||
f"openssl rsa -in /etc/dkimkeys/{dkim_selector}.private "
|
dkim_pubkey = shell(
|
||||||
"-pubout 2>/dev/null | awk '/-/{next}{printf(\"%s\",$0)}'"
|
f"openssl rsa -in /etc/dkimkeys/{dkim_selector}.private "
|
||||||
)
|
"-pubout 2>/dev/null | awk '/-/{next}{printf(\"%s\",$0)}'"
|
||||||
|
)
|
||||||
|
except CalledProcessError:
|
||||||
|
return
|
||||||
dkim_value_raw = f"v=DKIM1;k=rsa;p={dkim_pubkey};s=email;t=s"
|
dkim_value_raw = f"v=DKIM1;k=rsa;p={dkim_pubkey};s=email;t=s"
|
||||||
dkim_value = '" "'.join(re.findall(".{1,255}", dkim_value_raw))
|
dkim_value = '" "'.join(re.findall(".{1,255}", dkim_value_raw))
|
||||||
return f'{dkim_selector}._domainkey.{mail_domain}. TXT "{dkim_value}"'
|
return f'{dkim_selector}._domainkey.{mail_domain}. TXT "{dkim_value}"'
|
||||||
|
|
||||||
|
|
||||||
def get_ip_address(typ):
|
|
||||||
sock = socket.socket(typ, socket.SOCK_DGRAM)
|
|
||||||
sock.settimeout(0)
|
|
||||||
sock.connect(("notifications.delta.chat", 1))
|
|
||||||
return sock.getsockname()[0]
|
|
||||||
|
|
||||||
|
|
||||||
def query_dns(typ, domain):
|
def query_dns(typ, domain):
|
||||||
res = shell(f"dig -r -q {domain} -t {typ} +short")
|
res = shell(f"dig -r -q {domain} -t {typ} +short")
|
||||||
return set(filter(None, res.split("\n")))
|
print(res)
|
||||||
|
if res:
|
||||||
|
return res.split("\n")[0]
|
||||||
|
|
||||||
|
|
||||||
def check_zonefile(zonefile):
|
def check_zonefile(zonefile):
|
||||||
|
"""Check all expected zone file entries."""
|
||||||
diff = []
|
diff = []
|
||||||
|
|
||||||
for zf_line in zonefile.splitlines():
|
for zf_line in zonefile.splitlines():
|
||||||
|
print("")
|
||||||
|
print(f"dns-checking {zf_line!r}")
|
||||||
zf_domain, zf_typ, zf_value = zf_line.split(maxsplit=2)
|
zf_domain, zf_typ, zf_value = zf_line.split(maxsplit=2)
|
||||||
zf_domain = zf_domain.rstrip(".")
|
zf_domain = zf_domain.rstrip(".")
|
||||||
zf_value = zf_value.strip()
|
zf_value = zf_value.strip()
|
||||||
query_values = query_dns(zf_typ, zf_domain)
|
query_value = query_dns(zf_typ, zf_domain)
|
||||||
if zf_value in query_values:
|
if zf_value != query_value:
|
||||||
continue
|
assert zf_typ in ("A", "AAAA", "CNAME", "CAA", "SRV", "MX", "TXT"), zf_line
|
||||||
|
diff.append(zf_line)
|
||||||
if zf_typ == "CAA" and zf_value.endswith('accounturi="'):
|
|
||||||
# this is an initial run where acmetool did not work yet
|
|
||||||
continue
|
|
||||||
|
|
||||||
if query_values and zf_typ == "TXT" and zf_domain.startswith("_mta-sts."):
|
|
||||||
(query_value,) = query_values
|
|
||||||
if query_value.split("id=")[0] == zf_value.split("id=")[0]:
|
|
||||||
continue
|
|
||||||
|
|
||||||
assert zf_typ in ("A", "AAAA", "CNAME", "CAA", "SRV", "MX", "TXT"), zf_line
|
|
||||||
diff.append(zf_line)
|
|
||||||
|
|
||||||
return diff
|
return diff
|
||||||
|
|
||||||
@@ -96,10 +94,11 @@ def check_zonefile(zonefile):
|
|||||||
|
|
||||||
if __name__ == "__channelexec__":
|
if __name__ == "__channelexec__":
|
||||||
|
|
||||||
def log(item):
|
def print(item):
|
||||||
channel.send(("log", item)) # noqa
|
channel.send(("log", item)) # noqa
|
||||||
|
|
||||||
while 1:
|
while 1:
|
||||||
func_name, kwargs = channel.receive() # noqa
|
func_name, kwargs = channel.receive() # noqa
|
||||||
|
kwargs = kwargs if kwargs else {}
|
||||||
res = globals()[func_name](**kwargs) # noqa
|
res = globals()[func_name](**kwargs) # noqa
|
||||||
channel.send(("finish", res)) # noqa
|
channel.send(("finish", res)) # noqa
|
||||||
|
|||||||
@@ -1,20 +1,39 @@
|
|||||||
|
import sys
|
||||||
|
|
||||||
import execnet
|
import execnet
|
||||||
|
|
||||||
|
|
||||||
class SSHExec:
|
class SSHExec:
|
||||||
RemoteError = execnet.RemoteError
|
RemoteError = execnet.RemoteError
|
||||||
|
|
||||||
def __init__(self, host, remote_funcs, log=None, python="python3", timeout=60):
|
def __init__(self, host, remote_funcs, verbose=False, python="python3", timeout=60):
|
||||||
self.gateway = execnet.makegateway(f"ssh=root@{host}//python={python}")
|
self.gateway = execnet.makegateway(f"ssh=root@{host}//python={python}")
|
||||||
self._remote_cmdloop_channel = self.gateway.remote_exec(remote_funcs)
|
self._remote_cmdloop_channel = self.gateway.remote_exec(remote_funcs)
|
||||||
self.log = log
|
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
|
self.verbose = verbose
|
||||||
|
|
||||||
def __call__(self, func, **kwargs):
|
def __call__(self, call, kwargs=None, log_callback=None):
|
||||||
self._remote_cmdloop_channel.send((func.__name__, kwargs))
|
self._remote_cmdloop_channel.send((call.__name__, kwargs))
|
||||||
while 1:
|
while 1:
|
||||||
code, data = self._remote_cmdloop_channel.receive(timeout=self.timeout)
|
code, data = self._remote_cmdloop_channel.receive(timeout=self.timeout)
|
||||||
if code == "log" and self.log:
|
if log_callback is not None and code == "log":
|
||||||
self.log(data)
|
log_callback(data)
|
||||||
elif code == "finish":
|
elif code == "finish":
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
def logged(self, call, kwargs):
|
||||||
|
def log_progress(data):
|
||||||
|
sys.stdout.write(".")
|
||||||
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
title = call.__doc__
|
||||||
|
if not title:
|
||||||
|
title = call.__name__
|
||||||
|
if self.verbose:
|
||||||
|
print("[ssh] " + title)
|
||||||
|
return self(call, kwargs, log_callback=print)
|
||||||
|
else:
|
||||||
|
print(title, end="")
|
||||||
|
res = self(call, kwargs, log_callback=log_progress)
|
||||||
|
print()
|
||||||
|
return res
|
||||||
|
|||||||
@@ -7,18 +7,38 @@ from cmdeploy.sshexec import SSHExec
|
|||||||
|
|
||||||
|
|
||||||
class TestSSHExecutor:
|
class TestSSHExecutor:
|
||||||
@pytest.fixture
|
@pytest.fixture(scope="class")
|
||||||
def sshexec(self, sshdomain):
|
def sshexec(self, sshdomain):
|
||||||
return SSHExec(sshdomain, remote_funcs)
|
return SSHExec(sshdomain, remote_funcs)
|
||||||
|
|
||||||
def test_ls(self, sshexec):
|
def test_ls(self, sshexec):
|
||||||
out = sshexec(remote_funcs.shell, command="ls")
|
out = sshexec(call=remote_funcs.shell, kwargs=dict(command="ls"))
|
||||||
out2 = sshexec(remote_funcs.shell, command="ls")
|
out2 = sshexec(call=remote_funcs.shell, kwargs=dict(command="ls"))
|
||||||
assert out == out2
|
assert out == out2
|
||||||
|
|
||||||
def test_perform_initial(self, sshexec, maildomain):
|
def test_perform_initial(self, sshexec, maildomain):
|
||||||
res = sshexec(remote_funcs.perform_initial_checks, mail_domain=maildomain)
|
res = sshexec(
|
||||||
assert res["ipv4"] or res["ipv6"]
|
remote_funcs.perform_initial_checks, kwargs=dict(mail_domain=maildomain)
|
||||||
|
)
|
||||||
|
assert res["A"] or res["AAAA"]
|
||||||
|
|
||||||
|
def test_logged(self, sshexec, maildomain, capsys):
|
||||||
|
sshexec.logged(
|
||||||
|
remote_funcs.perform_initial_checks, kwargs=dict(mail_domain=maildomain)
|
||||||
|
)
|
||||||
|
out, err = capsys.readouterr()
|
||||||
|
assert out.startswith("Collecting")
|
||||||
|
assert out.endswith("....\n")
|
||||||
|
assert out.count("\n") == 1
|
||||||
|
|
||||||
|
sshexec.verbose = True
|
||||||
|
sshexec.logged(
|
||||||
|
remote_funcs.perform_initial_checks, kwargs=dict(mail_domain=maildomain)
|
||||||
|
)
|
||||||
|
out, err = capsys.readouterr()
|
||||||
|
lines = out.split("\n")
|
||||||
|
assert len(lines) > 4
|
||||||
|
assert remote_funcs.perform_initial_checks.__doc__ in lines[0]
|
||||||
|
|
||||||
|
|
||||||
def test_remote(remote, imap_or_smtp):
|
def test_remote(remote, imap_or_smtp):
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ def pytest_runtest_setup(item):
|
|||||||
pytest.skip("skipping slow test, use --slow to run")
|
pytest.skip("skipping slow test, use --slow to run")
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(scope="session")
|
||||||
def chatmail_config(pytestconfig):
|
def chatmail_config(pytestconfig):
|
||||||
current = basedir = Path().resolve()
|
current = basedir = Path().resolve()
|
||||||
while 1:
|
while 1:
|
||||||
@@ -49,12 +49,12 @@ def chatmail_config(pytestconfig):
|
|||||||
pytest.skip(f"no chatmail.ini file found in {basedir} or parent dirs")
|
pytest.skip(f"no chatmail.ini file found in {basedir} or parent dirs")
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(scope="session")
|
||||||
def maildomain(chatmail_config):
|
def maildomain(chatmail_config):
|
||||||
return chatmail_config.mail_domain
|
return chatmail_config.mail_domain
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(scope="session")
|
||||||
def sshdomain(maildomain):
|
def sshdomain(maildomain):
|
||||||
return os.environ.get("CHATMAIL_SSH", maildomain)
|
return os.environ.get("CHATMAIL_SSH", maildomain)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user