Compare commits

..

6 Commits

Author SHA1 Message Date
holger krekel
10cb099c0e all tests pass 2023-10-19 00:07:22 +02:00
holger krekel
bbd2773506 refactor test and filtermail to prepare it for BeforeQueue handling 2023-10-18 21:43:06 +02:00
missytake
410bc50a8b test: report if rate limit from last test was still active 2023-10-18 19:02:40 +02:00
missytake
015269fa7b test: test that there is no internal limit (xfail for now) 2023-10-18 19:02:40 +02:00
missytake
b8673d8625 postfix: add simple rate limiting without allow list or leaky bucket, also for internal mail 2023-10-18 19:02:40 +02:00
missytake
31c71fa6e9 add test for postfix rate limiting 2023-10-18 19:02:40 +02:00
11 changed files with 171 additions and 137 deletions

View File

@@ -0,0 +1,10 @@
[Unit]
Description=Chatmail Postfix AfterQueue filter
[Service]
ExecStart=/usr/local/bin/filtermail afterqueue /var/spool/postfix/private/filtermail-afterqueue
Restart=always
RestartSec=30
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,10 @@
[Unit]
Description=Chatmail Postfix BeforeQeue filter
[Service]
ExecStart=/usr/local/bin/filtermail beforequeue 10080
Restart=always
RestartSec=30
[Install]
WantedBy=multi-user.target

View File

