feat: expose metadata "maxsmtprecipients" value

also add metadata tests and make metadata lookup method more readable by using structural match/case syntax
This commit is contained in:
holger krekel
2026-05-08 19:04:40 +02:00
parent 45fafa10a9
commit 3561ef20e0
2 changed files with 48 additions and 36 deletions

View File

@@ -88,29 +88,27 @@ class MetadataDictProxy(DictProxy):
def handle_lookup(self, parts):
# Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org
keyparts = parts[0].split("/", 2)
if keyparts[0] == "priv":
keyname = keyparts[2]
match parts[0].split("/", 2):
case ["priv", _, keyname] if keyname == self.metadata.DEVICETOKEN_KEY:
addr = parts[1]
if keyname == self.metadata.DEVICETOKEN_KEY:
res = " ".join(self.metadata.get_tokens_for_addr(addr))
return f"O{res}\n"
elif keyparts[0] == "shared":
keyname = keyparts[2]
if (
keyname == "vendor/vendor.dovecot/pvt/server/vendor/deltachat/irohrelay"
and self.iroh_relay
):
# Handle `GETMETADATA "" /shared/vendor/deltachat/irohrelay`
case ["shared", _, keyname]:
prefix = "vendor/vendor.dovecot/pvt/server/vendor/deltachat/"
if keyname.startswith(prefix):
match keyname[len(prefix) :]:
case "irohrelay" if self.iroh_relay:
return f"O{self.iroh_relay}\n"
elif keyname == "vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn":
case "turn":
try:
res = turn_credentials()
except Exception:
logging.exception("failed to get TURN credentials")
return "N\n"
port = 3478
return f"O{self.turn_hostname}:{port}:{res}\n"
return f"O{self.turn_hostname}:3478:{res}\n"
case "maxsmtprecipients":
# postfix default (see "postconf smtpd_recipient_limit")
return "O1000\n"
logging.warning(f"lookup ignored: {parts!r}")
return "N\n"
@@ -120,10 +118,11 @@ class MetadataDictProxy(DictProxy):
# https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h
keyname = parts[1].split("/")
value = parts[2] if len(parts) > 2 else ""
if keyname[0] == "priv" and keyname[2] == self.metadata.DEVICETOKEN_KEY:
match keyname:
case ["priv", _, key] if key == self.metadata.DEVICETOKEN_KEY:
self.metadata.add_token_to_addr(addr, value)
return True
elif keyname[0] == "priv" and keyname[2] == "messagenew":
case ["priv", _, "messagenew"]:
self.notifier.new_message_for_addr(addr, self.metadata)
return True

View File

@@ -360,15 +360,8 @@ def test_turn_credentials_success(notifier, metadata, monkeypatch):
def test_iroh_relay(dictproxy):
rfile = io.BytesIO(
b"\n".join(
[
b"H",
b"Lshared/0123/vendor/vendor.dovecot/pvt/server/vendor/deltachat/irohrelay\tuser@example.org",
]
)
)
wfile = io.BytesIO()
key = b"Lshared/0123/vendor/vendor.dovecot/pvt/server/vendor/deltachat/irohrelay\tuser@example.org"
rfile, wfile = io.BytesIO(b"H\n" + key), io.BytesIO()
dictproxy.iroh_relay = "https://example.org/"
dictproxy.loop_forever(rfile, wfile)
assert wfile.getvalue() == b"Ohttps://example.org/\n"
@@ -383,3 +376,23 @@ def test_legacy_token_migration(metadata, testaddr):
tokens = mdict[metadata.DEVICETOKEN_KEY]
assert isinstance(tokens, dict)
assert "oldtoken1" in tokens and "oldtoken2" in tokens
@pytest.mark.parametrize(
"suffix, expected",
[
(b"vendor/deltachat/maxsmtprecipients", b"O1000\n"),
(b"wrong/prefix/key", b"N\n"),
(b"vendor/deltachat/unknown", b"N\n"),
],
ids=["maxsmtprecipients", "prefix_mismatch", "unknown_name"],
)
def test_shared_lookup(dictproxy, suffix, expected):
key = (
b"Lshared/0123/vendor/vendor.dovecot/pvt/server/"
+ suffix
+ b"\tuser@example.org"
)
rfile, wfile = io.BytesIO(b"H\n" + key), io.BytesIO()
dictproxy.loop_forever(rfile, wfile)
assert wfile.getvalue() == expected