Compare commits

..

5 Commits

Author SHA1 Message Date
holger krekel
009f549619 document some attributes in chatmail.ini 2023-12-09 01:20:17 +01:00
holger krekel
99d36235fe get passthrough_recipients list from config 2023-12-09 01:07:37 +01:00
holger krekel
b52a8c969f various fixes 2023-12-09 00:22:58 +01:00
holger krekel
8520a9d8f2 introduce basic config file 2023-12-08 21:56:15 +01:00
holger krekel
652b9688d3 deploy chatmaild in a virtualenv to make it easier to add dependencies 2023-12-08 21:55:18 +01:00
15 changed files with 325 additions and 135 deletions

View File

@@ -1,15 +1,32 @@
[config]
[params]
# how many mails a user can send out per minute
max_user_send_per_minute = 60
# list of e-mail recipients for which to accept outbound un-encrypted mails
passthrough_recipients = privacy@testrun.org xstore@testrun.org
# where the filtermail SMTP service listens
filtermail_smtp_port = 10080
# to which port to re-inject messages after they passed filtermail
postfix_reinject_port = 10025
[privacy:testrun]
# the settings in this section are only applied
# if the instantiated mail domain shell-matches the 'domain' setting
domain = *.testrun.org
privacy_postal =
Merlinux GmbH, Represented by the managing director H. Krekel,
Reichgrafen Str. 20, 79102 Freiburg, Germany
privacy_mail = delta-privacy@merlinux.eu
privacy_pdo =
Prof. Dr. Fabian Schmieder, lexICT UG (limited), Ostfeldstr. 49, 30559 Hannover.
You can contact him at *delta-privacy@merlinux.eu* (Keyword: DPO)
privacy_supervisor =
State Commissioner for Data Protection and Freedom of Information of
Baden-Württemberg in 70173 Stuttgart, Germany.

View File

@@ -4,9 +4,10 @@ build-backend = "setuptools.build_meta"
[project]
name = "chatmaild"
version = "0.1"
version = "0.2"
dependencies = [
"aiosmtpd",
"iniconfig",
]
[project.scripts]

View File

@@ -0,0 +1,41 @@
from pathlib import Path
from fnmatch import fnmatch
import iniconfig
system_mailname_path = Path("/etc/mailname")
def read_config(inipath, mailname=None):
if mailname is None:
with open(system_mailname_path) as f:
mailname = f.read().strip()
ini = iniconfig.IniConfig(inipath)
privacy = {}
for section in ini:
if section.name.startswith("privacy:"):
domain = section["domain"]
if fnmatch(mailname, domain):
privacy = section
break
return Config(inipath, mailname, privacy, params=ini.sections["params"])
class Config:
def __init__(self, inipath, mailname, privacy, params):
self._inipath = inipath
self.mailname = mailname
self.privacy_postal = privacy.get("privacy_postal")
self.privacy_mail = privacy.get("privacy_mail")
self.privacy_pdo = privacy.get("privacy_pdo")
self.privacy_supervisor = privacy.get("privacy_supervisor")
self.max_user_send_per_minute = int(params["max_user_send_per_minute"])
self.filtermail_smtp_port = int(params["filtermail_smtp_port"])
self.postfix_reinject_port = int(params["postfix_reinject_port"])
self.passthrough_recipients = params["passthrough_recipients"].split()
def _getbytefile(self):
return open(self._inipath, "rb")

View File

