use class for dispatching lookups

This commit is contained in:
holger krekel
2024-07-20 20:53:26 +02:00
parent 052fb64a3d
commit 46c34bfbea
2 changed files with 99 additions and 74 deletions

View File

@@ -48,46 +48,33 @@ class Metadata:
return mdict.get(self.DEVICETOKEN_KEY, []) return mdict.get(self.DEVICETOKEN_KEY, [])
def handle_dovecot_protocol(rfile, wfile, notifier, metadata, iroh_relay=None): class DovecotDictProxy:
transactions = {} def __init__(self, notifier, metadata, iroh_relay=None):
self.notifier = notifier
self.metadata = metadata
self.iroh_relay = iroh_relay
self.transactions = {}
def loop_forever(self, rfile, wfile):
while True: while True:
msg = rfile.readline().strip().decode() msg = rfile.readline().strip().decode()
if not msg: if not msg:
break break
res = handle_dovecot_request(msg, transactions, notifier, metadata, iroh_relay) res = self.handle_dovecot_request(msg)
if res: if res:
wfile.write(res.encode("ascii")) wfile.write(res.encode("ascii"))
wfile.flush() wfile.flush()
def handle_dovecot_request(self, msg):
def handle_dovecot_request(msg, transactions, notifier, metadata, iroh_relay=None):
# see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/ # see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/
short_command = msg[0] short_command = msg[0]
parts = msg[1:].split("\t") parts = msg[1:].split("\t")
if short_command == DICTPROXY_LOOKUP_CHAR: if short_command == DICTPROXY_LOOKUP_CHAR:
# Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org return self.handle_lookup(parts)
keyparts = parts[0].split("/", 2)
if keyparts[0] == "priv":
keyname = keyparts[2]
addr = parts[1]
if keyname == metadata.DEVICETOKEN_KEY:
res = " ".join(metadata.get_tokens_for_addr(addr))
return f"O{res}\n"
elif keyparts[0] == "shared":
keyname = keyparts[2]
if (
keyname == "vendor/vendor.dovecot/pvt/server/vendor/deltachat/irohrelay"
and iroh_relay
):
# Handle `GETMETADATA "" /shared/vendor/deltachat/irohrelay`
return f"O{iroh_relay}\n"
logging.warning(f"lookup ignored: {msg!r}")
return "N\n"
elif short_command == DICTPROXY_ITERATE_CHAR: elif short_command == DICTPROXY_ITERATE_CHAR:
# Empty line means ITER_FINISHED. return self.handle_iterate(parts)
# If we don't return empty line Dovecot will timeout.
return "\n"
elif short_command == DICTPROXY_HELLO_CHAR: elif short_command == DICTPROXY_HELLO_CHAR:
return # no version checking return # no version checking
@@ -98,28 +85,62 @@ def handle_dovecot_request(msg, transactions, notifier, metadata, iroh_relay=Non
transaction_id = parts[0] transaction_id = parts[0]
if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR: if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR:
addr = parts[1] return self.handle_begin_transaction(transaction_id, parts)
transactions[transaction_id] = dict(addr=addr, res="O\n")
elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR: elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR:
# each set devicetoken operation persists directly return self.handle_commit_transaction(transaction_id, parts)
# and does not wait until a "commit" comes
# because our dovecot config does not involve
# multiple set-operations in a single commit
return transactions.pop(transaction_id)["res"]
elif short_command == DICTPROXY_SET_CHAR: elif short_command == DICTPROXY_SET_CHAR:
return self.handle_set(transaction_id, parts)
def handle_lookup(self, parts):
# Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org
keyparts = parts[0].split("/", 2)
if keyparts[0] == "priv":
keyname = keyparts[2]
addr = parts[1]
if keyname == self.metadata.DEVICETOKEN_KEY:
res = " ".join(self.metadata.get_tokens_for_addr(addr))
return f"O{res}\n"
elif keyparts[0] == "shared":
keyname = keyparts[2]
if (
keyname == "vendor/vendor.dovecot/pvt/server/vendor/deltachat/irohrelay"
and self.iroh_relay
):
# Handle `GETMETADATA "" /shared/vendor/deltachat/irohrelay`
return f"O{self.iroh_relay}\n"
logging.warning(f"lookup ignored: {parts!r}")
return "N\n"
def handle_iterate(self, parts):
# Empty line means ITER_FINISHED.
# If we don't return empty line Dovecot will timeout.
return "\n"
def handle_begin_transaction(self, transaction_id, parts):
addr = parts[1]
self.transactions[transaction_id] = dict(addr=addr, res="O\n")
def handle_set(self, transaction_id, parts):
# For documentation on key structure see # For documentation on key structure see
# https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h # https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h
keyname = parts[1].split("/") keyname = parts[1].split("/")
value = parts[2] if len(parts) > 2 else "" value = parts[2] if len(parts) > 2 else ""
addr = transactions[transaction_id]["addr"] addr = self.transactions[transaction_id]["addr"]
if keyname[0] == "priv" and keyname[2] == metadata.DEVICETOKEN_KEY: if keyname[0] == "priv" and keyname[2] == self.metadata.DEVICETOKEN_KEY:
metadata.add_token_to_addr(addr, value) self.metadata.add_token_to_addr(addr, value)
elif keyname[0] == "priv" and keyname[2] == "messagenew": elif keyname[0] == "priv" and keyname[2] == "messagenew":
notifier.new_message_for_addr(addr, metadata) self.notifier.new_message_for_addr(addr, self.metadata)
else: else:
# Transaction failed. # Transaction failed.
transactions[transaction_id]["res"] = "F\n" self.transactions[transaction_id]["res"] = "F\n"
def handle_commit_transaction(self, transaction_id, parts):
# each set devicetoken operation persists directly
# and does not wait until a "commit" comes
# because our dovecot config does not involve
# multiple set-operations in a single commit
return self.transactions.pop(transaction_id)["res"]
class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer): class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
@@ -143,12 +164,14 @@ def main():
notifier = Notifier(queue_dir) notifier = Notifier(queue_dir)
notifier.start_notification_threads(metadata.remove_token_from_addr) notifier.start_notification_threads(metadata.remove_token_from_addr)
dict_proxy = DovecotDictProxy(
notifier=notifier, metadata=metadata, iroh_relay=iroh_relay
)
class Handler(StreamRequestHandler): class Handler(StreamRequestHandler):
def handle(self): def handle(self):
try: try:
handle_dovecot_protocol( dict_proxy.loop_forever(self.rfile, self.wfile)
self.rfile, self.wfile, notifier, metadata, iroh_relay
)
except Exception: except Exception:
logging.exception("Exception in the dovecot dictproxy handler") logging.exception("Exception in the dovecot dictproxy handler")
raise raise

