Compare commits

..

7 Commits

Author SHA1 Message Date
holger krekel 9531d62f57 in webdev mode make page auto-refresh every 3 seconds 2023-12-08 14:31:03 +01:00
holger krekel 931305d454 avoid bailing out on jinja2 errors, and provide file-url for instant clickability 2023-12-08 14:02:51 +01:00
holger krekel 648174c226 don't depend on deltachat python package 2023-12-08 12:15:39 +01:00
holger krekel 2b5cdd143e add a note 2023-12-07 17:41:25 +01:00
holger krekel f506a62196 fix README 2023-12-07 17:39:03 +01:00
holger krekel e8107c6854 rename script 2023-12-07 17:34:43 +01:00
holger krekel 858e079418 create a wwwdev.sh entry point for developing the web part 2023-12-07 17:34:43 +01:00
19 changed files with 172 additions and 420 deletions
+1 -1
View File
@@ -1,5 +1,5 @@
<img width="800px" src="www/src/collage-top.png"/>
<img width="800px" src="www/collage-top.png"/>
# Chatmail instances optimized for Delta Chat apps
+3 -20
View File
@@ -1,32 +1,15 @@
[config]
[params]
# how many mails a user can send out per minute
max_user_send_per_minute = 60
# list of e-mail recipients for which to accept outbound un-encrypted mails
passthrough_recipients = privacy@testrun.org xstore@testrun.org
# where the filtermail SMTP service listens
filtermail_smtp_port = 10080
# to which port to re-inject messages after they passed filtermail
postfix_reinject_port = 10025
[privacy:testrun]
# the settings in this section are only applied
# if the instantiated mail domain shell-matches the 'domain' setting
domain = *.testrun.org
privacy_postal =
Merlinux GmbH, Represented by the managing director H. Krekel,
Reichgrafen Str. 20, 79102 Freiburg, Germany
privacy_mail = delta-privacy@merlinux.eu
privacy_pdo =
Prof. Dr. Fabian Schmieder, lexICT UG (limited), Ostfeldstr. 49, 30559 Hannover.
You can contact him at *delta-privacy@merlinux.eu* (Keyword: DPO)
privacy_supervisor =
State Commissioner for Data Protection and Freedom of Information of
Baden-Württemberg in 70173 Stuttgart, Germany.
+1 -2
View File
@@ -4,10 +4,9 @@ build-backend = "setuptools.build_meta"
[project]
name = "chatmaild"
version = "0.2"
version = "0.1"
dependencies = [
"aiosmtpd",
"iniconfig",
]
[project.scripts]
-41
View File
@@ -1,41 +0,0 @@
from pathlib import Path
from fnmatch import fnmatch
import iniconfig
system_mailname_path = Path("/etc/mailname")
def read_config(inipath, mailname=None):
if mailname is None:
with open(system_mailname_path) as f:
mailname = f.read().strip()
ini = iniconfig.IniConfig(inipath)
privacy = {}
for section in ini:
if section.name.startswith("privacy:"):
domain = section["domain"]
if fnmatch(mailname, domain):
privacy = section
break
return Config(inipath, mailname, privacy, params=ini.sections["params"])
class Config:
def __init__(self, inipath, mailname, privacy, params):
self._inipath = inipath
self.mailname = mailname
self.privacy_postal = privacy.get("privacy_postal")
self.privacy_mail = privacy.get("privacy_mail")
self.privacy_pdo = privacy.get("privacy_pdo")
self.privacy_supervisor = privacy.get("privacy_supervisor")
self.max_user_send_per_minute = int(params["max_user_send_per_minute"])
self.filtermail_smtp_port = int(params["filtermail_smtp_port"])
self.postfix_reinject_port = int(params["postfix_reinject_port"])
self.passthrough_recipients = params["passthrough_recipients"].split()
def _getbytefile(self):
return open(self._inipath, "rb")
@@ -2,7 +2,7 @@
Description=Dict authentication proxy for dovecot
[Service]
ExecStart={execpath} /run/dovecot/doveauth.socket vmail /home/vmail/passdb.sqlite
ExecStart=/usr/local/bin/doveauth /run/dovecot/doveauth.socket vmail /home/vmail/passdb.sqlite
Restart=always
RestartSec=30
+42 -43
View File
@@ -11,8 +11,6 @@ from aiosmtpd.smtp import SMTP
from aiosmtpd.controller import Controller
from smtplib import SMTP as SMTPClient
from .config import read_config
def check_encrypted(message):
"""Check that the message is an OpenPGP-encrypted message."""
@@ -64,21 +62,19 @@ def check_mdn(message, envelope):
return True
async def asyncmain_beforequeue(config):
port = config.filtermail_smtp_port
Controller(BeforeQueueHandler(config), hostname="127.0.0.1", port=port).start()
class SMTPController(Controller):
def factory(self):
return SMTP(self.handler, **self.SMTP_kwargs)
class BeforeQueueHandler:
def __init__(self, config):
self.config = config
def __init__(self):
self.send_rate_limiter = SendRateLimiter()
async def handle_MAIL(self, server, session, envelope, address, mail_options):
logging.info(f"handle_MAIL from {address}")
envelope.mail_from = address
max_sent = self.config.max_user_send_per_minute
if not self.send_rate_limiter.is_sending_allowed(address, max_sent):
if not self.send_rate_limiter.is_sending_allowed(address):
return f"450 4.7.1: Too much mail from {address}"
parts = envelope.mail_from.split("@")
@@ -89,58 +85,62 @@ class BeforeQueueHandler:
async def handle_DATA(self, server, session, envelope):
logging.info("handle_DATA before-queue")
error = self.check_DATA(envelope)
error = check_DATA(envelope)
if error:
return error
logging.info("re-injecting the mail that passed checks")
client = SMTPClient("localhost", self.config.postfix_reinject_port)
client = SMTPClient("localhost", "10025")
client.sendmail(envelope.mail_from, envelope.rcpt_tos, envelope.content)
return "250 OK"
def check_DATA(self, envelope):
"""the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}")
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
mail_encrypted = check_encrypted(message)
async def asyncmain_beforequeue(port):
Controller(BeforeQueueHandler(), hostname="127.0.0.1", port=port).start()
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from!r}")
if envelope.mail_from.lower() != from_addr.lower():
return f"500 Invalid FROM <{from_addr!r}> for <{envelope.mail_from!r}>"
if not mail_encrypted and check_mdn(message, envelope):
return
def check_DATA(envelope):
"""the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}")
passthrough_recipients = self.config.passthrough_recipients
envelope_from_domain = from_addr.split("@").pop()
for recipient in envelope.rcpt_tos:
if envelope.mail_from == recipient:
# Always allow sending emails to self.
continue
if recipient in passthrough_recipients:
continue
res = recipient.split("@")
if len(res) != 2:
return f"500 Invalid address <{recipient}>"
_recipient_addr, recipient_domain = res
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
mail_encrypted = check_encrypted(message)
is_outgoing = recipient_domain != envelope_from_domain
if is_outgoing and not mail_encrypted:
is_securejoin = message.get("secure-join") in ["vc-request", "vg-request"]
if not is_securejoin:
return f"500 Invalid unencrypted mail to <{recipient}>"
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from!r}")
if envelope.mail_from.lower() != from_addr.lower():
return f"500 Invalid FROM <{from_addr!r}> for <{envelope.mail_from!r}>"
if not mail_encrypted and check_mdn(message, envelope):
return
envelope_from_domain = from_addr.split("@").pop()
for recipient in envelope.rcpt_tos:
if envelope.mail_from == recipient:
# Always allow sending emails to self.
continue
res = recipient.split("@")
if len(res) != 2:
return f"500 Invalid address <{recipient}>"
_recipient_addr, recipient_domain = res
is_outgoing = recipient_domain != envelope_from_domain
if is_outgoing and not mail_encrypted:
is_securejoin = message.get("secure-join") in ["vc-request", "vg-request"]
if not is_securejoin:
return f"500 Invalid unencrypted mail to <{recipient}>"
class SendRateLimiter:
MAX_USER_SEND_PER_MINUTE = 80
def __init__(self):
self.addr2timestamps = {}
def is_sending_allowed(self, mail_from, max_send_per_minute):
def is_sending_allowed(self, mail_from):
last = self.addr2timestamps.setdefault(mail_from, [])
now = time.time()
last[:] = [ts for ts in last if ts >= (now - 60)]
if len(last) <= max_send_per_minute:
if len(last) <= self.MAX_USER_SEND_PER_MINUTE:
last.append(now)
return True
return False
@@ -149,10 +149,9 @@ class SendRateLimiter:
def main():
args = sys.argv[1:]
assert len(args) == 1
config = read_config(args[0])
logging.basicConfig(level=logging.WARN)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = asyncmain_beforequeue(config)
task = asyncmain_beforequeue(port=int(args[0]))
loop.create_task(task)
loop.run_forever()
@@ -2,7 +2,7 @@
Description=Chatmail Postfix BeforeQeue filter
[Service]
ExecStart={execpath} {config_path}
ExecStart=/usr/local/bin/filtermail 10080
Restart=always
RestartSec=30
+71 -112
View File
@@ -1,117 +1,75 @@
"""
Chat Mail pyinfra deploy.
"""
import sys
import importlib.resources
import subprocess
import shutil
import io
import configparser
from pathlib import Path
from pyinfra import host
from pyinfra.operations import apt, files, server, systemd, pip
from pyinfra.operations import apt, files, server, systemd
from pyinfra.facts.files import File
from pyinfra.facts.systemd import SystemdEnabled
from .acmetool import deploy_acmetool
import chatmaild.filtermail
from chatmaild.config import read_config
def _build_chatmaild(dist_dir) -> None:
dist_dir = Path(dist_dir).resolve()
if dist_dir.exists():
shutil.rmtree(dist_dir)
dist_dir.mkdir()
subprocess.check_output(
[sys.executable, "-m", "build", "-n"]
+ ["--sdist", "chatmaild", "--outdir", str(dist_dir)]
def _install_chatmaild() -> None:
chatmaild_filename = "chatmaild-0.1.tar.gz"
chatmaild_path = importlib.resources.files(__package__).joinpath(
f"../../../dist/{chatmaild_filename}"
)
entries = list(dist_dir.iterdir())
assert len(entries) == 1
return entries[0]
def remove_legacy_artifacts():
# disable legacy doveauth-dictproxy.service
if host.get_fact(SystemdEnabled).get("doveauth-dictproxy.service"):
systemd.service(
name="Disable legacy doveauth-dictproxy.service",
service="doveauth-dictproxy.service",
running=False,
enabled=False,
)
def _install_remote_venv_with_chatmaild(config) -> None:
remove_legacy_artifacts()
dist_file = _build_chatmaild(dist_dir=Path("chatmaild/dist"))
remote_base_dir = "/usr/local/lib/chatmaild"
remote_dist_file = f"{remote_base_dir}/dist/{dist_file.name}"
remote_venv_dir = f"{remote_base_dir}/venv"
remote_chatmail_inipath = f"{remote_base_dir}/chatmail.ini"
root_owned = dict(user="root", group="root", mode="644")
apt.packages(
name="apt install python3-virtualenv",
packages=["python3-virtualenv"],
)
files.put(
name="Upload chatmaild source package",
src=dist_file.open("rb"),
dest=remote_dist_file,
create_remote_dir=True,
**root_owned,
)
files.put(
name=f"Upload {remote_chatmail_inipath}",
src=config._getbytefile(),
dest=remote_chatmail_inipath,
**root_owned,
)
pip.virtualenv(
name=f"chatmaild virtualenv {remote_venv_dir}",
path=remote_venv_dir,
always_copy=True,
)
server.shell(
name=f"forced pip-install {dist_file.name}",
commands=[
f"{remote_venv_dir}/bin/pip install --force-reinstall {remote_dist_file}"
],
)
# install systemd units
for fn in (
"doveauth",
"filtermail",
):
params = dict(
execpath=f"{remote_venv_dir}/bin/{fn}",
config_path=remote_chatmail_inipath,
)
source_path = importlib.resources.files("chatmaild").joinpath(f"{fn}.service.f")
content = source_path.read_text().format(**params).encode()
remote_path = f"/tmp/{chatmaild_filename}"
if Path(str(chatmaild_path)).exists():
files.put(
name=f"Upload {fn}.service",
src=io.BytesIO(content),
dest=f"/etc/systemd/system/{fn}.service",
**root_owned,
name="Upload chatmaild source package",
src=chatmaild_path.open("rb"),
dest=remote_path,
)
systemd.service(
name=f"Setup {fn} service",
service=f"{fn}.service",
running=True,
enabled=True,
restarted=True,
daemon_reload=True,
apt.packages(
name="apt install python3-aiosmtpd python3-pip python3-venv",
packages=["python3-aiosmtpd", "python3-pip", "python3-venv"],
)
# --no-deps because aiosmtplib is installed with `apt`.
server.shell(
name="install chatmaild with pip",
commands=[f"pip install --break-system-packages {remote_path}"],
)
# disable legacy doveauth-dictproxy.service
if host.get_fact(SystemdEnabled).get("doveauth-dictproxy.service"):
systemd.service(
name="Disable legacy doveauth-dictproxy.service",
service="doveauth-dictproxy.service",
running=False,
enabled=False,
)
# install systemd units
for fn in (
"doveauth",
"filtermail",
):
files.put(
name=f"Upload {fn}.service",
src=importlib.resources.files("chatmaild")
.joinpath(f"{fn}.service")
.open("rb"),
dest=f"/etc/systemd/system/{fn}.service",
user="root",
group="root",
mode="644",
)
systemd.service(
name=f"Setup {fn} service",
service=f"{fn}.service",
running=True,
enabled=True,
restarted=True,
daemon_reload=True,
)
def _configure_opendkim(domain: str, dkim_selector: str) -> bool:
"""Configures OpenDKIM"""
@@ -216,7 +174,7 @@ def _install_mta_sts_daemon() -> bool:
return need_restart
def _configure_postfix(config: chatmaild.config.Config, debug: bool = False) -> bool:
def _configure_postfix(domain: str, debug: bool = False) -> bool:
"""Configures Postfix SMTP server."""
need_restart = False
@@ -226,7 +184,7 @@ def _configure_postfix(config: chatmaild.config.Config, debug: bool = False) ->
user="root",
group="root",
mode="644",
config=config,
config={"domain_name": domain},
)
need_restart |= main_config.changed
@@ -237,7 +195,6 @@ def _configure_postfix(config: chatmaild.config.Config, debug: bool = False) ->
group="root",
mode="644",
debug=debug,
config=config,
)
need_restart |= master_config.changed
@@ -347,16 +304,19 @@ def _configure_nginx(domain: str, debug: bool = False) -> bool:
return need_restart
def check_config(config):
mailname = config.mailname
if mailname != "testrun.org" and not mailname.endswith(".testrun.org"):
blocked_words = "merlinux schmieder testrun.org".split()
for value in config.__dict__.values():
if any(x in value for x in blocked_words):
def get_ini_settings(mail_domain, inipath):
parser = configparser.ConfigParser()
parser.read(inipath)
settings = {key: value.strip() for (key, value) in parser["config"].items()}
if mail_domain != "testrun.org" and not mail_domain.endswith(".testrun.org"):
for value in settings.values():
value = value.lower()
if "merlinux" in value or "schmieder" in value or "@testrun.org" in value:
raise ValueError(
f"please set your own privacy contacts/addresses in {config._inipath}"
f"please set your own privacy contacts/addresses in {inipath}"
)
return config
settings["mail_domain"] = mail_domain
return settings
def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> None:
@@ -413,8 +373,7 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
pkg_root = importlib.resources.files(__package__)
chatmail_ini = pkg_root.joinpath("../../../chatmail.ini").resolve()
config = read_config(chatmail_ini, mailname=mail_domain)
check_config(config)
config = get_ini_settings(mail_domain, chatmail_ini)
www_path = pkg_root.joinpath("../../../www").resolve()
build_dir = www_path.joinpath("build")
@@ -422,10 +381,10 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
build_webpages(src_dir, build_dir, config)
files.rsync(f"{build_dir}/", "/var/www/html", flags=["-avz"])
_install_remote_venv_with_chatmaild(config)
_install_chatmaild()
debug = False
dovecot_need_restart = _configure_dovecot(mail_server, debug=debug)
postfix_need_restart = _configure_postfix(config, debug=debug)
postfix_need_restart = _configure_postfix(mail_domain, debug=debug)
opendkim_need_restart = _configure_opendkim(mail_domain, dkim_selector)
mta_sts_need_restart = _install_mta_sts_daemon()
nginx_need_restart = _configure_nginx(mail_domain)
+1 -1
View File
@@ -52,7 +52,7 @@ def gen_qr(maildomain, url):
height = size + text_height
image = Image.new("RGBA", (width, height), "white")
qr_final_size = width - (qr_padding * 2)
qr_final_size = width
if num_lines:
draw = ImageDraw.Draw(image)
@@ -1,4 +1,4 @@
myorigin = {{ config.mailname }}
myorigin = {{ config.domain_name }}
smtpd_banner = $myhostname ESMTP $mail_name (Debian/GNU)
biff = no
@@ -16,8 +16,8 @@ readme_directory = no
compatibility_level = 2
# TLS parameters
smtpd_tls_cert_file=/var/lib/acme/live/{{ config.mailname }}/fullchain
smtpd_tls_key_file=/var/lib/acme/live/{{ config.mailname }}/privkey
smtpd_tls_cert_file=/var/lib/acme/live/{{ config.domain_name }}/fullchain
smtpd_tls_key_file=/var/lib/acme/live/{{ config.domain_name }}/privkey
smtpd_tls_security_level=may
smtp_tls_CApath=/etc/ssl/certs
@@ -26,7 +26,7 @@ smtp_tls_session_cache_database = btree:${data_directory}/smtp_scache
smtp_tls_policy_maps = socketmap:inet:127.0.0.1:8461:postfix
smtpd_relay_restrictions = permit_mynetworks permit_sasl_authenticated defer_unauth_destination
myhostname = {{ config.mailname }}
myhostname = {{ config.domain_name }}
alias_maps = hash:/etc/aliases
alias_database = hash:/etc/aliases
@@ -45,7 +45,7 @@ inet_interfaces = all
inet_protocols = all
virtual_transport = lmtp:unix:private/dovecot-lmtp
virtual_mailbox_domains = {{ config.mailname }}
virtual_mailbox_domains = {{ config.domain_name }}
smtpd_milters = unix:opendkim/opendkim.sock
non_smtpd_milters = $smtpd_milters
@@ -33,7 +33,7 @@ submission inet n - y - - smtpd
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o milter_macro_daemon_name=ORIGINATING
-o smtpd_client_connection_count_limit=1000
-o smtpd_proxy_filter=127.0.0.1:{{ config.filtermail_smtp_port }}
-o smtpd_proxy_filter=127.0.0.1:10080
smtps inet n - y - - smtpd
-o syslog_name=postfix/smtps
-o smtpd_tls_wrappermode=yes
@@ -49,7 +49,7 @@ smtps inet n - y - - smtpd
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o smtpd_client_connection_count_limit=1000
-o milter_macro_daemon_name=ORIGINATING
-o smtpd_proxy_filter=127.0.0.1:{{ config.filtermail_smtp_port }}
-o smtpd_proxy_filter=127.0.0.1:10080
#628 inet n - y - - qmqpd
pickup unix n - y 60 1 pickup
cleanup unix n - y - 0 cleanup
@@ -78,5 +78,5 @@ scache unix - - y - 1 scache
postlog unix-dgram n - n - 1 postlogd
filter unix - n n - - lmtp
# Local SMTP server for reinjecting filered mail.
localhost:{{ config.postfix_reinject_port }} inet n - n - 10 smtpd
localhost:10025 inet n - n - 10 smtpd
-o syslog_name=postfix/reinject
+3 -3
View File
@@ -7,7 +7,7 @@ import traceback
import markdown
from jinja2 import Template
from .genqr import gen_qr_png_data
from chatmaild.config import read_config
from deploy_chatmail import get_ini_settings
def snapshot_dir_stats(somedir):
@@ -37,7 +37,7 @@ def build_webpages(src_dir, build_dir, config):
def _build_webpages(src_dir, build_dir, config):
mail_domain = config.mailname
mail_domain = config["mail_domain"]
assert src_dir.exists(), src_dir
if not build_dir.exists():
build_dir.mkdir()
@@ -70,7 +70,7 @@ def main():
path = importlib.resources.files(__package__)
reporoot = path.joinpath("../../../").resolve()
inipath = reporoot.joinpath("chatmail.ini")
config = read_config(inipath, mailname=chatmail_domain)
config = get_ini_settings(chatmail_domain, inipath)
config["webdev"] = True
www_path = reporoot.joinpath("www")
src_path = www_path.joinpath("src")
+7
View File
@@ -4,5 +4,12 @@ echo -----------------------------------------
echo deploying to $CHATMAIL_DOMAIN
echo -----------------------------------------
echo WARNING: in five seconds deploy to $CHATMAIL_DOMAIN starts
sleep 5
venv/bin/python3 -m build -n --sdist chatmaild --outdir dist
venv/bin/pyinfra --ssh-user root "$CHATMAIL_DOMAIN" \
deploy-chatmail/src/deploy_chatmail/deploy.py
rm -r dist/
-88
View File
@@ -1,88 +0,0 @@
from chatmaild.config import read_config
import chatmaild.config
def test_read_config_without_mailname(tmp_path, create_ini, monkeypatch):
mailname_path = tmp_path.joinpath("mailname")
mailname_path.write_text("something.example.org")
monkeypatch.setattr(chatmaild.config, "system_mailname_path", mailname_path)
inipath = create_ini(
"""
[params]
max_user_send_per_minute = 40
filtermail_smtp_port = 9875
postfix_reinject_port = 9999
passthrough_recipients =
"""
)
config = read_config(inipath)
assert config.mailname == "something.example.org"
def test_read_config_without_privacy_policy(tmp_path, create_ini):
inipath = create_ini(
"""
[params]
max_user_send_per_minute = 40
filtermail_smtp_port = 9875
postfix_reinject_port = 9999
passthrough_recipients =
[privacy:testrun]
domain = *.example.org
"""
)
config = read_config(inipath, "something.example.org")
assert config.mailname == "something.example.org"
assert config.max_user_send_per_minute == 40
assert config.filtermail_smtp_port == 9875
assert config.postfix_reinject_port == 9999
assert config.passthrough_recipients == []
assert not config.privacy_postal
assert not config.privacy_mail
assert not config.privacy_pdo
assert not config.privacy_supervisor
def test_read_config(create_ini):
inipath = create_ini(
"""
[params]
max_user_send_per_minute = 40
filtermail_smtp_port = 10080
postfix_reinject_port = 10025
passthrough_recipients = x@example.org y@example.org
[privacy:testrun]
domain = *.testrun.org
privacy_postal =
Postal Ltd
privacy_mail = privacy@merlinux.eu
privacy_pdo =
Postal PDO
You can contact him at *delta-privacy@merlinux.eu* (Keyword: DPO)
privacy_supervisor =
line1
line2 with space
"""
)
config = read_config(inipath, "something.testrun.org")
assert config.mailname == "something.testrun.org"
assert config.filtermail_smtp_port == 10080
assert config.postfix_reinject_port == 10025
assert config.passthrough_recipients == ["x@example.org", "y@example.org"]
assert config.privacy_postal == "Postal Ltd"
assert config.privacy_mail == "privacy@merlinux.eu"
lines = config.privacy_pdo.split("\n")
assert lines[0] == "Postal PDO"
assert lines[1].startswith("You can ")
lines = config.privacy_supervisor.split("\n")
assert lines[0] == "line1"
assert lines[1] == "line2 with space"
+2 -1
View File
@@ -1,4 +1,5 @@
import json
import sys
import pytest
import threading
import queue
@@ -6,7 +7,7 @@ import traceback
import chatmaild.doveauth
from chatmaild.doveauth import get_user_data, lookup_passdb, handle_dovecot_request
from chatmaild.database import DBError
from chatmaild.database import Database, DBError
def test_basic(db):
+9 -50
View File
@@ -1,12 +1,4 @@
from chatmaild.filtermail import (
check_encrypted,
BeforeQueueHandler,
SendRateLimiter,
check_mdn,
)
from chatmaild.config import read_config
from chatmaild.filtermail import check_encrypted, check_DATA, SendRateLimiter, check_mdn
import pytest
@@ -16,25 +8,18 @@ def maildomain():
return "chatmail.example.org"
@pytest.fixture
def handler(create_ini, maildomain):
config = read_config(create_ini(), maildomain)
return BeforeQueueHandler(config)
def test_reject_forged_from(maildata, gencreds, handler):
def test_reject_forged_from(maildata, gencreds):
class env:
mail_from = gencreds()[0]
rcpt_tos = [gencreds()[0]]
# test that the filter lets good mail through
env.content = maildata("plain.eml", from_addr=env.mail_from).as_bytes()
assert not handler.check_DATA(envelope=env)
assert not check_DATA(envelope=env)
# test that the filter rejects forged mail
env.content = maildata("plain.eml", from_addr="forged@c3.testrun.org").as_bytes()
error = handler.check_DATA(envelope=env)
error = check_DATA(envelope=env)
assert "500" in error
@@ -56,7 +41,7 @@ def test_filtermail_encryption_detection(maildata):
assert not check_encrypted(msg)
def test_filtermail_is_mdn(maildata, gencreds, handler):
def test_filtermail_is_mdn(maildata, gencreds):
from_addr = gencreds()[0]
to_addr = gencreds()[0] + ".other"
msg = maildata("mdn.eml", from_addr, to_addr)
@@ -68,8 +53,7 @@ def test_filtermail_is_mdn(maildata, gencreds, handler):
assert check_mdn(msg, env)
print(msg.as_string())
assert not handler.check_DATA(env)
assert not check_DATA(env)
def test_filtermail_to_multiple_recipients_no_mdn(maildata, gencreds):
@@ -89,35 +73,10 @@ def test_filtermail_to_multiple_recipients_no_mdn(maildata, gencreds):
def test_send_rate_limiter():
limiter = SendRateLimiter()
for i in range(100):
if limiter.is_sending_allowed("some@example.org", 10):
if i <= 10:
if limiter.is_sending_allowed("some@example.org"):
if i <= SendRateLimiter.MAX_USER_SEND_PER_MINUTE:
continue
pytest.fail("limiter didn't work")
else:
assert i == 11
assert i == SendRateLimiter.MAX_USER_SEND_PER_MINUTE + 1
break
def test_excempt_privacy(maildata, gencreds, handler):
from_addr = gencreds()[0]
to_addr = "privacy@testrun.org"
false_to = "privacy@tstrn.org"
false_to2 = "prvcy@testrun.org"
assert to_addr in handler.config.passthrough_recipients
msg = maildata("plain.eml", from_addr, to_addr)
class env:
mail_from = from_addr
rcpt_tos = [to_addr]
content = msg.as_bytes()
# assert that None/no error is returned
assert not handler.check_DATA(envelope=env)
class env2:
mail_from = from_addr
rcpt_tos = [to_addr, false_to, false_to2]
content = msg.as_bytes()
assert "500" in handler.check_DATA(envelope=env2)
+1 -26
View File
@@ -3,10 +3,8 @@ import io
import time
import random
import subprocess
import textwrap
import imaplib
import smtplib
import importlib.resources
import itertools
from email.parser import BytesParser
from email import policy
@@ -39,14 +37,6 @@ def pytest_runtest_setup(item):
pytest.skip("skipping slow test, use --slow to run")
@pytest.fixture
def inipath():
dpath = importlib.resources.files("chatmaild")
inipath = dpath.joinpath("../../../chatmail.ini").resolve()
assert inipath.exists()
return inipath
@pytest.fixture
def maildomain():
domain = os.environ.get("CHATMAIL_DOMAIN")
@@ -293,10 +283,8 @@ def cmfactory(request, gencreds, tmpdir, maildomain):
pytest.importorskip("deltachat")
from deltachat.testplugin import ACFactory
data = request.getfixturevalue("data")
testproc = ChatmailTestProcess(request.config, maildomain, gencreds)
am = ACFactory(request=request, tmpdir=tmpdir, testprocess=testproc, data=data)
am = ACFactory(request=request, tmpdir=tmpdir, testprocess=testproc, data=None)
# nb. a bit hacky
# would probably be better if deltachat's test machinery grows native support
@@ -412,16 +400,3 @@ class CMUser:
imap.login(self.addr, self.password)
self._imap = imap
return self._imap
@pytest.fixture
def create_ini(tmp_path, inipath):
def create_ini_func(source=None):
if source is None:
source = inipath.read_text()
p = tmp_path.joinpath("chatmail.ini")
assert not p.exists(), p
p.write_text(textwrap.dedent(source))
return p
return create_ini_func
+1 -1
View File
@@ -54,7 +54,7 @@ def test_exceed_rate_limit(cmsetup, gencreds, maildata):
try:
user1.smtp.sendmail(user1.addr, [user2.addr], mail)
except smtplib.SMTPException as e:
if i < 60:
if i < 80:
pytest.fail(f"rate limit was exceeded too early with msg {i}")
outcome = e.recipients[user2.addr]
assert outcome[0] == 450
+20 -21
View File
@@ -2,47 +2,46 @@ import textwrap
import importlib.resources
from deploy_chatmail.www import build_webpages
from chatmaild.config import read_config
from deploy_chatmail import get_ini_settings
def make_config(create_ini, domain="example.org"):
inipath = create_ini(
def create_ini(inipath):
inipath.write_text(
textwrap.dedent(
f"""\
[params]
max_user_send_per_minute = 60
filtermail_smtp_port = 10080
postfix_reinject_port = 10025
passthrough_recipients =
"""\
[config]
[privacy:{domain}]
domain = example.org
privacy_postal =
address-line1
address-line2
privacy_mail = privacy@{domain}
privacy_mail = privacy@example.org
privacy_pdo =
address-line3
"""
)
)
return read_config(inipath, domain)
def test_build_webpages(tmp_path, create_ini):
def test_build_webpages(tmp_path):
pkgroot = importlib.resources.files("deploy_chatmail")
src_dir = pkgroot.joinpath("../../../www/src").resolve()
assert src_dir.exists(), src_dir
config = make_config(create_ini, "example.org")
inipath = tmp_path.joinpath("chatmail.ini")
create_ini(inipath)
config = get_ini_settings("example.org", inipath)
build_dir = tmp_path.joinpath("build")
build_webpages(src_dir, build_dir, config)
def test_get_settings(tmp_path, create_ini):
config = make_config(create_ini, "example.org")
assert config.privacy_postal == "address-line1\naddress-line2"
assert config.privacy_mail == "privacy@example.org"
assert config.privacy_pdo == "address-line3"
assert config.mailname == "example.org"
def test_get_settings(tmp_path):
inipath = tmp_path.joinpath("chatmail.ini")
create_ini(inipath)
d = get_ini_settings("x.testrun.org", inipath)
assert d["privacy_postal"] == "address-line1\naddress-line2"
assert d["privacy_mail"] == "privacy@example.org"
assert d["privacy_pdo"] == "address-line3"
assert d["mail_domain"] == "x.testrun.org"