@@ -11,6 +11,8 @@ from aiosmtpd.smtp import SMTP
from aiosmtpd.controller import Controller
from smtplib import SMTP as SMTPClient
from .config import read_config
def check_encrypted(message):
"""Check that the message is an OpenPGP-encrypted message."""
@@ -34,14 +36,6 @@ def check_encrypted(message):
return True
def is_passthrough_recipient(recipient):
"""Check whether a recipient is configured as passthrough."""
passthroughlist = ["privacy@testrun.org"]
if recipient in passthroughlist:
return True
return False
def check_mdn(message, envelope):
if len(envelope.rcpt_tos) != 1:
return False
@@ -70,19 +64,21 @@ def check_mdn(message, envelope):
return True
class SMTPController(Controller):
def factory(self):
return SMTP(self.handler, **self.SMTP_kwargs)
async def asyncmain_beforequeue(config):
port = config.filtermail_smtp_port
Controller(BeforeQueueHandler(config), hostname="127.0.0.1", port=port).start()
class BeforeQueueHandler:
def __init__(self):
def __init__(self, config):
self.config = config
self.send_rate_limiter = SendRateLimiter()
async def handle_MAIL(self, server, session, envelope, address, mail_options):
logging.info(f"handle_MAIL from {address}")
envelope.mail_from = address
if not self.send_rate_limiter.is_sending_allowed(address):
max_sent = self.config.max_user_send_per_minute
if not self.send_rate_limiter.is_sending_allowed(address, max_sent):
return f"450 4.7.1: Too much mail from {address}"
parts = envelope.mail_from.split("@")
@@ -93,65 +89,58 @@ class BeforeQueueHandler:
async def handle_DATA(self, server, session, envelope):
logging.info("handle_DATA before-queue")
error = check_DATA(envelope)
error = self.check_DATA(envelope)
if error:
return error
logging.info("re-injecting the mail that passed checks")
client = SMTPClient("localhost", "10025")
client = SMTPClient("localhost", self.config.postfix_reinject_port)
client.sendmail(envelope.mail_from, envelope.rcpt_tos, envelope.content)
return "250 OK"
def check_DATA(self, envelope):
"""the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}")
async def asyncmain_beforequeue(port):
Controller(BeforeQueueHandler(), hostname="127.0.0.1", port=port).start()
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
mail_encrypted = check_encrypted(message)
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from!r}")
if envelope.mail_from.lower() != from_addr.lower():
return f"500 Invalid FROM <{from_addr!r}> for <{envelope.mail_from!r}>"
def check_DATA(envelope):
"""the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}")
if not mail_encrypted and check_mdn(message, envelope):
return
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
mail_encrypted = check_encrypted(message)
passthrough_recipients = self.config.passthrough_recipients
envelope_from_domain = from_addr.split("@").pop()
for recipient in envelope.rcpt_tos:
if envelope.mail_from == recipient:
# Always allow sending emails to self.
continue
if recipient in passthrough_recipients:
continue
res = recipient.split("@")
if len(res) != 2:
return f"500 Invalid address <{recipient}>"
_recipient_addr, recipient_domain = res
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from!r}")
if envelope.mail_from.lower() != from_addr.lower():
return f"500 Invalid FROM <{from_addr!r}> for <{envelope.mail_from!r}>"
if not mail_encrypted and check_mdn(message, envelope):
return
envelope_from_domain = from_addr.split("@").pop()
for recipient in envelope.rcpt_tos:
if envelope.mail_from == recipient:
# Always allow sending emails to self.
continue
if is_passthrough_recipient(recipient):
# Always allow recipients marked as passthrough
continue
res = recipient.split("@")
if len(res) != 2:
return f"500 Invalid address <{recipient}>"
_recipient_addr, recipient_domain = res
is_outgoing = recipient_domain != envelope_from_domain
if is_outgoing and not mail_encrypted:
is_securejoin = message.get("secure-join") in ["vc-request", "vg-request"]
if not is_securejoin:
return f"500 Invalid unencrypted mail to <{recipient}>"
is_outgoing = recipient_domain != envelope_from_domain
if is_outgoing and not mail_encrypted:
is_securejoin = message.get("secure-join") in ["vc-request", "vg-request"]
if not is_securejoin:
return f"500 Invalid unencrypted mail to <{recipient}>"
class SendRateLimiter:
MAX_USER_SEND_PER_MINUTE = 80
def __init__(self):
self.addr2timestamps = {}
def is_sending_allowed(self, mail_from):
def is_sending_allowed(self, mail_from, max_send_per_minute):
last = self.addr2timestamps.setdefault(mail_from, [])
now = time.time()
last[:] = [ts for ts in last if ts >= (now - 60)]
if len(last) <= self.MAX_USER_SEND_PER_MINUTE:
if len(last) <= max_send_per_minute:
last.append(now)
return True
return False
@@ -160,9 +149,10 @@ class SendRateLimiter:
def main():
args = sys.argv[1:]
assert len(args) == 1
config = read_config(args[0])
logging.basicConfig(level=logging.WARN)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = asyncmain_beforequeue(port=int(args[0]))
task = asyncmain_beforequeue(config)
loop.create_task(task)
loop.run_forever()