@@ -1,12 +1,15 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio import asyncio
import logging import logging
import time
import sys
from email.parser import BytesParser from email.parser import BytesParser
from email import policy from email import policy
from email.utils import parseaddr from email.utils import parseaddr
from aiosmtpd.lmtp import LMTP from aiosmtpd.lmtp import LMTP
from aiosmtpd.controller import UnixSocketController from aiosmtpd.smtp import SMTP
from aiosmtpd.controller import UnixSocketController, Controller
from smtplib import SMTP as SMTPClient from smtplib import SMTP as SMTPClient
@@ -32,12 +35,41 @@ def check_encrypted(message):
return True return True
class ExampleController(UnixSocketController): class BeforeQueueHandler:
def factory(self): def __init__(self):
return LMTP(self.handler, **self.SMTP_kwargs) self.send_rate_limiter = SendRateLimiter()
async def handle_MAIL(self, server, session, envelope, address, mail_options):
logging.info(f"handle_MAIL from {address}")
if self.send_rate_limiter.is_sending_allowed(address):
envelope.mail_from = address
return "250 OK"
return f"450 4.7.1: Too much mail from {address}"
async def handle_DATA(self, server, session, envelope):
logging.info("handle_DATA before-queue: re-injecting the mail")
client = SMTPClient("localhost", "10026")
client.sendmail(envelope.mail_from, envelope.rcpt_tos, envelope.content)
return "250 OK"
class ExampleHandler: class SendRateLimiter:
MAX_USER_SEND_PER_MINUTE = 80
def __init__(self):
self.addr2timestamps = {}
def is_sending_allowed(self, mail_from):
last = self.addr2timestamps.setdefault(mail_from, [])
now = time.time()
last[:] = [ts for ts in last if ts >= (now - 60)]
if len(last) <= self.MAX_USER_SEND_PER_MINUTE:
last.append(now)
return True
return False
class AfterQueueHandler:
async def handle_RCPT(self, server, session, envelope, address, rcpt_options): async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
envelope.rcpt_tos.append(address) envelope.rcpt_tos.append(address)
return "250 OK" return "250 OK"
@@ -46,8 +78,8 @@ class ExampleHandler:
valid_recipients, res = lmtp_handle_DATA(envelope) valid_recipients, res = lmtp_handle_DATA(envelope)
# Reinject the mail back into Postfix. # Reinject the mail back into Postfix.
if valid_recipients: if valid_recipients:
logging.info("Reinjecting the mail") logging.info("afterqueue: re-injecting the mail")
client = SMTPClient("localhost", "10026") client = SMTPClient("localhost", "10027")
client.sendmail(envelope.mail_from, valid_recipients, envelope.content) client.sendmail(envelope.mail_from, valid_recipients, envelope.content)
else: else:
logging.info("no valid recipients, ignoring mail") logging.info("no valid recipients, ignoring mail")
@@ -55,13 +87,6 @@ class ExampleHandler:
return "\r\n".join(res) return "\r\n".join(res)
async def asyncmain(loop):
controller = ExampleController(
ExampleHandler(), unix_socket="/var/spool/postfix/private/filtermail"
)
controller.start()
def lmtp_handle_DATA(envelope): def lmtp_handle_DATA(envelope):
"""the central filtering function for e-mails.""" """the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}") logging.info(f"Processing DATA message from {envelope.mail_from}")
@@ -113,13 +138,35 @@ def lmtp_handle_DATA(envelope):
return valid_recipients, res return valid_recipients, res
class UnixController(UnixSocketController):
def factory(self):
return LMTP(self.handler, **self.SMTP_kwargs)
class SMTPController(Controller):
def factory(self):
return SMTP(self.handler, **self.SMTP_kwargs)
async def asyncmain_afterqueue(loop, unix_socket_fn):
UnixController(AfterQueueHandler(), unix_socket=unix_socket_fn).start()
async def asyncmain_beforequeue(loop, port):
Controller(BeforeQueueHandler(), hostname="127.0.0.1", port=port).start()
def main(): def main():
args = sys.argv[1:]
assert len(args) == 2
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.create_task(asyncmain(loop=loop)) if args[0] == "afterqueue":
task = asyncmain_afterqueue(loop, args[1])
elif args[0] == "beforequeue":
task = asyncmain_beforequeue(loop, port=int(args[1]))
else:
raise SystemExit(1)
loop.create_task(task)
loop.run_forever() loop.run_forever()
if __name__ == "__main__":
main()

View File

@@ -1,10 +0,0 @@
[Unit]
Description=Email filter for chatmail servers
[Service]
ExecStart=/usr/local/bin/filtermail
Restart=always
RestartSec=30
[Install]
WantedBy=multi-user.target

View File

@@ -1,6 +1,7 @@
from .filtermail import check_encrypted, lmtp_handle_DATA from .filtermail import check_encrypted, lmtp_handle_DATA, SendRateLimiter
from email.parser import BytesParser from email.parser import BytesParser
from email import policy from email import policy
import pytest
def test_reject_forged_from(): def test_reject_forged_from():
@@ -326,3 +327,15 @@ def test_filtermail():
] ]
).encode() ).encode()
) )
def test_send_rate_limiter():
limiter = SendRateLimiter()
for i in range(100):
if limiter.is_sending_allowed("some@example.org"):
if i <= SendRateLimiter.MAX_USER_SEND_PER_MINUTE:
continue
pytest.fail("limiter didn't work")
else:
assert i == SendRateLimiter.MAX_USER_SEND_PER_MINUTE + 1
break

View File

@@ -53,24 +53,25 @@ def _install_chatmaild() -> None:
daemon_reload=True, daemon_reload=True,
) )
files.put( for fn in ("filtermail-after", "filtermail-before"):
name="upload filtermail.service", files.put(
src=importlib.resources.files("chatmaild") name=f"upload {fn}.service",
.joinpath("filtermail.service") src=importlib.resources.files("chatmaild")
.open("rb"), .joinpath(f"{fn}.service")
dest="/etc/systemd/system/filtermail.service", .open("rb"),
user="root", dest=f"/etc/systemd/system/{fn}.service",
group="root", user="root",
mode="644", group="root",
) mode="644",
systemd.service( )
name="Setup filtermail service", systemd.service(
service="filtermail.service", name=f"Setup {fn} service",
running=True, service=f"{fn}.service",
enabled=True, running=True,
restarted=True, enabled=True,
daemon_reload=True, restarted=True,
) daemon_reload=True,
)
def _configure_opendkim(domain: str, dkim_selector: str) -> bool: def _configure_opendkim(domain: str, dkim_selector: str) -> bool:

View File

@@ -32,7 +32,8 @@ submission inet n - y - - smtpd
-o smtpd_recipient_restrictions= -o smtpd_recipient_restrictions=
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject -o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o milter_macro_daemon_name=ORIGINATING -o milter_macro_daemon_name=ORIGINATING
-o content_filter=filter:unix:private/filtermail -o smtpd_proxy_filter=127.0.0.1:10080
-o content_filter=filter:unix:private/filtermail-afterqueue
smtps inet n - y - - smtpd smtps inet n - y - - smtpd
-o syslog_name=postfix/smtps -o syslog_name=postfix/smtps
-o smtpd_tls_wrappermode=yes -o smtpd_tls_wrappermode=yes
@@ -47,7 +48,7 @@ smtps inet n - y - - smtpd
-o smtpd_recipient_restrictions= -o smtpd_recipient_restrictions=
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject -o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o milter_macro_daemon_name=ORIGINATING -o milter_macro_daemon_name=ORIGINATING
-o content_filter=filter:unix:private/filtermail -o smtpd_proxy_filter=127.0.0.1:10080
#628 inet n - y - - qmqpd #628 inet n - y - - qmqpd
pickup unix n - y 60 1 pickup pickup unix n - y 60 1 pickup
cleanup unix n - y - 0 cleanup cleanup unix n - y - 0 cleanup
@@ -77,4 +78,6 @@ postlog unix-dgram n - n - 1 postlogd
filter unix - n n - - lmtp filter unix - n n - - lmtp
# Local SMTP server for reinjecting filered mail. # Local SMTP server for reinjecting filered mail.
localhost:10026 inet n - n - 10 smtpd localhost:10026 inet n - n - 10 smtpd
-o content_filter=filter:unix:private/filtermail-afterqueue
localhost:10027 inet n - n - 10 smtpd
-o content_filter= -o content_filter=

View File

@@ -1,60 +1,34 @@
def test_tls_imap(benchmark, imap): def test_tls_serialized_connect(benchmark, imap_or_smtp):
def imap_connect(): def connect():
imap.connect() imap_or_smtp.connect()
benchmark(imap_connect, 10) benchmark(connect)
def test_login_imap(benchmark, imap, gencreds): def test_login(benchmark, imap_or_smtp, gencreds):
def imap_connect_and_login(): cls = imap_or_smtp.__class__
imap.connect() conns = []
imap.login(*gencreds()) for i in range(20):
conn = cls(imap_or_smtp.host)
conn.connect()
conns.append(conn)
benchmark(imap_connect_and_login, 10) def login():
conn = conns.pop()
conn.login(*gencreds())
benchmark(login)
def test_tls_smtp(benchmark, smtp): def test_send_and_receive_10(benchmark, cmfactory, lp):
def smtp_connect(): """send many messages between two accounts"""
smtp.connect() ac1, ac2 = cmfactory.get_online_accounts(2)
chat = cmfactory.get_accepted_chat(ac1, ac2)
benchmark(smtp_connect, 10) def send_10_receive_all():
for i in range(10):
chat.send_text(f"hello {i}")
for i in range(10):
ac2.wait_next_incoming_message()
benchmark(send_10_receive_all)
def test_login_smtp(benchmark, smtp, gencreds):
def smtp_connect_and_login():
smtp.connect()
smtp.login(*gencreds())
benchmark(smtp_connect_and_login, 10)
class TestDC:
def test_autoconfigure(self, benchmark, cmfactory):
def autoconfig_and_idle_ready():
cmfactory.get_online_accounts(1)
benchmark(autoconfig_and_idle_ready, 5)
def test_ping_pong(self, benchmark, cmfactory):
ac1, ac2 = cmfactory.get_online_accounts(2)
chat = cmfactory.get_accepted_chat(ac1, ac2)
def ping_pong():
chat.send_text("ping")
msg = ac2.wait_next_incoming_message()
msg.chat.send_text("pong")
ac1.wait_next_incoming_message()
benchmark(ping_pong, 3)
def test_send_10_receive_10(self, benchmark, cmfactory, lp):
ac1, ac2 = cmfactory.get_online_accounts(2)
chat = cmfactory.get_accepted_chat(ac1, ac2)
def send_10_receive_10():
for i in range(10):
chat.send_text(f"hello {i}")
for i in range(10):
ac2.wait_next_incoming_message()
benchmark(send_10_receive_10, 1)

View File

@@ -1,6 +1,5 @@
import os import os
import io import io
import time
import random import random
import subprocess import subprocess
import imaplib import imaplib
@@ -15,10 +14,6 @@ def pytest_addoption(parser):
) )
def pytest_configure(config):
config._benchresults = {}
def pytest_runtest_setup(item): def pytest_runtest_setup(item):
markers = list(item.iter_markers(name="slow")) markers = list(item.iter_markers(name="slow"))
if markers: if markers:
@@ -59,35 +54,6 @@ def pytest_report_header():
return ["-" * len(text), text, "-" * len(text)] return ["-" * len(text), text, "-" * len(text)]
@pytest.fixture
def benchmark(request):
def bench(func, num, name=None):
if name is None:
name = func.__name__
durations = []
for i in range(num):
now = time.time()
func()
durations.append(time.time() - now)
durations.sort()
request.config._benchresults[name] = durations
return bench
def pytest_terminal_summary(terminalreporter):
tr = terminalreporter
results = tr.config._benchresults
tr.section("benchmark results")
headers = f"{'benchmark name': <30} {'median': >6}"
tr.write_line(headers)
tr.write_line("-" * len(headers))
for name, durations in results.items():
median = sorted(durations)[len(durations) // 2]
median = f"{median:2.4f}"
tr.write_line(f"{name: <30} {median: >6}")
@pytest.fixture @pytest.fixture
def imap(maildomain): def imap(maildomain):
return ImapConn(maildomain) return ImapConn(maildomain)

View File

@@ -1,4 +1,5 @@
import pytest import pytest
import smtplib
def test_login_basic_functioning(imap_or_smtp, gencreds, lp): def test_login_basic_functioning(imap_or_smtp, gencreds, lp):
@@ -34,3 +35,22 @@ def test_login_same_password(imap_or_smtp, gencreds):
imap_or_smtp.login(user1, password1) imap_or_smtp.login(user1, password1)
imap_or_smtp.connect() imap_or_smtp.connect()
imap_or_smtp.login(user2, password1) imap_or_smtp.login(user2, password1)
@pytest.mark.slow
def test_exceed_rate_limit(cmsetup, gencreds, mailgen):
"""Test that the per-account send-mail limit is exceeded."""
user1, user2 = cmsetup.gen_users(2)
mail = mailgen.get_encrypted(user1.addr, user2.addr)
for i in range(100):
print("Sending mail", str(i))
try:
user1.smtp.sendmail(user1.addr, [user2.addr], mail)
except smtplib.SMTPException as e:
if i < 80:
pytest.fail(f"rate limit was exceeded too early with msg {i}")
outcome = e.recipients[user2.addr]
assert outcome[0] == 450
assert b'4.7.1: Too much mail from' in outcome[1]
return
pytest.fail("Rate limit was not exceeded")

View File

@@ -11,4 +11,4 @@ chatmaild/venv/bin/pip install --upgrade pytest build 'setuptools>=68'
chatmaild/venv/bin/pip install -e chatmaild chatmaild/venv/bin/pip install -e chatmaild
python3 -m venv online-tests/venv python3 -m venv online-tests/venv
online-tests/venv/bin/pip install pytest pytest-timeout pdbpp deltachat online-tests/venv/bin/pip install pytest pytest-timeout pdbpp deltachat pytest-benchmark