get passthrough_recipients list from config

This commit is contained in:
holger krekel
2023-12-09 01:06:21 +01:00
parent 1b1f9365c9
commit bc27eb58bf
7 changed files with 82 additions and 71 deletions

View File

@@ -4,6 +4,8 @@ max_user_send_per_minute = 60
filtermail_smtp_port = 10080
postfix_reinject_port = 10025
passthrough_recipients = privacy@testrun.org xstore@testrun.org
[privacy:testrun]
domain = *.testrun.org
privacy_postal =

View File

@@ -35,6 +35,7 @@ class Config:
self.max_user_send_per_minute = int(params["max_user_send_per_minute"])
self.filtermail_smtp_port = int(params["filtermail_smtp_port"])
self.postfix_reinject_port = int(params["postfix_reinject_port"])
self.passthrough_recipients = params["passthrough_recipients"].split()
def _getbytefile(self):
return open(self._inipath, "rb")

View File

@@ -36,14 +36,6 @@ def check_encrypted(message):
return True
def is_passthrough_recipient(recipient):
"""Check whether a recipient is configured as passthrough."""
passthroughlist = ["privacy@testrun.org"]
if recipient in passthroughlist:
return True
return False
def check_mdn(message, envelope):
if len(envelope.rcpt_tos) != 1:
return False
@@ -72,9 +64,9 @@ def check_mdn(message, envelope):
return True
class SMTPController(Controller):
def factory(self):
return SMTP(self.handler, **self.SMTP_kwargs)
async def asyncmain_beforequeue(config):
port = config.filtermail_smtp_port
Controller(BeforeQueueHandler(config), hostname="127.0.0.1", port=port).start()
class BeforeQueueHandler:
@@ -97,7 +89,7 @@ class BeforeQueueHandler:
async def handle_DATA(self, server, session, envelope):
logging.info("handle_DATA before-queue")
error = check_DATA(envelope)
error = self.check_DATA(envelope)
if error:
return error
logging.info("re-injecting the mail that passed checks")
@@ -105,45 +97,39 @@ class BeforeQueueHandler:
client.sendmail(envelope.mail_from, envelope.rcpt_tos, envelope.content)
return "250 OK"
def check_DATA(self, envelope):
"""the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}")
async def asyncmain_beforequeue(config):
port = config.filtermail_smtp_port
Controller(BeforeQueueHandler(config), hostname="127.0.0.1", port=port).start()
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
mail_encrypted = check_encrypted(message)
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from!r}")
if envelope.mail_from.lower() != from_addr.lower():
return f"500 Invalid FROM <{from_addr!r}> for <{envelope.mail_from!r}>"
def check_DATA(envelope):
"""the central filtering function for e-mails."""
logging.info(f"Processing DATA message from {envelope.mail_from}")
if not mail_encrypted and check_mdn(message, envelope):
return
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
mail_encrypted = check_encrypted(message)
passthrough_recipients = self.config.passthrough_recipients
envelope_from_domain = from_addr.split("@").pop()
for recipient in envelope.rcpt_tos:
if envelope.mail_from == recipient:
# Always allow sending emails to self.
continue
if recipient in passthrough_recipients:
continue
res = recipient.split("@")
if len(res) != 2:
return f"500 Invalid address <{recipient}>"
_recipient_addr, recipient_domain = res
_, from_addr = parseaddr(message.get("from").strip())
logging.info(f"mime-from: {from_addr} envelope-from: {envelope.mail_from!r}")
if envelope.mail_from.lower() != from_addr.lower():
return f"500 Invalid FROM <{from_addr!r}> for <{envelope.mail_from!r}>"
if not mail_encrypted and check_mdn(message, envelope):
return
envelope_from_domain = from_addr.split("@").pop()
for recipient in envelope.rcpt_tos:
if envelope.mail_from == recipient:
# Always allow sending emails to self.
continue
if is_passthrough_recipient(recipient):
# Always allow recipients marked as passthrough
continue
res = recipient.split("@")
if len(res) != 2:
return f"500 Invalid address <{recipient}>"
_recipient_addr, recipient_domain = res
is_outgoing = recipient_domain != envelope_from_domain
if is_outgoing and not mail_encrypted:
is_securejoin = message.get("secure-join") in ["vc-request", "vg-request"]
if not is_securejoin:
return f"500 Invalid unencrypted mail to <{recipient}>"
is_outgoing = recipient_domain != envelope_from_domain
if is_outgoing and not mail_encrypted:
is_securejoin = message.get("secure-join") in ["vc-request", "vg-request"]
if not is_securejoin:
return f"500 Invalid unencrypted mail to <{recipient}>"
class SendRateLimiter:

View File

@@ -13,6 +13,7 @@ def test_read_config_without_mailname(tmp_path, create_ini, monkeypatch):
max_user_send_per_minute = 40
filtermail_smtp_port = 9875
postfix_reinject_port = 9999
passthrough_recipients =
"""
)
config = read_config(inipath)
@@ -26,6 +27,7 @@ def test_read_config_without_privacy_policy(tmp_path, create_ini):
max_user_send_per_minute = 40
filtermail_smtp_port = 9875
postfix_reinject_port = 9999
passthrough_recipients =
[privacy:testrun]
domain = *.example.org
@@ -36,6 +38,7 @@ def test_read_config_without_privacy_policy(tmp_path, create_ini):
assert config.max_user_send_per_minute == 40
assert config.filtermail_smtp_port == 9875
assert config.postfix_reinject_port == 9999
assert config.passthrough_recipients == []
assert not config.privacy_postal
assert not config.privacy_mail
assert not config.privacy_pdo
@@ -49,6 +52,7 @@ def test_read_config(create_ini):
max_user_send_per_minute = 40
filtermail_smtp_port = 10080
postfix_reinject_port = 10025
passthrough_recipients = x@example.org y@example.org
[privacy:testrun]
domain = *.testrun.org
@@ -73,6 +77,7 @@ def test_read_config(create_ini):
assert config.mailname == "something.testrun.org"
assert config.filtermail_smtp_port == 10080
assert config.postfix_reinject_port == 10025
assert config.passthrough_recipients == ["x@example.org", "y@example.org"]
assert config.privacy_postal == "Postal Ltd"
assert config.privacy_mail == "privacy@merlinux.eu"
lines = config.privacy_pdo.split("\n")

