Compare commits

..

6 Commits

Author SHA1 Message Date
holger krekel
801b035f18 apply most of linkxt review comments 2023-10-18 18:58:39 +02:00
holger krekel
6f8823cbeb doc the test 2023-10-18 15:30:54 +02:00
holger krekel
c0673cc43f make test more readable 2023-10-18 15:19:20 +02:00
holger krekel
6dcd686701 also test that external addresses fail to be forged 2023-10-18 15:12:14 +02:00
holger krekel
087ae78edc refactor test to be more strict 2023-10-18 14:41:49 +02:00
holger krekel
7e93299bce initial forged-from protection 2023-10-18 13:15:22 +02:00
20 changed files with 189 additions and 410 deletions

View File

@@ -1,18 +0,0 @@
name: CI
on:
pull_request:
push:
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Lint chatmaild
working-directory: chatmaild
run: pipx run tox
- name: Lint deploy-chatmail
working-directory: deploy-chatmail
run: pipx run tox

View File

@@ -2,13 +2,13 @@ import logging
import os
import sys
import json
import crypt
from socketserver import (
UnixStreamServer,
StreamRequestHandler,
ThreadingMixIn,
)
import pwd
import subprocess
from .database import Database
@@ -16,9 +16,17 @@ NOCREATE_FILE = "/etc/chatmail-nocreate"
def encrypt_password(password: str):
password = password.encode("ascii")
# https://doc.dovecot.org/configuration_manual/authentication/password_schemes/
passhash = crypt.crypt(password, crypt.METHOD_SHA512)
return "{SHA512-CRYPT}" + passhash
process = subprocess.Popen(
["doveadm", "pw", "-s", "SHA512-CRYPT"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
stdout_data, _stderr_data = process.communicate(
input=password + b"\n" + password + b"\n"
)
return stdout_data.decode("ascii").strip()
def create_user(db, user, password):

View File

@@ -1,14 +1,12 @@
#!/usr/bin/env python3
import asyncio
import logging
import time
import sys
from email.parser import BytesParser
from email import policy
from email.utils import parseaddr
from aiosmtpd.smtp import SMTP
from aiosmtpd.controller import Controller
from aiosmtpd.lmtp import LMTP
from aiosmtpd.controller import UnixSocketController
from smtplib import SMTP as SMTPClient
@@ -34,93 +32,94 @@ def check_encrypted(message):
return True
class SMTPController(Controller):
class ExampleController(UnixSocketController):
def factory(self):
return SMTP(self.handler, **self.SMTP_kwargs)
return LMTP(self.handler, **self.SMTP_kwargs)
class BeforeQueueHandler:
def __init__(self):
self.send_rate_limiter = SendRateLimiter()
async def handle_MAIL(self, server, session, envelope, address, mail_options):
logging.info(f"handle_MAIL from {address}")
envelope.mail_from = address
if not self.send_rate_limiter.is_sending_allowed(address):
return f"450 4.7.1: Too much mail from {address}"
parts = envelope.mail_from.split("@")
if len(parts) != 2:
return f"500 Invalid from address <{envelope.mail_from!r}>"
class ExampleHandler:
async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
envelope.rcpt_tos.append(address)
return "250 OK"
async def handle_DATA(self, server, session, envelope):
logging.info("handle_DATA before-queue")
error = check_DATA(envelope)
if error:
return error
logging.info("re-injecting the mail that passed checks")
client = SMTPClient("localhost", "10025")
client.sendmail(envelope.mail_from, envelope.rcpt_tos, envelope.content)
return "250 OK"
valid_recipients, res = lmtp_handle_DATA(envelope)
# Reinject the mail back into Postfix.
if valid_recipients:
logging.info("Reinjecting the mail")
client = SMTPClient("localhost", "10026")
client.sendmail(envelope.mail_from, valid_recipients, envelope.content)
else:
logging.info("no valid recipients, ignoring mail")
return "\r\n".join(res)
async def asyncmain_beforequeue(port):
Controller(BeforeQueueHandler(), hostname="127.0.0.1", port=port).start()
async def asyncmain(loop):
controller = ExampleController(
ExampleHandler(), unix_socket="/var/spool/postfix/private/filtermail"
)
controller.start()
def check_DATA(envelope):
def lmtp_handle_DATA(envelope):
"""the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}")
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
mail_encrypted = check_encrypted(message)
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from!r}")
if envelope.mail_from.lower() != from_addr.lower():
return f"500 Invalid FROM <{from_addr!r}> for <{envelope.mail_from!r}>"
envelope_from_domain = from_addr.split("@").pop()
valid_recipients = []
res = []
for recipient in envelope.rcpt_tos:
my_local_domain = envelope.mail_from.split("@")
if len(my_local_domain) != 2:
res += [f"500 Invalid from address <{envelope.mail_from}>"]
continue
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from}")
if envelope.mail_from.lower() != from_addr.lower():
res += [f"500 Invalid FROM <{from_addr}> for <{envelope.mail_from}>"]
continue
if envelope.mail_from == recipient:
# Always allow sending emails to self.
valid_recipients += [recipient]
res += ["250 OK"]
continue
res = recipient.split("@")
if len(res) != 2:
return f"500 Invalid address <{recipient}>"
_recipient_addr, recipient_domain = res
is_outgoing = recipient_domain != envelope_from_domain
if is_outgoing and not mail_encrypted:
is_securejoin = message.get("secure-join") in ["vc-request", "vg-request"]
if not is_securejoin:
return f"500 Invalid unencrypted mail to <{recipient}>"
recipient_local_domain = recipient.split("@")
if len(recipient_local_domain) != 2:
res += [f"500 Invalid address <{recipient}>"]
continue
is_outgoing = recipient_local_domain[1] != my_local_domain[1]
class SendRateLimiter:
MAX_USER_SEND_PER_MINUTE = 80
if (
is_outgoing
and not mail_encrypted
and message.get("secure-join") != "vc-request"
and message.get("secure-join") != "vg-request"
):
res += ["500 Outgoing mail must be encrypted"]
continue
def __init__(self):
self.addr2timestamps = {}
valid_recipients += [recipient]
res += ["250 OK"]
def is_sending_allowed(self, mail_from):
last = self.addr2timestamps.setdefault(mail_from, [])
now = time.time()
last[:] = [ts for ts in last if ts >= (now - 60)]
if len(last) <= self.MAX_USER_SEND_PER_MINUTE:
last.append(now)
return True
return False
assert len(envelope.rcpt_tos) == len(res)
assert len(valid_recipients) <= len(res)
return valid_recipients, res
def main():
args = sys.argv[1:]
assert len(args) == 1
logging.basicConfig(level=logging.INFO)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = asyncmain_beforequeue(port=int(args[0]))
loop.create_task(task)
loop.create_task(asyncmain(loop=loop))
loop.run_forever()
if __name__ == "__main__":
main()

View File

@@ -1,8 +1,8 @@
[Unit]
Description=Chatmail Postfix BeforeQeue filter
Description=Email filter for chatmail servers
[Service]
ExecStart=/usr/local/bin/filtermail 10080
ExecStart=/usr/local/bin/filtermail
Restart=always
RestartSec=30

View File

@@ -1,7 +1,6 @@
from .filtermail import check_encrypted, check_DATA, SendRateLimiter
from .filtermail import check_encrypted, lmtp_handle_DATA
from email.parser import BytesParser
from email import policy
import pytest
def test_reject_forged_from():
@@ -31,12 +30,15 @@ def test_reject_forged_from():
# test that the filter lets good mail through
envelope.content = makemail(envelope.mail_from).as_bytes()
assert not check_DATA(envelope=envelope)
valid_recipients, res = lmtp_handle_DATA(envelope=envelope)
assert valid_recipients == envelope.rcpt_tos
assert len(res) == 1 and "250" in res[0]
# test that the filter rejects forged mail
envelope.content = makemail("forged@c3.testrun.org").as_bytes()
error = check_DATA(envelope=envelope)
assert "500" in error
valid_recipients, res = lmtp_handle_DATA(envelope=envelope)
assert not valid_recipients
assert len(res) == 1 and "500" in res[0]
def test_filtermail():
@@ -324,15 +326,3 @@ def test_filtermail():
]
).encode()
)
def test_send_rate_limiter():
limiter = SendRateLimiter()
for i in range(100):
if limiter.is_sending_allowed("some@example.org"):
if i <= SendRateLimiter.MAX_USER_SEND_PER_MINUTE:
continue
pytest.fail("limiter didn't work")
else:
assert i == SendRateLimiter.MAX_USER_SEND_PER_MINUTE + 1
break

View File

@@ -34,28 +34,43 @@ def _install_chatmaild() -> None:
commands=[f"pip install --break-system-packages {remote_path}"],
)
for fn in (
"doveauth-dictproxy",
"filtermail",
):
files.put(
name=f"Upload {fn}.service",
src=importlib.resources.files("chatmaild")
.joinpath(f"{fn}.service")
.open("rb"),
dest=f"/etc/systemd/system/{fn}.service",
user="root",
group="root",
mode="644",
)
systemd.service(
name=f"Setup {fn} service",
service=f"{fn}.service",
running=True,
enabled=True,
restarted=True,
daemon_reload=True,
)
files.put(
name="upload doveauth-dictproxy.service",
src=importlib.resources.files("chatmaild")
.joinpath("doveauth-dictproxy.service")
.open("rb"),
dest="/etc/systemd/system/doveauth-dictproxy.service",
user="root",
group="root",
mode="644",
)
systemd.service(
name="Setup doveauth-dictproxy service",
service="doveauth-dictproxy.service",
running=True,
enabled=True,
restarted=True,
daemon_reload=True,
)
files.put(
name="upload filtermail.service",
src=importlib.resources.files("chatmaild")
.joinpath("filtermail.service")
.open("rb"),
dest="/etc/systemd/system/filtermail.service",
user="root",
group="root",
mode="644",
)
systemd.service(
name="Setup filtermail service",
service="filtermail.service",
running=True,
enabled=True,
restarted=True,
daemon_reload=True,
)
def _configure_opendkim(domain: str, dkim_selector: str) -> bool:
@@ -158,33 +173,6 @@ def _configure_dovecot(mail_server: str, debug: bool = False) -> bool:
return need_restart
def _configure_nginx(domain: str, debug: bool = False) -> bool:
"""Configures nginx HTTP server."""
need_restart = False
main_config = files.template(
src=importlib.resources.files(__package__).joinpath("nginx.conf.j2"),
dest="/etc/nginx/nginx.conf",
user="root",
group="root",
mode="644",
config={"domain_name": domain},
)
need_restart |= main_config.changed
autoconfig = files.template(
src=importlib.resources.files(__package__).joinpath("autoconfig.xml.j2"),
dest="/var/www/html/.well-known/autoconfig/mail/config-v1.1.xml",
user="root",
group="root",
mode="644",
config={"domain_name": domain},
)
need_restart |= autoconfig.changed
return need_restart
def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> None:
"""Deploy a chat-mail instance.
@@ -206,7 +194,7 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
)
# Deploy acmetool to have TLS certificates.
deploy_acmetool(nginx_hook=True, domains=[mail_server])
deploy_acmetool(domains=[mail_server])
apt.packages(
name="Install Postfix",
@@ -226,17 +214,11 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
],
)
apt.packages(
name="Install nginx",
packages=["nginx"],
)
_install_chatmaild()
debug = False
dovecot_need_restart = _configure_dovecot(mail_server, debug=debug)
postfix_need_restart = _configure_postfix(mail_domain, debug=debug)
opendkim_need_restart = _configure_opendkim(mail_domain, dkim_selector)
nginx_need_restart = _configure_nginx(mail_domain)
systemd.service(
name="Start and enable OpenDKIM",
@@ -262,14 +244,6 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
restarted=dovecot_need_restart,
)
systemd.service(
name="Start and enable nginx",
service="nginx.service",
running=True,
enabled=True,
restarted=nginx_need_restart,
)
# This file is used by auth proxy.
# https://wiki.debian.org/EtcMailName
server.shell(
@@ -277,22 +251,6 @@ def deploy_chatmail(mail_domain: str, mail_server: str, dkim_selector: str) -> N
commands=[f"echo {mail_domain} >/etc/mailname; chmod 644 /etc/mailname"],
)
journald_conf = files.put(
name="Configure journald",
src=importlib.resources.files(__package__).joinpath("journald.conf"),
dest="/etc/systemd/journald.conf",
user="root",
group="root",
mode="644",
)
systemd.service(
name="Start and enable journald",
service="systemd-journald.service",
running=True,
enabled=True,
restarted=journald_conf,
)
def callback():
result = server.shell(
commands=[

View File

@@ -1,6 +1,6 @@
import importlib.resources
from pyinfra.operations import apt, files, server
from pyinfra.operations import apt, files, systemd, server
def deploy_acmetool(nginx_hook=False, email="", domains=[]):
@@ -38,13 +38,22 @@ def deploy_acmetool(nginx_hook=False, email="", domains=[]):
email=email,
)
files.template(
src=importlib.resources.files(__package__).joinpath("target.yaml.j2"),
dest="/var/lib/acme/conf/target",
service_file = files.put(
src=importlib.resources.files(__package__)
.joinpath("acmetool-redirector.service")
.open("rb"),
dest="/etc/systemd/system/acmetool-redirector.service",
user="root",
group="root",
mode="644",
)
systemd.service(
name="Setup acmetool-redirector service",
service="acmetool-redirector.service",
running=False,
enabled=False,
restarted=service_file.changed,
)
for domain in domains:
server.shell(

View File

@@ -0,0 +1,11 @@
[Unit]
Description=acmetool HTTP redirector
[Service]
Type=notify
ExecStart=/usr/bin/acmetool redirector --service.uid=daemon
Restart=always
RestartSec=30
[Install]
WantedBy=multi-user.target

View File

@@ -1,7 +0,0 @@
request:
provider: https://acme-v02.api.letsencrypt.org/directory
key:
type: rsa
challenge:
webroot-paths:
- /var/www/html/.well-known/acme-challenge

View File

@@ -1,37 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<clientConfig version="1.1">
<emailProvider id="{{ config.domain_name }}">
<domain>{{ config.domain_name }}</domain>
<displayName>{{ config.domain_name }} chatmail</displayName>
<displayShortName>{{ config.domain_name }}</displayShortName>
<incomingServer type="imap">
<hostname>{{ config.domain_name }}</hostname>
<port>993</port>
<socketType>SSL</socketType>
<authentication>password-cleartext</authentication>
<username>%EMAILADDRESS%</username>
</incomingServer>
<incomingServer type="imap">
<hostname>{{ config.domain_name }}</hostname>
<port>143</port>
<socketType>STARTTLS</socketType>
<authentication>password-cleartext</authentication>
<username>%EMAILADDRESS%</username>
</incomingServer>
<outgoingServer type="smtp">
<hostname>{{ config.domain_name }}</hostname>
<port>465</port>
<socketType>SSL</socketType>
<authentication>password-cleartext</authentication>
<username>%EMAILADDRESS%</username>
</outgoingServer>
<outgoingServer type="smtp">
<hostname>{{ config.domain_name }}</hostname>
<port>587</port>
<socketType>STARTTLS</socketType>
<authentication>password-cleartext</authentication>
<username>%EMAILADDRESS%</username>
</outgoingServer>
</emailProvider>
</clientConfig>

View File

@@ -1,2 +0,0 @@
[Journal]
MaxRetentionSec=3d

View File

@@ -1,47 +0,0 @@
user www-data;
worker_processes auto;
pid /run/nginx.pid;
error_log /var/log/nginx/error.log;
events {
worker_connections 768;
# multi_accept on;
}
http {
sendfile on;
tcp_nopush on;
# Do not emit nginx version on error pages.
server_tokens off;
include /etc/nginx/mime.types;
default_type application/octet-stream;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3;
ssl_prefer_server_ciphers on;
ssl_certificate /var/lib/acme/live/{{ config.domain_name }}/fullchain;
ssl_certificate_key /var/lib/acme/live/{{ config.domain_name }}/privkey;
gzip on;
server {
listen 80 default_server;
listen [::]:80 default_server;
listen 443 ssl default_server;
listen [::]:443 ssl default_server;
root /var/www/html;
index index.html index.htm;
server_name _;
location / {
# First attempt to serve request as file, then
# as directory, then fall back to displaying a 404.
try_files $uri $uri/ =404;
}
}
}

View File

@@ -32,7 +32,7 @@ submission inet n - y - - smtpd
-o smtpd_recipient_restrictions=
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o milter_macro_daemon_name=ORIGINATING
-o smtpd_proxy_filter=127.0.0.1:10080
-o content_filter=filter:unix:private/filtermail
smtps inet n - y - - smtpd
-o syslog_name=postfix/smtps
-o smtpd_tls_wrappermode=yes
@@ -47,7 +47,7 @@ smtps inet n - y - - smtpd
-o smtpd_recipient_restrictions=
-o smtpd_relay_restrictions=permit_sasl_authenticated,reject
-o milter_macro_daemon_name=ORIGINATING
-o smtpd_proxy_filter=127.0.0.1:10080
-o content_filter=filter:unix:private/filtermail
#628 inet n - y - - qmqpd
pickup unix n - y 60 1 pickup
cleanup unix n - y - 0 cleanup
@@ -76,5 +76,5 @@ scache unix - - y - 1 scache
postlog unix-dgram n - n - 1 postlogd
filter unix - n n - - lmtp
# Local SMTP server for reinjecting filered mail.
localhost:10025 inet n - n - 10 smtpd
-o syslog_name=postfix/reinject
localhost:10026 inet n - n - 10 smtpd
-o content_filter=

View File

@@ -1,60 +1,34 @@
def test_tls_imap(benchmark, imap):
def imap_connect():
imap.connect()
def test_tls_serialized_connect(benchmark, imap_or_smtp):
def connect():
imap_or_smtp.connect()
benchmark(imap_connect, 10)
benchmark(connect)
def test_login_imap(benchmark, imap, gencreds):
def imap_connect_and_login():
imap.connect()
imap.login(*gencreds())
def test_login(benchmark, imap_or_smtp, gencreds):
cls = imap_or_smtp.__class__
conns = []
for i in range(20):
conn = cls(imap_or_smtp.host)
conn.connect()
conns.append(conn)
benchmark(imap_connect_and_login, 10)
def login():
conn = conns.pop()
conn.login(*gencreds())
benchmark(login)
def test_tls_smtp(benchmark, smtp):
def smtp_connect():
smtp.connect()
def test_send_and_receive_10(benchmark, cmfactory, lp):
"""send many messages between two accounts"""
ac1, ac2 = cmfactory.get_online_accounts(2)
chat = cmfactory.get_accepted_chat(ac1, ac2)
benchmark(smtp_connect, 10)
def send_10_receive_all():
for i in range(10):
chat.send_text(f"hello {i}")
for i in range(10):
ac2.wait_next_incoming_message()
def test_login_smtp(benchmark, smtp, gencreds):
def smtp_connect_and_login():
smtp.connect()
smtp.login(*gencreds())
benchmark(smtp_connect_and_login, 10)
class TestDC:
def test_autoconfigure(self, benchmark, cmfactory):
def autoconfig_and_idle_ready():
cmfactory.get_online_accounts(1)
benchmark(autoconfig_and_idle_ready, 5)
def test_ping_pong(self, benchmark, cmfactory):
ac1, ac2 = cmfactory.get_online_accounts(2)
chat = cmfactory.get_accepted_chat(ac1, ac2)
def ping_pong():
chat.send_text("ping")
msg = ac2.wait_next_incoming_message()
msg.chat.send_text("pong")
ac1.wait_next_incoming_message()
benchmark(ping_pong, 5)
def test_send_10_receive_10(self, benchmark, cmfactory, lp):
ac1, ac2 = cmfactory.get_online_accounts(2)
chat = cmfactory.get_accepted_chat(ac1, ac2)
def send_10_receive_10():
for i in range(10):
chat.send_text(f"hello {i}")
for i in range(10):
ac2.wait_next_incoming_message()
benchmark(send_10_receive_10, 5)
benchmark(send_10_receive_all)

View File

@@ -1,12 +1,10 @@
import os
import io
import time
import random
import subprocess
import imaplib
import smtplib
import itertools
from math import ceil
import pytest
@@ -16,13 +14,6 @@ def pytest_addoption(parser):
)
def pytest_configure(config):
config._benchresults = {}
config.addinivalue_line(
"markers", "slow: mark test to require --slow option to run"
)
def pytest_runtest_setup(item):
markers = list(item.iter_markers(name="slow"))
if markers:
@@ -63,49 +54,6 @@ def pytest_report_header():
return ["-" * len(text), text, "-" * len(text)]
@pytest.fixture
def benchmark(request):
def bench(func, num, name=None):
if name is None:
name = func.__name__
durations = []
for i in range(num):
now = time.time()
func()
durations.append(time.time() - now)
durations.sort()
request.config._benchresults[name] = durations
return bench
def pytest_terminal_summary(terminalreporter):
tr = terminalreporter
results = tr.config._benchresults
if not results:
return
tr.section("benchmark results")
float_names = 'median min max'.split()
width = max(map(len, float_names))
def fcol(parts):
return " ".join(part.rjust(width) for part in parts)
headers = f"{'benchmark name': <30} " + fcol(float_names)
tr.write_line(headers)
tr.write_line("-" * len(headers))
for name, durations in results.items():
measures = [
sorted(durations)[len(durations) // 2],
min(durations),
max(durations),
]
line = f"{name: <30} "
line += fcol(f"{float: 2.2f}" for float in measures)
tr.write_line(line)
@pytest.fixture
def imap(maildomain):
return ImapConn(maildomain)

View File

@@ -1,2 +1,3 @@
[pytest]
addopts = -vrsx --strict-markers
markers = slow: mark test as slow (requires --slow option to run)

View File

@@ -20,7 +20,7 @@ def test_use_two_chatmailservers(cmfactory, maildomain2):
@pytest.mark.parametrize("forgeaddr", ["internal", "someone@example.org"])
def test_reject_forged_from(cmsetup, mailgen, lp, forgeaddr):
def test_reject_forged_from(cmsetup, mailgen, lp, remote, forgeaddr):
user1, user3 = cmsetup.gen_users(2)
lp.sec("send encrypted message with forged from")
@@ -36,25 +36,17 @@ def test_reject_forged_from(cmsetup, mailgen, lp, forgeaddr):
print(f" {line}")
lp.sec("Send forged mail and check remote postfix lmtp processing result")
with pytest.raises(smtplib.SMTPException) as e:
user1.smtp.sendmail(from_addr=user1.addr, to_addrs=[user3.addr], msg=msg)
assert "500" in str(e.value)
remote_log = remote.iter_output("journalctl -t postfix/lmtp")
user1.smtp.sendmail(from_addr=user1.addr, to_addrs=[user3.addr], msg=msg)
for line in remote_log:
# print(line)
if "500 invalid from" in line and user3.addr in line:
break
else:
pytest.fail("remote postfix/filtermail failed to reject message")
@pytest.mark.slow
def test_exceed_rate_limit(cmsetup, gencreds, mailgen):
"""Test that the per-account send-mail limit is exceeded."""
user1, user2 = cmsetup.gen_users(2)
mail = mailgen.get_encrypted(user1.addr, user2.addr)
for i in range(100):
print("Sending mail", str(i))
try:
user1.smtp.sendmail(user1.addr, [user2.addr], mail)
except smtplib.SMTPException as e:
if i < 80:
pytest.fail(f"rate limit was exceeded too early with msg {i}")
outcome = e.recipients[user2.addr]
assert outcome[0] == 450
assert b'4.7.1: Too much mail from' in outcome[1]
# check that the logged in user (who sent the forged msg) got a non-delivery notice
for message in user1.imap.fetch_all_messages():
if "Invalid FROM" in message and addr_to_forge in message:
return
pytest.fail("Rate limit was not exceeded")
pytest.fail(f"forged From={addr_to_forge} did not cause non-delivery notice")

View File

@@ -1,5 +1,4 @@
import pytest
import smtplib
def test_login_basic_functioning(imap_or_smtp, gencreds, lp):

View File

@@ -6,8 +6,9 @@ deploy-chatmail/venv/bin/pip install -e deploy-chatmail
deploy-chatmail/venv/bin/pip install -e chatmaild
python3 -m venv chatmaild/venv
sudo apt install -y dovecot-core && sudo systemctl disable --now dovecot
chatmaild/venv/bin/pip install --upgrade pytest build 'setuptools>=68'
chatmaild/venv/bin/pip install -e chatmaild
python3 -m venv online-tests/venv
online-tests/venv/bin/pip install pytest pytest-timeout pdbpp deltachat
online-tests/venv/bin/pip install pytest pytest-timeout pdbpp deltachat pytest-benchmark