View File

@@ -2,7 +2,7 @@
Description=Chatmail Postfix BeforeQeue filter
[Service]
ExecStart={execpath} 10080
ExecStart={execpath} {config_path}
Restart=always
RestartSec=30

View File

@@ -6,7 +6,6 @@ import importlib.resources
import subprocess
import shutil
import io
import configparser
from pathlib import Path
from pyinfra import host
@@ -15,6 +14,9 @@ from pyinfra.facts.files import File
from pyinfra.facts.systemd import SystemdEnabled
from .acmetool import deploy_acmetool
import chatmaild.filtermail
from chatmaild.config import read_config
def _build_chatmaild(dist_dir) -> None:
dist_dir = Path(dist_dir).resolve()
@@ -30,11 +32,24 @@ def _build_chatmaild(dist_dir) -> None:
return entries[0]
def _install_remote_venv_with_chatmaild() -> None:
def remove_legacy_artifacts():
# disable legacy doveauth-dictproxy.service
if host.get_fact(SystemdEnabled).get("doveauth-dictproxy.service"):
systemd.service(
name="Disable legacy doveauth-dictproxy.service",
service="doveauth-dictproxy.service",
running=False,
enabled=False,
)
def _install_remote_venv_with_chatmaild(config) -> None:
remove_legacy_artifacts()
dist_file = _build_chatmaild(dist_dir=Path("chatmaild/dist"))
remote_base_dir = "/usr/local/lib/chatmaild"
remote_dist_file = f"{remote_base_dir}/dist/{dist_file.name}"
remote_venv_dir = f"{remote_base_dir}/venv"
remote_chatmail_inipath = f"{remote_base_dir}/chatmail.ini"
root_owned = dict(user="root", group="root", mode="644")
apt.packages(
@@ -50,6 +65,13 @@ def _install_remote_venv_with_chatmaild() -> None:
**root_owned,
)
files.put(
name=f"Upload {remote_chatmail_inipath}",
src=config._getbytefile(),
dest=remote_chatmail_inipath,
**root_owned,
)
pip.virtualenv(
name=f"chatmaild virtualenv {remote_venv_dir}",
path=remote_venv_dir,
@@ -63,24 +85,17 @@ def _install_remote_venv_with_chatmaild() -> None:
],
)
# disable legacy doveauth-dictproxy.service
if host.get_fact(SystemdEnabled).get("doveauth-dictproxy.service"):
systemd.service(
name="Disable legacy doveauth-dictproxy.service",
service="doveauth-dictproxy.service",
running=False,
enabled=False,
)
# install systemd units
for fn in (
"doveauth",
"filtermail",
):
execpath = f"{remote_venv_dir}/bin/{fn}"
params = dict(
execpath=f"{remote_venv_dir}/bin/{fn}",
config_path=remote_chatmail_inipath,
)
source_path = importlib.resources.files("chatmaild").joinpath(f"{fn}.service.f")
content = source_path.read_text().format(execpath=execpath).encode()
content = source_path.read_text().format(**params).encode()
files.put(
name=f"Upload {fn}.service",
@@ -201,7 +216,7 @@ def _install_mta_sts_daemon() -> bool:
return need_restart
def _configure_postfix(domain: str, debug: bool = False) -> bool:
def _configure_postfix(config: chatmaild.config.Config, debug: bool = False) -> bool:
"""Configures Postfix SMTP server."""
need_restart = False
@@ -211,7 +226,7 @@ def _configure_postfix(domain: str, debug: bool = False) -> bool:
user="root",
group="root",
mode="644",
config={"domain_name": domain},
config=config,
)
need_restart |= main_config.changed
@@ -222,6 +237,7 @@ def _configure_postfix(domain: str, debug: bool = False) -> bool:
group="root",
mode="644",
debug=debug,
config=config,
)
need_restart |= master_config.changed
@@ -331,19 +347,16 @@ def _configure_nginx(domain: str, debug: bool = False) -> bool:
return need_restart
def get_ini_settings(mail_domain, inipath):
parser = configparser.ConfigParser()
parser.read(inipath)
settings = {key: value.strip() for (key, value) in parser["config"].items()}
if mail_domain != "testrun.org" and not mail_domain.endswith(".testrun.org"):
for value in settings.values():
value = value.lower()
if "merlinux" in value or "schmieder" in value or "@testrun.org" in value:
def check_config(config):
mailname = config.mailname
if mailname != "testrun.org" and not mailname.endswith(".testrun.org"):
blocked_words = "merlinux schmieder testrun.org".split()
for value in config.__dict__.values():
if any(x in value for x in blocked_words):
raise ValueError(
f"please set your own privacy contacts/addresses in {inipath}"
f"please set your own privacy contacts/addresses in {config._inipath}"
)
settings["mail_domain"] = mail_domain
return settings
return config
def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> None:
@@ -400,7 +413,8 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
pkg_root = importlib.resources.files(__package__)
chatmail_ini = pkg_root.joinpath("../../../chatmail.ini").resolve()
config = get_ini_settings(mail_domain, chatmail_ini)
config = read_config(chatmail_ini, mailname=mail_domain)
check_config(config)
www_path = pkg_root.joinpath("../../../www").resolve()
build_dir = www_path.joinpath("build")
@@ -408,10 +422,10 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
build_webpages(src_dir, build_dir, config)
files.rsync(f"{build_dir}/", "/var/www/html", flags=["-avz"])
_install_remote_venv_with_chatmaild()
_install_remote_venv_with_chatmaild(config)
debug = False
dovecot_need_restart = _configure_dovecot(mail_server, debug=debug)
postfix_need_restart = _configure_postfix(mail_domain, debug=debug)
postfix_need_restart = _configure_postfix(config, debug=debug)
opendkim_need_restart = _configure_opendkim(mail_domain, dkim_selector)
mta_sts_need_restart = _install_mta_sts_daemon()
nginx_need_restart = _configure_nginx(mail_domain)

