mirror of
https://github.com/chatmail/relay.git
synced 2026-05-10 16:04:37 +00:00
345 lines
9.2 KiB
Python
345 lines
9.2 KiB
Python
import os
|
|
import io
|
|
import time
|
|
import random
|
|
import subprocess
|
|
import imaplib
|
|
import smtplib
|
|
import itertools
|
|
from pathlib import Path
|
|
from math import ceil
|
|
import pytest
|
|
|
|
|
|
# Directory containing this file; the mailgen fixture reads its "mail-data"
# templates from a subdirectory of it.
conftestdir = Path(__file__).parent
|
|
|
|
|
|
def pytest_addoption(parser):
    """Register the ``--slow`` command line flag (a boolean, off by default)."""
    slow_flag = dict(
        action="store_true",
        default=False,
        help="also run slow tests",
    )
    parser.addoption("--slow", **slow_flag)
|
|
|
|
|
|
def pytest_configure(config):
    """Attach an empty benchmark store to ``config`` and declare the ``slow`` marker."""
    # Maps benchmark name -> list of durations; filled by the ``benchmark``
    # fixture and read back in ``pytest_terminal_summary``.
    config._benchresults = dict()
    marker_doc = "slow: mark test to require --slow option to run"
    config.addinivalue_line("markers", marker_doc)
|
|
|
|
|
|
def pytest_runtest_setup(item):
    """Skip tests carrying the ``slow`` marker unless ``--slow`` was given."""
    slow_marks = [mark for mark in item.iter_markers(name="slow")]
    if not slow_marks:
        return
    if item.config.getoption("--slow"):
        return
    pytest.skip("skipping slow test, use --slow to run")
|
|
|
|
|
|
@pytest.fixture
def maildomain():
    """Return the value of ``CHATMAIL_DOMAIN``; skip the test if it is unset or empty."""
    configured = os.environ.get("CHATMAIL_DOMAIN")
    if configured:
        return configured
    # pytest.skip() raises, so nothing is returned on this path.
    pytest.skip("set CHATMAIL_DOMAIN to a ssh-reachable chatmail instance")
|
|
|
|
|
|
@pytest.fixture
def sshdomain(maildomain):
    """Return ``CHATMAIL_SSH`` when set, otherwise fall back to ``maildomain``."""
    ssh_host = os.environ.get("CHATMAIL_SSH", maildomain)
    return ssh_host
|
|
|
|
|
|
@pytest.fixture
def maildomain2():
    """Return the value of ``CHATMAIL_DOMAIN2``; skip the test if it is unset or empty."""
    configured = os.environ.get("CHATMAIL_DOMAIN2")
    if configured:
        return configured
    # pytest.skip() raises, so nothing is returned on this path.
    pytest.skip("set CHATMAIL_DOMAIN2 to a ssh-reachable chatmail instance")
|
|
|
|
|
|
@pytest.fixture
def sshdomain2(maildomain2):
    """Return ``CHATMAIL_SSH2`` when set, otherwise fall back to ``maildomain2``."""
    ssh_host = os.environ.get("CHATMAIL_SSH2", maildomain2)
    return ssh_host
|
|
|
|
|
|
def pytest_report_header():
    """Return banner lines naming ``CHATMAIL_DOMAIN``, or None when it is unset or empty."""
    instance = os.environ.get("CHATMAIL_DOMAIN")
    if not instance:
        return None
    banner = f"chatmail test instance: {instance}"
    rule = "-" * len(banner)
    return [rule, banner, rule]
|
|
|
|
|
|
@pytest.fixture
def benchmark(request):
    """Provide ``bench(func, num, name=None)`` for timing repeated calls.

    ``bench`` calls ``func()`` ``num`` times, records the duration of each
    call in seconds and stores the sorted list under ``name`` (defaulting to
    ``func.__name__``) in ``request.config._benchresults``.
    """

    def bench(func, num, name=None):
        if name is None:
            name = func.__name__
        durations = []
        for _ in range(num):
            # perf_counter() is monotonic and high-resolution; time.time()
            # can jump when the system clock is adjusted mid-measurement.
            started = time.perf_counter()
            func()
            durations.append(time.perf_counter() - started)
        durations.sort()
        request.config._benchresults[name] = durations

    return bench