View File

@@ -1,10 +1,12 @@
from chatmaild.filtermail import (
check_encrypted,
check_DATA,
BeforeQueueHandler,
SendRateLimiter,
check_mdn,
is_passthrough_recipient,
)
from chatmaild.config import read_config
import pytest
@@ -14,18 +16,25 @@ def maildomain():
return "chatmail.example.org"
def test_reject_forged_from(maildata, gencreds):
@pytest.fixture
def handler(create_ini, maildomain):
config = read_config(create_ini(), maildomain)
return BeforeQueueHandler(config)
def test_reject_forged_from(maildata, gencreds, handler):
class env:
mail_from = gencreds()[0]
rcpt_tos = [gencreds()[0]]
# test that the filter lets good mail through
env.content = maildata("plain.eml", from_addr=env.mail_from).as_bytes()
assert not check_DATA(envelope=env)
assert not handler.check_DATA(envelope=env)
# test that the filter rejects forged mail
env.content = maildata("plain.eml", from_addr="forged@c3.testrun.org").as_bytes()
error = check_DATA(envelope=env)
error = handler.check_DATA(envelope=env)
assert "500" in error
@@ -47,7 +56,7 @@ def test_filtermail_encryption_detection(maildata):
assert not check_encrypted(msg)
def test_filtermail_is_mdn(maildata, gencreds):
def test_filtermail_is_mdn(maildata, gencreds, handler):
from_addr = gencreds()[0]
to_addr = gencreds()[0] + ".other"
msg = maildata("mdn.eml", from_addr, to_addr)
@@ -59,7 +68,8 @@ def test_filtermail_is_mdn(maildata, gencreds):
assert check_mdn(msg, env)
print(msg.as_string())
assert not check_DATA(env)
assert not handler.check_DATA(env)
def test_filtermail_to_multiple_recipients_no_mdn(maildata, gencreds):
@@ -88,12 +98,12 @@ def test_send_rate_limiter():
break
def test_excempt_privacy(maildata, gencreds):
def test_excempt_privacy(maildata, gencreds, handler):
from_addr = gencreds()[0]
to_addr = "privacy@testrun.org"
false_to = "privacy@tstrn.org"
false_to2 = "prvcy@testrun.org"
assert is_passthrough_recipient(to_addr)
assert to_addr in handler.config.passthrough_recipients
msg = maildata("plain.eml", from_addr, to_addr)
@@ -103,11 +113,11 @@ def test_excempt_privacy(maildata, gencreds):
content = msg.as_bytes()
# assert that None/no error is returned
assert not check_DATA(envelope=env)
assert not handler.check_DATA(envelope=env)
class env2:
mail_from = from_addr
rcpt_tos = [to_addr, false_to, false_to2]
content = msg.as_bytes()
assert "500" in check_DATA(envelope=env2)
assert "500" in handler.check_DATA(envelope=env2)