View File

@@ -1,4 +1,4 @@
myorigin = {{ config.domain_name }}
myorigin = {{ config.mailname }}
smtpd_banner = $myhostname ESMTP $mail_name (Debian/GNU)
biff = no
@@ -16,8 +16,8 @@ readme_directory = no
compatibility_level = 2
# TLS parameters
smtpd_tls_cert_file=/var/lib/acme/live/{{ config.domain_name }}/fullchain
smtpd_tls_key_file=/var/lib/acme/live/{{ config.domain_name }}/privkey
smtpd_tls_cert_file=/var/lib/acme/live/{{ config.mailname }}/fullchain
smtpd_tls_key_file=/var/lib/acme/live/{{ config.mailname }}/privkey
smtpd_tls_security_level=may
smtp_tls_CApath=/etc/ssl/certs
@@ -26,7 +26,7 @@ smtp_tls_session_cache_database = btree:${data_directory}/smtp_scache
smtp_tls_policy_maps = socketmap:inet:127.0.0.1:8461:postfix
smtpd_relay_restrictions = permit_mynetworks permit_sasl_authenticated defer_unauth_destination
myhostname = {{ config.domain_name }}
myhostname = {{ config.mailname }}
alias_maps = hash:/etc/aliases
alias_database = hash:/etc/aliases
@@ -45,7 +45,7 @@ inet_interfaces = all
inet_protocols = all
virtual_transport = lmtp:unix:private/dovecot-lmtp
virtual_mailbox_domains = {{ config.domain_name }}
virtual_mailbox_domains = {{ config.mailname }}
smtpd_milters = unix:opendkim/opendkim.sock
non_smtpd_milters = $smtpd_milters

View File