|
|
|
|
|
|
def pytest_terminal_summary(terminalreporter):
    """Write a table of the collected benchmark timings to the terminal report.

    Reads ``terminalreporter.config._benchresults`` (name -> list of
    durations) and prints one row per benchmark with its median, minimum and
    maximum. Nothing is written when no benchmark was recorded. A benchmark
    without any samples is listed with ``n/a`` cells instead of crashing the
    summary.
    """
    # Function-scope import so this block does not depend on the file header.
    from statistics import median

    tr = terminalreporter
    results = tr.config._benchresults
    if not results:
        return

    tr.section("benchmark results")
    column_names = "median min max".split()
    width = max(map(len, column_names))

    def fcol(parts):
        # Right-align every cell to the widest column title.
        return " ".join(part.rjust(width) for part in parts)

    headers = f"{'benchmark name': <30} " + fcol(column_names)
    tr.write_line(headers)
    tr.write_line("-" * len(headers))
    for name, durations in results.items():
        if durations:
            # statistics.median averages the two middle samples for even
            # counts instead of reporting the upper one.
            measures = [median(durations), min(durations), max(durations)]
            cells = [f"{value: 2.2f}" for value in measures]
        else:
            cells = ["n/a"] * len(column_names)
        tr.write_line(f"{name: <30} " + fcol(cells))
|
|
|
|
|
|
@pytest.fixture
def imap(maildomain):
    """Return an ``ImapConn`` for ``maildomain``; the connection is not opened yet."""
    client = ImapConn(maildomain)
    return client
|
|
|
|
|
|
class ImapConn:
    """Small IMAP-over-TLS helper that prints what it does, for use in tests."""

    # Exception class callers can catch; an alias of imaplib.IMAP4.error.
    AuthError = imaplib.IMAP4.error
    # journalctl invocation string for the dovecot unit.
    logcmd = "journalctl -f -u dovecot"
    name = "dovecot"

    def __init__(self, host):
        """Remember ``host``; no network connection is opened here."""
        self.host = host

    def connect(self):
        """Open an ``imaplib.IMAP4_SSL`` connection to ``self.host`` as ``self.conn``."""
        print(f"imap-connect {self.host}")
        self.conn = imaplib.IMAP4_SSL(self.host)

    def login(self, user, password):
        """Log in on the open connection; the credentials are printed as well."""
        print(f"imap-login {user!r} {password!r}")
        self.conn.login(user, password)

    def fetch_all(self):
        """Select the default mailbox and fetch every message as RFC822.

        Returns the raw result list from ``imaplib``'s ``fetch``.

        Raises:
            ValueError: if the selected mailbox reports zero messages.
        """
        print("imap-fetch all")
        status, res = self.conn.select()
        if int(res[0]) == 0:
            raise ValueError("no messages in imap folder")
        status, results = self.conn.fetch("1:*", "(RFC822)")
        assert status == "OK"
        return results

    def fetch_all_messages(self):
        """Return the decoded body of every fetched message as a list of str."""
        print("imap-fetch all messages")
        results = self.fetch_all()
        # Message payloads arrive as (envelope, data) tuples; the list also
        # holds bare bytes entries. Checking only len(item) == 2 would
        # mistake a 2-byte bytes entry for a message and fail on .decode().
        return [
            item[1].decode()
            for item in results
            if isinstance(item, tuple) and len(item) == 2
        ]
|
|
|
|
|
|
@pytest.fixture
def smtp(maildomain):
    """Return an ``SmtpConn`` for ``maildomain``; the connection is not opened yet."""
    client = SmtpConn(maildomain)
    return client
|
|
|
|
|
|
class SmtpConn:
    """Small SMTP-over-TLS helper that prints what it does, for use in tests."""

    # Exception class callers can catch on a rejected login.
    AuthError = smtplib.SMTPAuthenticationError
    # journalctl invocation string filtering on the postfix syslog tags.
    logcmd = "journalctl -f -t postfix/smtpd -t postfix/smtp -t postfix/lmtp"
    name = "postfix"

    def __init__(self, host):
        """Remember ``host``; no network connection is opened here."""
        self.host = host

    def connect(self):
        """Open an ``smtplib.SMTP_SSL`` connection to ``self.host`` as ``self.conn``."""
        target = self.host
        print(f"smtp-connect {target}")
        self.conn = smtplib.SMTP_SSL(target)

    def login(self, user, password):
        """Log in on the open connection; the credentials are printed as well."""
        print(f"smtp-login {user!r} {password!r}")
        self.conn.login(user, password)

    def sendmail(self, from_addr, to_addrs, msg):
        """Send ``msg`` and return whatever the connection's ``sendmail`` returns."""
        print(f"smtp-sendmail from={from_addr!r} to_addrs={to_addrs!r}")
        print(f"smtp-sendmail message size: {len(msg)}")
        outcome = self.conn.sendmail(
            from_addr=from_addr,
            to_addrs=to_addrs,
            msg=msg,
        )
        return outcome