View File

@@ -6,6 +6,7 @@ import subprocess
import textwrap
import imaplib
import smtplib
import importlib.resources
import itertools
from email.parser import BytesParser
from email import policy
@@ -38,6 +39,14 @@ def pytest_runtest_setup(item):
pytest.skip("skipping slow test, use --slow to run")
@pytest.fixture
def inipath():
dpath = importlib.resources.files("chatmaild")
inipath = dpath.joinpath("../../../chatmail.ini").resolve()
assert inipath.exists()
return inipath
@pytest.fixture
def maildomain():
domain = os.environ.get("CHATMAIL_DOMAIN")
@@ -406,8 +415,10 @@ class CMUser:
@pytest.fixture
def create_ini(tmp_path):
def create_ini_func(source):
def create_ini(tmp_path, inipath):
def create_ini_func(source=None):
if source is None:
source = inipath.read_text()
p = tmp_path.joinpath("chatmail.ini")
assert not p.exists(), p
p.write_text(textwrap.dedent(source))

View File

@@ -5,14 +5,15 @@ from deploy_chatmail.www import build_webpages
from chatmaild.config import read_config
def create_ini(inipath, domain="example.org"):
inipath.write_text(
def make_config(create_ini, domain="example.org"):
inipath = create_ini(
textwrap.dedent(
f"""\
[params]
max_user_send_per_minute = 60
filtermail_smtp_port = 10080
postfix_reinject_port = 10025
passthrough_recipients =
[privacy:{domain}]
domain = example.org
@@ -27,25 +28,20 @@ def create_ini(inipath, domain="example.org"):
"""
)
)
return read_config(inipath, domain)
def test_build_webpages(tmp_path):
def test_build_webpages(tmp_path, create_ini):
pkgroot = importlib.resources.files("deploy_chatmail")
src_dir = pkgroot.joinpath("../../../www/src").resolve()
assert src_dir.exists(), src_dir
inipath = tmp_path.joinpath("chatmail.ini")
create_ini(inipath, "example.org")
config = read_config(inipath, "example.org")
config = make_config(create_ini, "example.org")
build_dir = tmp_path.joinpath("build")
build_webpages(src_dir, build_dir, config)
def test_get_settings(tmp_path):
inipath = tmp_path.joinpath("chatmail.ini")
create_ini(inipath, "example.org")
config = read_config(inipath, "example.org")
def test_get_settings(tmp_path, create_ini):
config = make_config(create_ini, "example.org")
assert config.privacy_postal == "address-line1\naddress-line2"
assert config.privacy_mail == "privacy@example.org"
assert config.privacy_pdo == "address-line3"