@@ -33,7 +33,7 @@ submission inet n - y - - smtpd
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o milter_macro_daemon_name=ORIGINATING
-o smtpd_client_connection_count_limit=1000
-o smtpd_proxy_filter=127.0.0.1:10080
-o smtpd_proxy_filter=127.0.0.1:{{ config.filtermail_smtp_port }}
smtps inet n - y - - smtpd
-o syslog_name=postfix/smtps
-o smtpd_tls_wrappermode=yes
@@ -49,7 +49,7 @@ smtps inet n - y - - smtpd
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o smtpd_client_connection_count_limit=1000
-o milter_macro_daemon_name=ORIGINATING
-o smtpd_proxy_filter=127.0.0.1:10080
-o smtpd_proxy_filter=127.0.0.1:{{ config.filtermail_smtp_port }}
#628 inet n - y - - qmqpd
pickup unix n - y 60 1 pickup
cleanup unix n - y - 0 cleanup
@@ -78,5 +78,5 @@ scache unix - - y - 1 scache
postlog unix-dgram n - n - 1 postlogd
filter unix - n n - - lmtp
# Local SMTP server for reinjecting filered mail.
localhost:10025 inet n - n - 10 smtpd
localhost:{{ config.postfix_reinject_port }} inet n - n - 10 smtpd
-o syslog_name=postfix/reinject

View File

@@ -7,7 +7,7 @@ import traceback
import markdown
from jinja2 import Template
from .genqr import gen_qr_png_data
from deploy_chatmail import get_ini_settings
from chatmaild.config import read_config
def snapshot_dir_stats(somedir):
@@ -37,7 +37,7 @@ def build_webpages(src_dir, build_dir, config):
def _build_webpages(src_dir, build_dir, config):
mail_domain = config["mail_domain"]
mail_domain = config.mailname
assert src_dir.exists(), src_dir
if not build_dir.exists():
build_dir.mkdir()
@@ -70,7 +70,7 @@ def main():
path = importlib.resources.files(__package__)
reporoot = path.joinpath("../../../").resolve()
inipath = reporoot.joinpath("chatmail.ini")
config = get_ini_settings(chatmail_domain, inipath)
config = read_config(inipath, mailname=chatmail_domain)
config["webdev"] = True
www_path = reporoot.joinpath("www")
src_path = www_path.joinpath("src")

View File

@@ -0,0 +1,88 @@
from chatmaild.config import read_config
import chatmaild.config
def test_read_config_without_mailname(tmp_path, create_ini, monkeypatch):
mailname_path = tmp_path.joinpath("mailname")
mailname_path.write_text("something.example.org")
monkeypatch.setattr(chatmaild.config, "system_mailname_path", mailname_path)
inipath = create_ini(
"""
[params]
max_user_send_per_minute = 40
filtermail_smtp_port = 9875
postfix_reinject_port = 9999
passthrough_recipients =
"""
)
config = read_config(inipath)
assert config.mailname == "something.example.org"
def test_read_config_without_privacy_policy(tmp_path, create_ini):
inipath = create_ini(
"""
[params]
max_user_send_per_minute = 40
filtermail_smtp_port = 9875
postfix_reinject_port = 9999
passthrough_recipients =
[privacy:testrun]
domain = *.example.org
"""
)
config = read_config(inipath, "something.example.org")
assert config.mailname == "something.example.org"
assert config.max_user_send_per_minute == 40
assert config.filtermail_smtp_port == 9875
assert config.postfix_reinject_port == 9999
assert config.passthrough_recipients == []
assert not config.privacy_postal
assert not config.privacy_mail
assert not config.privacy_pdo
assert not config.privacy_supervisor
def test_read_config(create_ini):
inipath = create_ini(
"""
[params]
max_user_send_per_minute = 40
filtermail_smtp_port = 10080
postfix_reinject_port = 10025
passthrough_recipients = x@example.org y@example.org
[privacy:testrun]
domain = *.testrun.org
privacy_postal =
Postal Ltd
privacy_mail = privacy@merlinux.eu
privacy_pdo =
Postal PDO
You can contact him at *delta-privacy@merlinux.eu* (Keyword: DPO)
privacy_supervisor =
line1
line2 with space
"""
)
config = read_config(inipath, "something.testrun.org")
assert config.mailname == "something.testrun.org"
assert config.filtermail_smtp_port == 10080
assert config.postfix_reinject_port == 10025
assert config.passthrough_recipients == ["x@example.org", "y@example.org"]
assert config.privacy_postal == "Postal Ltd"
assert config.privacy_mail == "privacy@merlinux.eu"
lines = config.privacy_pdo.split("\n")
assert lines[0] == "Postal PDO"
assert lines[1].startswith("You can ")
lines = config.privacy_supervisor.split("\n")
assert lines[0] == "line1"
assert lines[1] == "line2 with space"