|
|
|
|
|
|
@pytest.fixture(params=["imap", "smtp"])
def imap_or_smtp(request):
    """Parametrized fixture: resolves to the ``imap`` fixture, then the ``smtp`` fixture."""
    fixture_name = request.param
    return request.getfixturevalue(fixture_name)
|
|
|
|
|
|
@pytest.fixture
def gencreds(maildomain):
    """Return a callable ``gen(domain=None)`` producing fresh ``(address, password)`` pairs.

    Addresses look like ``ac<N>_<10 random chars>@<domain>``, where ``N`` is
    a counter starting at 1 that is shared by all calls made through one
    fixture instance. ``domain`` falls back to ``maildomain`` when falsy.
    """
    alphabet = "abcdefghijklmnopqrstuvwxyz1234567890"
    serial = itertools.count(1)

    def random_token():
        return "".join(random.choices(alphabet, k=10))

    def make(domain=None):
        target = domain or maildomain
        index = next(serial)
        # The local part is drawn before the password, matching the order in
        # which the random generator is consumed.
        localpart = f"ac{index}_{random_token()}"
        password = random_token()
        return f"{localpart}@{target}", password

    return make
|
|
|
|
|
|
#
|
|
# Delta Chat testplugin re-use
|
|
# use the cmfactory fixture to get chatmail instance accounts
|
|
#
|
|
|
|
|
|
class ChatmailTestProcess:
    """Provider for chatmail instance accounts as used by deltachat.testplugin.acfactory"""

    def __init__(self, pytestconfig, maildomain, gencreds):
        """Store the collaborators; ``maildomain`` must contain a dot."""
        assert "." in maildomain, maildomain
        self.pytestconfig = pytestconfig
        self.maildomain = maildomain
        self.gencreds = gencreds
        self._addr2files = dict()

    def get_liveconfig_producer(self):
        """Yield account config dicts with fresh credentials, without end.

        ``self.maildomain`` is read anew for every config, so changing the
        attribute affects the configs produced afterwards.
        """
        while True:
            addr, secret = self.gencreds(self.maildomain)
            yield {
                "addr": addr,
                "mail_pw": secret,
                # speed up account configuration
                "mail_server": self.maildomain,
                "send_server": self.maildomain,
            }

    def cache_maybe_retrieve_configured_db_files(self, cache_addr, db_target_path):
        """Do nothing and return None."""
        return None

    def cache_maybe_store_configured_db_files(self, acc):
        """Do nothing and return None."""
        return None
|
|
|
|
|
|
@pytest.fixture
def cmfactory(request, gencreds, tmpdir, data, maildomain):
    """Yield a deltachat ``ACFactory`` backed by a ``ChatmailTestProcess``.

    The test is skipped when ``deltachat`` cannot be imported. The yielded
    factory gets an extra ``switch_maildomain(domain)`` attribute that
    repoints its test process at another mail domain. After the test, an
    IMAP summary is printed if the node has a failed ``rep_call`` report and
    ``--extra-info`` is set.
    """
    # cloned from deltachat.testplugin.amfactory
    pytest.importorskip("deltachat")
    from deltachat.testplugin import ACFactory

    testproc = ChatmailTestProcess(request.config, maildomain, gencreds)
    factory = ACFactory(
        request=request,
        tmpdir=tmpdir,
        testprocess=testproc,
        data=data,
    )

    # nb. a bit hacky
    # would probably be better if deltachat's test machinery grows native support
    def switch_maildomain(maildomain2):
        factory.testprocess.maildomain = maildomain2

    factory.switch_maildomain = switch_maildomain

    yield factory

    # Teardown: only report when the call phase is known to have failed.
    if not hasattr(request.node, "rep_call") or not request.node.rep_call.failed:
        return
    if not testproc.pytestconfig.getoption("--extra-info"):
        return
    summary = io.StringIO()
    factory.dump_imap_summary(logfile=summary)
    print(summary.getvalue())
    # request.node.add_report_section("call", "imap-server-state", s)