View File

@@ -4,9 +4,8 @@ import time
import pytest import pytest
import requests import requests
from chatmaild.metadata import ( from chatmaild.metadata import (
DovecotDictProxy,
Metadata, Metadata,
handle_dovecot_protocol,
handle_dovecot_request,
) )
from chatmaild.notifier import ( from chatmaild.notifier import (
Notifier, Notifier,
@@ -29,6 +28,11 @@ def metadata(tmp_path):
return Metadata(vmail_dir) return Metadata(vmail_dir)
@pytest.fixture
def dict_proxy(notifier, metadata):
return DovecotDictProxy(notifier=notifier, metadata=metadata)
@pytest.fixture @pytest.fixture
def testaddr(): def testaddr():
return "user.name@example.org" return "user.name@example.org"
@@ -88,51 +92,48 @@ def test_notifier_remove_without_set(metadata, testaddr):
assert not metadata.get_tokens_for_addr(testaddr) assert not metadata.get_tokens_for_addr(testaddr)
def test_handle_dovecot_request_lookup_fails(notifier, metadata, testaddr): def test_handle_dovecot_request_lookup_fails(dict_proxy, testaddr):
res = handle_dovecot_request( res = dict_proxy.handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}")
f"Lpriv/123/chatmail\t{testaddr}", {}, notifier, metadata
)
assert res == "N\n" assert res == "N\n"
def test_handle_dovecot_request_happy_path(notifier, metadata, testaddr, token): def test_handle_dovecot_request_happy_path(dict_proxy, testaddr, token):
transactions = {} metadata = dict_proxy.metadata
transactions = dict_proxy.transactions
notifier = dict_proxy.notifier
# set device token in a transaction # set device token in a transaction
tx = "1111" tx = "1111"
msg = f"B{tx}\t{testaddr}" msg = f"B{tx}\t{testaddr}"
res = handle_dovecot_request(msg, transactions, notifier, metadata) res = dict_proxy.handle_dovecot_request(msg)
assert not res and not metadata.get_tokens_for_addr(testaddr) assert not res and not metadata.get_tokens_for_addr(testaddr)
assert transactions == {tx: dict(addr=testaddr, res="O\n")} assert transactions == {tx: dict(addr=testaddr, res="O\n")}
msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}" msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}"
res = handle_dovecot_request(msg, transactions, notifier, metadata) res = dict_proxy.handle_dovecot_request(msg)
assert not res assert not res
assert len(transactions) == 1 assert len(transactions) == 1
assert metadata.get_tokens_for_addr(testaddr) == [token] assert metadata.get_tokens_for_addr(testaddr) == [token]
msg = f"C{tx}" msg = f"C{tx}"
res = handle_dovecot_request(msg, transactions, notifier, metadata) res = dict_proxy.handle_dovecot_request(msg)
assert res == "O\n" assert res == "O\n"
assert len(transactions) == 0 assert len(transactions) == 0
assert metadata.get_tokens_for_addr(testaddr) == [token] assert metadata.get_tokens_for_addr(testaddr) == [token]
# trigger notification for incoming message # trigger notification for incoming message
tx2 = "2222" tx2 = "2222"
assert ( assert dict_proxy.handle_dovecot_request(f"B{tx2}\t{testaddr}") is None
handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions, notifier, metadata)
is None
)
msg = f"S{tx2}\tpriv/guid00/messagenew" msg = f"S{tx2}\tpriv/guid00/messagenew"
assert handle_dovecot_request(msg, transactions, notifier, metadata) is None assert dict_proxy.handle_dovecot_request(msg) is None
queue_item = notifier.retry_queues[0].get()[1] queue_item = notifier.retry_queues[0].get()[1]
assert queue_item.token == token assert queue_item.token == token
assert handle_dovecot_request(f"C{tx2}", transactions, notifier, metadata) == "O\n" assert dict_proxy.handle_dovecot_request(f"C{tx2}") == "O\n"
assert not transactions assert not transactions
assert queue_item.path.exists() assert queue_item.path.exists()
def test_handle_dovecot_protocol_set_devicetoken(metadata, notifier): def test_handle_dovecot_protocol_set_devicetoken(dict_proxy):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(
[ [
@@ -144,12 +145,12 @@ def test_handle_dovecot_protocol_set_devicetoken(metadata, notifier):
) )
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) dict_proxy.loop_forever(rfile, wfile)
assert wfile.getvalue() == b"O\n" assert wfile.getvalue() == b"O\n"
assert metadata.get_tokens_for_addr("user@example.org") == ["01234"] assert dict_proxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"]
def test_handle_dovecot_protocol_set_get_devicetoken(metadata, notifier): def test_handle_dovecot_protocol_set_get_devicetoken(dict_proxy):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(
[ [
@@ -161,19 +162,19 @@ def test_handle_dovecot_protocol_set_get_devicetoken(metadata, notifier):
) )
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) dict_proxy.loop_forever(rfile, wfile)
assert metadata.get_tokens_for_addr("user@example.org") == ["01234"] assert dict_proxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"]
assert wfile.getvalue() == b"O\n" assert wfile.getvalue() == b"O\n"
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join([b"HELLO", b"Lpriv/0123/devicetoken\tuser@example.org"]) b"\n".join([b"HELLO", b"Lpriv/0123/devicetoken\tuser@example.org"])
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) dict_proxy.loop_forever(rfile, wfile)
assert wfile.getvalue() == b"O01234\n" assert wfile.getvalue() == b"O01234\n"
def test_handle_dovecot_protocol_iterate(metadata, notifier): def test_handle_dovecot_protocol_iterate(dict_proxy):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(
[ [
@@ -183,7 +184,7 @@ def test_handle_dovecot_protocol_iterate(metadata, notifier):
) )
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) dict_proxy.loop_forever(rfile, wfile)
assert wfile.getvalue() == b"\n" assert wfile.getvalue() == b"\n"
@@ -298,7 +299,7 @@ def test_persistent_queue_items(tmp_path, testaddr, token):
assert not queue_item < item2 and not item2 < queue_item assert not queue_item < item2 and not item2 < queue_item
def test_iroh_relay(metadata): def test_iroh_relay(dict_proxy):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(
[ [
@@ -308,5 +309,6 @@ def test_iroh_relay(metadata):
) )
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata, "https://example.org/") dict_proxy.iroh_relay = "https://example.org/"
dict_proxy.loop_forever(rfile, wfile)
assert wfile.getvalue() == b"Ohttps://example.org/\n" assert wfile.getvalue() == b"Ohttps://example.org/\n"