View File

@@ -1,5 +1,4 @@
import json
import sys
import pytest
import threading
import queue
@@ -7,7 +6,7 @@ import traceback
import chatmaild.doveauth
from chatmaild.doveauth import get_user_data, lookup_passdb, handle_dovecot_request
from chatmaild.database import Database, DBError
from chatmaild.database import DBError
def test_basic(db):

View File

@@ -1,4 +1,12 @@
from chatmaild.filtermail import check_encrypted, check_DATA, SendRateLimiter, check_mdn, is_passthrough_recipient
from chatmaild.filtermail import (
check_encrypted,
BeforeQueueHandler,
SendRateLimiter,
check_mdn,
)
from chatmaild.config import read_config
import pytest
@@ -8,18 +16,25 @@ def maildomain():
return "chatmail.example.org"
def test_reject_forged_from(maildata, gencreds):
@pytest.fixture
def handler(create_ini, maildomain):
config = read_config(create_ini(), maildomain)
return BeforeQueueHandler(config)
def test_reject_forged_from(maildata, gencreds, handler):
class env:
mail_from = gencreds()[0]
rcpt_tos = [gencreds()[0]]
# test that the filter lets good mail through
env.content = maildata("plain.eml", from_addr=env.mail_from).as_bytes()
assert not check_DATA(envelope=env)
assert not handler.check_DATA(envelope=env)
# test that the filter rejects forged mail
env.content = maildata("plain.eml", from_addr="forged@c3.testrun.org").as_bytes()
error = check_DATA(envelope=env)
error = handler.check_DATA(envelope=env)
assert "500" in error
@@ -41,7 +56,7 @@ def test_filtermail_encryption_detection(maildata):
assert not check_encrypted(msg)
def test_filtermail_is_mdn(maildata, gencreds):
def test_filtermail_is_mdn(maildata, gencreds, handler):
from_addr = gencreds()[0]
to_addr = gencreds()[0] + ".other"
msg = maildata("mdn.eml", from_addr, to_addr)
@@ -53,7 +68,8 @@ def test_filtermail_is_mdn(maildata, gencreds):
assert check_mdn(msg, env)
print(msg.as_string())
assert not check_DATA(env)
assert not handler.check_DATA(env)
def test_filtermail_to_multiple_recipients_no_mdn(maildata, gencreds):
@@ -73,21 +89,21 @@ def test_filtermail_to_multiple_recipients_no_mdn(maildata, gencreds):
def test_send_rate_limiter():
limiter = SendRateLimiter()
for i in range(100):
if limiter.is_sending_allowed("some@example.org"):
if i <= SendRateLimiter.MAX_USER_SEND_PER_MINUTE:
if limiter.is_sending_allowed("some@example.org", 10):
if i <= 10:
continue
pytest.fail("limiter didn't work")
else:
assert i == SendRateLimiter.MAX_USER_SEND_PER_MINUTE + 1
assert i == 11
break
def test_excempt_privacy(maildata, gencreds):
def test_excempt_privacy(maildata, gencreds, handler):
from_addr = gencreds()[0]
to_addr = "privacy@testrun.org"
false_to = "privacy@tstrn.org"
false_to2 = "prvcy@testrun.org"
assert is_passthrough_recipient(to_addr)
assert to_addr in handler.config.passthrough_recipients
msg = maildata("plain.eml", from_addr, to_addr)
@@ -97,11 +113,11 @@ def test_excempt_privacy(maildata, gencreds):
content = msg.as_bytes()
# assert that None/no error is returned
assert not check_DATA(envelope=env)
assert not handler.check_DATA(envelope=env)
class env2:
mail_from = from_addr
rcpt_tos = [to_addr, false_to, false_to2]
content = msg.as_bytes()
assert "500" in check_DATA(envelope=env2)
assert "500" in handler.check_DATA(envelope=env2)