|
|
|
|
|
|
@pytest.fixture
def remote(sshdomain):
    """Return a ``Remote`` bound to ``sshdomain``."""
    handle = Remote(sshdomain)
    return handle
|
|
|
|
|
|
class Remote:
    """Run a command on ``sshdomain`` through ``ssh root@...`` and stream its output."""

    def __init__(self, sshdomain):
        """Remember the SSH host; nothing is executed here."""
        self.sshdomain = sshdomain

    def iter_output(self, logcmd=""):
        """Yield the non-empty output lines of ``logcmd``, stripped and lower-cased.

        ``logcmd`` defaults to ``journalctl -f`` when empty. Iteration ends
        when the remote command's stdout reaches EOF. Blank lines are skipped
        rather than ending the stream. When the generator finishes or is
        closed early, the ssh child is terminated and reaped so that no
        process or pipe is left behind.
        """
        getjournal = logcmd if logcmd else "journalctl -f"
        proc = subprocess.Popen(
            ["ssh", f"root@{self.sshdomain}", getjournal],
            stdout=subprocess.PIPE,
        )
        # Still exposed as an attribute for callers that reach for it.
        self.popen = proc
        try:
            # readline() returns b"" only at EOF; a blank log line is b"\n",
            # so the sentinel separates "stream ended" from "empty line".
            for raw in iter(proc.stdout.readline, b""):
                line = raw.decode().strip().lower()
                if line:
                    yield line
        finally:
            if proc.poll() is None:
                proc.terminate()
            proc.stdout.close()
            proc.wait()
|
|
|
|
|
|
@pytest.fixture
def mailgen(request):
    """Return a helper that renders mail templates from the ``mail-data`` directory."""
    template_path = conftestdir.joinpath("mail-data").joinpath("encrypted.eml")

    class Mailgen:
        def get_encrypted(self, from_addr, to_addr):
            """Return ``encrypted.eml`` with ``{from_addr}`` and ``{to_addr}`` filled in."""
            # Read on every call, so the file is only needed once this runs.
            template = template_path.read_text()
            return template.format(from_addr=from_addr, to_addr=to_addr)

    return Mailgen()
|
|
|
|
|
|
@pytest.fixture
def cmsetup(maildomain, gencreds):
    """Return a ``CMSetup`` wired to ``maildomain`` and the credential generator."""
    setup = CMSetup(maildomain, gencreds)
    return setup
|
|
|
|
|
|
class CMSetup:
    """Create logged-in test users on a chatmail domain."""

    def __init__(self, maildomain, gencreds):
        """Store the mail domain and the credential-generating callable."""
        self.maildomain = maildomain
        self.gencreds = gencreds

    def _new_online_user(self):
        """Create one ``CMUser`` with fresh credentials and open its SMTP session."""
        addr, password = self.gencreds()
        user = CMUser(self.maildomain, addr, password)
        # Reading ``user.smtp`` connects and logs in as a side effect.
        assert user.smtp
        return user

    def gen_users(self, num):
        """Return a list of ``num`` users, each already logged in over SMTP."""
        print(f"Creating {num} online users")
        created = []
        for _ in range(num):
            created.append(self._new_online_user())
        return created
|
|
|
|
|
|
class CMUser:
    """One account on a mail domain, with SMTP/IMAP sessions opened on first use."""

    def __init__(self, maildomain, addr, password):
        """Store the account data; no connection is opened here."""
        self.maildomain = maildomain
        self.addr = addr
        self.password = password
        # Session handles, created by the ``smtp`` / ``imap`` properties.
        self._smtp = None
        self._imap = None

    @property
    def smtp(self):
        """Return the cached ``SmtpConn``, connecting and logging in on first access."""
        if self._smtp:
            return self._smtp
        session = SmtpConn(self.maildomain)
        session.connect()
        session.login(self.addr, self.password)
        self._smtp = session
        return session

    @property
    def imap(self):
        """Return the cached ``ImapConn``, connecting and logging in on first access."""
        if self._imap:
            return self._imap
        session = ImapConn(self.maildomain)
        session.connect()
        session.login(self.addr, self.password)
        self._imap = session
        return session
|