From 22d77f4680f1acb2cf3203800ca39f4d651fcaab Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sun, 21 Jul 2024 13:22:58 +0200 Subject: [PATCH] splitout base class for dictproxy --- chatmaild/src/chatmaild/dictproxy.py | 68 ++++++++++++++++++ chatmaild/src/chatmaild/metadata.py | 70 ++----------------- .../src/chatmaild/tests/test_metadata.py | 54 +++++++------- 3 files changed, 100 insertions(+), 92 deletions(-) create mode 100644 chatmaild/src/chatmaild/dictproxy.py diff --git a/chatmaild/src/chatmaild/dictproxy.py b/chatmaild/src/chatmaild/dictproxy.py new file mode 100644 index 00000000..7191f12e --- /dev/null +++ b/chatmaild/src/chatmaild/dictproxy.py @@ -0,0 +1,68 @@ +import logging + + +class DictProxy: + def __init__(self): + self.transactions = {} + + def loop_forever(self, rfile, wfile): + while True: + msg = rfile.readline().strip().decode() + if not msg: + break + + res = self.handle_dovecot_request(msg) + if res: + wfile.write(res.encode("ascii")) + wfile.flush() + + def handle_dovecot_request(self, msg): + # see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/ + short_command = msg[0] + parts = msg[1:].split("\t") + + if short_command == "L": + return self.handle_lookup(parts) + elif short_command == "I": + return self.handle_iterate(parts) + elif short_command == "H": + return # no version checking + + if short_command not in ("BSC"): + logging.warning(f"unknown dictproxy request: {msg!r}") + return + + transaction_id = parts[0] + + if short_command == "B": + return self.handle_begin_transaction(transaction_id, parts) + elif short_command == "C": + return self.handle_commit_transaction(transaction_id, parts) + elif short_command == "S": + return self.handle_set(transaction_id, parts) + + def handle_lookup(self, parts): + logging.warning(f"lookup ignored: {parts!r}") + return "N\n" + + def handle_iterate(self, parts): + # Empty line means ITER_FINISHED. + # If we don't return empty line Dovecot will timeout. + return "\n" + + def handle_begin_transaction(self, transaction_id, parts): + addr = parts[1] + self.transactions[transaction_id] = dict(addr=addr, res="O\n") + + def handle_set(self, transaction_id, parts): + # For documentation on key structure see + # https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h + + self.transactions[transaction_id]["res"] = "F\n" + + def handle_commit_transaction(self, transaction_id, parts): + # each set devicetoken operation persists directly + # and does not wait until a "commit" comes + # because our dovecot config does not involve + # multiple set-operations in a single commit + return self.transactions.pop(transaction_id)["res"] diff --git a/chatmaild/src/chatmaild/metadata.py b/chatmaild/src/chatmaild/metadata.py index 2b625f35..a332ccab 100644 --- a/chatmaild/src/chatmaild/metadata.py +++ b/chatmaild/src/chatmaild/metadata.py @@ -8,17 +8,10 @@ from socketserver import ( ) from .config import read_config +from .dictproxy import DictProxy from .filedict import FileDict from .notifier import Notifier -DICTPROXY_HELLO_CHAR = "H" -DICTPROXY_LOOKUP_CHAR = "L" -DICTPROXY_ITERATE_CHAR = "I" -DICTPROXY_BEGIN_TRANSACTION_CHAR = "B" -DICTPROXY_SET_CHAR = "S" -DICTPROXY_COMMIT_TRANSACTION_CHAR = "C" -DICTPROXY_TRANSACTION_CHARS = "BSC" - class Metadata: # each SETMETADATA on this key appends to a list of unique device tokens @@ -48,48 +41,12 @@ class Metadata: return mdict.get(self.DEVICETOKEN_KEY, []) -class DovecotDictProxy: +class MetadataDictProxy(DictProxy): def __init__(self, notifier, metadata, iroh_relay=None): + super().__init__() self.notifier = notifier self.metadata = metadata self.iroh_relay = iroh_relay - self.transactions = {} - - def loop_forever(self, rfile, wfile): - while True: - msg = rfile.readline().strip().decode() - if not msg: - break - - res = self.handle_dovecot_request(msg) - if res: - wfile.write(res.encode("ascii")) - wfile.flush() - - def handle_dovecot_request(self, msg): - # see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/ - short_command = msg[0] - parts = msg[1:].split("\t") - - if short_command == DICTPROXY_LOOKUP_CHAR: - return self.handle_lookup(parts) - elif short_command == DICTPROXY_ITERATE_CHAR: - return self.handle_iterate(parts) - elif short_command == DICTPROXY_HELLO_CHAR: - return # no version checking - - if short_command not in (DICTPROXY_TRANSACTION_CHARS): - logging.warning(f"unknown dictproxy request: {msg!r}") - return - - transaction_id = parts[0] - - if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR: - return self.handle_begin_transaction(transaction_id, parts) - elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR: - return self.handle_commit_transaction(transaction_id, parts) - elif short_command == DICTPROXY_SET_CHAR: - return self.handle_set(transaction_id, parts) def handle_lookup(self, parts): # Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org @@ -111,19 +68,9 @@ class DovecotDictProxy: logging.warning(f"lookup ignored: {parts!r}") return "N\n" - def handle_iterate(self, parts): - # Empty line means ITER_FINISHED. - # If we don't return empty line Dovecot will timeout. - return "\n" - - def handle_begin_transaction(self, transaction_id, parts): - addr = parts[1] - self.transactions[transaction_id] = dict(addr=addr, res="O\n") - def handle_set(self, transaction_id, parts): # For documentation on key structure see # https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h - keyname = parts[1].split("/") value = parts[2] if len(parts) > 2 else "" addr = self.transactions[transaction_id]["addr"] @@ -135,13 +82,6 @@ class DovecotDictProxy: # Transaction failed. self.transactions[transaction_id]["res"] = "F\n" - def handle_commit_transaction(self, transaction_id, parts): - # each set devicetoken operation persists directly - # and does not wait until a "commit" comes - # because our dovecot config does not involve - # multiple set-operations in a single commit - return self.transactions.pop(transaction_id)["res"] - class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer): request_queue_size = 100 @@ -164,14 +104,14 @@ def main(): notifier = Notifier(queue_dir) notifier.start_notification_threads(metadata.remove_token_from_addr) - dict_proxy = DovecotDictProxy( + dictproxy = MetadataDictProxy( notifier=notifier, metadata=metadata, iroh_relay=iroh_relay ) class Handler(StreamRequestHandler): def handle(self): try: - dict_proxy.loop_forever(self.rfile, self.wfile) + dictproxy.loop_forever(self.rfile, self.wfile) except Exception: logging.exception("Exception in the dovecot dictproxy handler") raise diff --git a/chatmaild/src/chatmaild/tests/test_metadata.py b/chatmaild/src/chatmaild/tests/test_metadata.py index b931ad13..7a0573d2 100644 --- a/chatmaild/src/chatmaild/tests/test_metadata.py +++ b/chatmaild/src/chatmaild/tests/test_metadata.py @@ -4,8 +4,8 @@ import time import pytest import requests from chatmaild.metadata import ( - DovecotDictProxy, Metadata, + MetadataDictProxy, ) from chatmaild.notifier import ( Notifier, @@ -29,8 +29,8 @@ def metadata(tmp_path): @pytest.fixture -def dict_proxy(notifier, metadata): - return DovecotDictProxy(notifier=notifier, metadata=metadata) +def dictproxy(notifier, metadata): + return MetadataDictProxy(notifier=notifier, metadata=metadata) @pytest.fixture @@ -92,48 +92,48 @@ def test_notifier_remove_without_set(metadata, testaddr): assert not metadata.get_tokens_for_addr(testaddr) -def test_handle_dovecot_request_lookup_fails(dict_proxy, testaddr): - res = dict_proxy.handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}") +def test_handle_dovecot_request_lookup_fails(dictproxy, testaddr): + res = dictproxy.handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}") assert res == "N\n" -def test_handle_dovecot_request_happy_path(dict_proxy, testaddr, token): - metadata = dict_proxy.metadata - transactions = dict_proxy.transactions - notifier = dict_proxy.notifier +def test_handle_dovecot_request_happy_path(dictproxy, testaddr, token): + metadata = dictproxy.metadata + transactions = dictproxy.transactions + notifier = dictproxy.notifier # set device token in a transaction tx = "1111" msg = f"B{tx}\t{testaddr}" - res = dict_proxy.handle_dovecot_request(msg) + res = dictproxy.handle_dovecot_request(msg) assert not res and not metadata.get_tokens_for_addr(testaddr) assert transactions == {tx: dict(addr=testaddr, res="O\n")} msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}" - res = dict_proxy.handle_dovecot_request(msg) + res = dictproxy.handle_dovecot_request(msg) assert not res assert len(transactions) == 1 assert metadata.get_tokens_for_addr(testaddr) == [token] msg = f"C{tx}" - res = dict_proxy.handle_dovecot_request(msg) + res = dictproxy.handle_dovecot_request(msg) assert res == "O\n" assert len(transactions) == 0 assert metadata.get_tokens_for_addr(testaddr) == [token] # trigger notification for incoming message tx2 = "2222" - assert dict_proxy.handle_dovecot_request(f"B{tx2}\t{testaddr}") is None + assert dictproxy.handle_dovecot_request(f"B{tx2}\t{testaddr}") is None msg = f"S{tx2}\tpriv/guid00/messagenew" - assert dict_proxy.handle_dovecot_request(msg) is None + assert dictproxy.handle_dovecot_request(msg) is None queue_item = notifier.retry_queues[0].get()[1] assert queue_item.token == token - assert dict_proxy.handle_dovecot_request(f"C{tx2}") == "O\n" + assert dictproxy.handle_dovecot_request(f"C{tx2}") == "O\n" assert not transactions assert queue_item.path.exists() -def test_handle_dovecot_protocol_set_devicetoken(dict_proxy): +def test_handle_dovecot_protocol_set_devicetoken(dictproxy): rfile = io.BytesIO( b"\n".join( [ @@ -145,12 +145,12 @@ def test_handle_dovecot_protocol_set_devicetoken(dict_proxy): ) ) wfile = io.BytesIO() - dict_proxy.loop_forever(rfile, wfile) + dictproxy.loop_forever(rfile, wfile) assert wfile.getvalue() == b"O\n" - assert dict_proxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"] + assert dictproxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"] -def test_handle_dovecot_protocol_set_get_devicetoken(dict_proxy): +def test_handle_dovecot_protocol_set_get_devicetoken(dictproxy): rfile = io.BytesIO( b"\n".join( [ @@ -162,19 +162,19 @@ def test_handle_dovecot_protocol_set_get_devicetoken(dict_proxy): ) ) wfile = io.BytesIO() - dict_proxy.loop_forever(rfile, wfile) - assert dict_proxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"] + dictproxy.loop_forever(rfile, wfile) + assert dictproxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"] assert wfile.getvalue() == b"O\n" rfile = io.BytesIO( b"\n".join([b"HELLO", b"Lpriv/0123/devicetoken\tuser@example.org"]) ) wfile = io.BytesIO() - dict_proxy.loop_forever(rfile, wfile) + dictproxy.loop_forever(rfile, wfile) assert wfile.getvalue() == b"O01234\n" -def test_handle_dovecot_protocol_iterate(dict_proxy): +def test_handle_dovecot_protocol_iterate(dictproxy): rfile = io.BytesIO( b"\n".join( [ @@ -184,7 +184,7 @@ def test_handle_dovecot_protocol_iterate(dict_proxy): ) ) wfile = io.BytesIO() - dict_proxy.loop_forever(rfile, wfile) + dictproxy.loop_forever(rfile, wfile) assert wfile.getvalue() == b"\n" @@ -299,7 +299,7 @@ def test_persistent_queue_items(tmp_path, testaddr, token): assert not queue_item < item2 and not item2 < queue_item -def test_iroh_relay(dict_proxy): +def test_iroh_relay(dictproxy): rfile = io.BytesIO( b"\n".join( [ @@ -309,6 +309,6 @@ def test_iroh_relay(dict_proxy): ) ) wfile = io.BytesIO() - dict_proxy.iroh_relay = "https://example.org/" - dict_proxy.loop_forever(rfile, wfile) + dictproxy.iroh_relay = "https://example.org/" + dictproxy.loop_forever(rfile, wfile) assert wfile.getvalue() == b"Ohttps://example.org/\n"