View File

@@ -3,8 +3,10 @@ import io
import time
import random
import subprocess
import textwrap
import imaplib
import smtplib
import importlib.resources
import itertools
from email.parser import BytesParser
from email import policy
@@ -37,6 +39,14 @@ def pytest_runtest_setup(item):
pytest.skip("skipping slow test, use --slow to run")
@pytest.fixture
def inipath():
dpath = importlib.resources.files("chatmaild")
inipath = dpath.joinpath("../../../chatmail.ini").resolve()
assert inipath.exists()
return inipath
@pytest.fixture
def maildomain():
domain = os.environ.get("CHATMAIL_DOMAIN")
@@ -402,3 +412,16 @@ class CMUser:
imap.login(self.addr, self.password)
self._imap = imap
return self._imap
@pytest.fixture
def create_ini(tmp_path, inipath):
def create_ini_func(source=None):
if source is None:
source = inipath.read_text()
p = tmp_path.joinpath("chatmail.ini")
assert not p.exists(), p
p.write_text(textwrap.dedent(source))
return p
return create_ini_func

View File

@@ -54,7 +54,7 @@ def test_exceed_rate_limit(cmsetup, gencreds, maildata):
try:
user1.smtp.sendmail(user1.addr, [user2.addr], mail)
except smtplib.SMTPException as e:
if i < 80:
if i < 60:
pytest.fail(f"rate limit was exceeded too early with msg {i}")
outcome = e.recipients[user2.addr]
assert outcome[0] == 450

View File

@@ -2,46 +2,47 @@ import textwrap
import importlib.resources
from deploy_chatmail.www import build_webpages
from deploy_chatmail import get_ini_settings
from chatmaild.config import read_config
def create_ini(inipath):
inipath.write_text(
def make_config(create_ini, domain="example.org"):
inipath = create_ini(
textwrap.dedent(
"""\
[config]
f"""\
[params]
max_user_send_per_minute = 60
filtermail_smtp_port = 10080
postfix_reinject_port = 10025
passthrough_recipients =
[privacy:{domain}]
domain = example.org
privacy_postal =
address-line1
address-line2
privacy_mail = privacy@example.org
privacy_mail = privacy@{domain}
privacy_pdo =
address-line3
"""
)
)
return read_config(inipath, domain)
def test_build_webpages(tmp_path):
def test_build_webpages(tmp_path, create_ini):
pkgroot = importlib.resources.files("deploy_chatmail")
src_dir = pkgroot.joinpath("../../../www/src").resolve()
assert src_dir.exists(), src_dir
inipath = tmp_path.joinpath("chatmail.ini")
create_ini(inipath)
config = get_ini_settings("example.org", inipath)
config = make_config(create_ini, "example.org")
build_dir = tmp_path.joinpath("build")
build_webpages(src_dir, build_dir, config)
def test_get_settings(tmp_path):
inipath = tmp_path.joinpath("chatmail.ini")
create_ini(inipath)
d = get_ini_settings("x.testrun.org", inipath)
assert d["privacy_postal"] == "address-line1\naddress-line2"
assert d["privacy_mail"] == "privacy@example.org"
assert d["privacy_pdo"] == "address-line3"
assert d["mail_domain"] == "x.testrun.org"
def test_get_settings(tmp_path, create_ini):
config = make_config(create_ini, "example.org")
assert config.privacy_postal == "address-line1\naddress-line2"
assert config.privacy_mail == "privacy@example.org"
assert config.privacy_pdo == "address-line3"
assert config.mailname == "example.org"