mirror of
https://github.com/chatmail/relay.git
synced 2026-05-19 04:18:09 +00:00
splitout base class for dictproxy
This commit is contained in:
68
chatmaild/src/chatmaild/dictproxy.py
Normal file
68
chatmaild/src/chatmaild/dictproxy.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
class DictProxy:
|
||||||
|
def __init__(self):
|
||||||
|
self.transactions = {}
|
||||||
|
|
||||||
|
def loop_forever(self, rfile, wfile):
|
||||||
|
while True:
|
||||||
|
msg = rfile.readline().strip().decode()
|
||||||
|
if not msg:
|
||||||
|
break
|
||||||
|
|
||||||
|
res = self.handle_dovecot_request(msg)
|
||||||
|
if res:
|
||||||
|
wfile.write(res.encode("ascii"))
|
||||||
|
wfile.flush()
|
||||||
|
|
||||||
|
def handle_dovecot_request(self, msg):
|
||||||
|
# see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/
|
||||||
|
short_command = msg[0]
|
||||||
|
parts = msg[1:].split("\t")
|
||||||
|
|
||||||
|
if short_command == "L":
|
||||||
|
return self.handle_lookup(parts)
|
||||||
|
elif short_command == "I":
|
||||||
|
return self.handle_iterate(parts)
|
||||||
|
elif short_command == "H":
|
||||||
|
return # no version checking
|
||||||
|
|
||||||
|
if short_command not in ("BSC"):
|
||||||
|
logging.warning(f"unknown dictproxy request: {msg!r}")
|
||||||
|
return
|
||||||
|
|
||||||
|
transaction_id = parts[0]
|
||||||
|
|
||||||
|
if short_command == "B":
|
||||||
|
return self.handle_begin_transaction(transaction_id, parts)
|
||||||
|
elif short_command == "C":
|
||||||
|
return self.handle_commit_transaction(transaction_id, parts)
|
||||||
|
elif short_command == "S":
|
||||||
|
return self.handle_set(transaction_id, parts)
|
||||||
|
|
||||||
|
def handle_lookup(self, parts):
|
||||||
|
logging.warning(f"lookup ignored: {parts!r}")
|
||||||
|
return "N\n"
|
||||||
|
|
||||||
|
def handle_iterate(self, parts):
|
||||||
|
# Empty line means ITER_FINISHED.
|
||||||
|
# If we don't return empty line Dovecot will timeout.
|
||||||
|
return "\n"
|
||||||
|
|
||||||
|
def handle_begin_transaction(self, transaction_id, parts):
|
||||||
|
addr = parts[1]
|
||||||
|
self.transactions[transaction_id] = dict(addr=addr, res="O\n")
|
||||||
|
|
||||||
|
def handle_set(self, transaction_id, parts):
|
||||||
|
# For documentation on key structure see
|
||||||
|
# https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h
|
||||||
|
|
||||||
|
self.transactions[transaction_id]["res"] = "F\n"
|
||||||
|
|
||||||
|
def handle_commit_transaction(self, transaction_id, parts):
|
||||||
|
# each set devicetoken operation persists directly
|
||||||
|
# and does not wait until a "commit" comes
|
||||||
|
# because our dovecot config does not involve
|
||||||
|
# multiple set-operations in a single commit
|
||||||
|
return self.transactions.pop(transaction_id)["res"]
|
||||||
@@ -8,17 +8,10 @@ from socketserver import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from .config import read_config
|
from .config import read_config
|
||||||
|
from .dictproxy import DictProxy
|
||||||
from .filedict import FileDict
|
from .filedict import FileDict
|
||||||
from .notifier import Notifier
|
from .notifier import Notifier
|
||||||
|
|
||||||
DICTPROXY_HELLO_CHAR = "H"
|
|
||||||
DICTPROXY_LOOKUP_CHAR = "L"
|
|
||||||
DICTPROXY_ITERATE_CHAR = "I"
|
|
||||||
DICTPROXY_BEGIN_TRANSACTION_CHAR = "B"
|
|
||||||
DICTPROXY_SET_CHAR = "S"
|
|
||||||
DICTPROXY_COMMIT_TRANSACTION_CHAR = "C"
|
|
||||||
DICTPROXY_TRANSACTION_CHARS = "BSC"
|
|
||||||
|
|
||||||
|
|
||||||
class Metadata:
|
class Metadata:
|
||||||
# each SETMETADATA on this key appends to a list of unique device tokens
|
# each SETMETADATA on this key appends to a list of unique device tokens
|
||||||
@@ -48,48 +41,12 @@ class Metadata:
|
|||||||
return mdict.get(self.DEVICETOKEN_KEY, [])
|
return mdict.get(self.DEVICETOKEN_KEY, [])
|
||||||
|
|
||||||
|
|
||||||
class DovecotDictProxy:
|
class MetadataDictProxy(DictProxy):
|
||||||
def __init__(self, notifier, metadata, iroh_relay=None):
|
def __init__(self, notifier, metadata, iroh_relay=None):
|
||||||
|
super().__init__()
|
||||||
self.notifier = notifier
|
self.notifier = notifier
|
||||||
self.metadata = metadata
|
self.metadata = metadata
|
||||||
self.iroh_relay = iroh_relay
|
self.iroh_relay = iroh_relay
|
||||||
self.transactions = {}
|
|
||||||
|
|
||||||
def loop_forever(self, rfile, wfile):
|
|
||||||
while True:
|
|
||||||
msg = rfile.readline().strip().decode()
|
|
||||||
if not msg:
|
|
||||||
break
|
|
||||||
|
|
||||||
res = self.handle_dovecot_request(msg)
|
|
||||||
if res:
|
|
||||||
wfile.write(res.encode("ascii"))
|
|
||||||
wfile.flush()
|
|
||||||
|
|
||||||
def handle_dovecot_request(self, msg):
|
|
||||||
# see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/
|
|
||||||
short_command = msg[0]
|
|
||||||
parts = msg[1:].split("\t")
|
|
||||||
|
|
||||||
if short_command == DICTPROXY_LOOKUP_CHAR:
|
|
||||||
return self.handle_lookup(parts)
|
|
||||||
elif short_command == DICTPROXY_ITERATE_CHAR:
|
|
||||||
return self.handle_iterate(parts)
|
|
||||||
elif short_command == DICTPROXY_HELLO_CHAR:
|
|
||||||
return # no version checking
|
|
||||||
|
|
||||||
if short_command not in (DICTPROXY_TRANSACTION_CHARS):
|
|
||||||
logging.warning(f"unknown dictproxy request: {msg!r}")
|
|
||||||
return
|
|
||||||
|
|
||||||
transaction_id = parts[0]
|
|
||||||
|
|
||||||
if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR:
|
|
||||||
return self.handle_begin_transaction(transaction_id, parts)
|
|
||||||
elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR:
|
|
||||||
return self.handle_commit_transaction(transaction_id, parts)
|
|
||||||
elif short_command == DICTPROXY_SET_CHAR:
|
|
||||||
return self.handle_set(transaction_id, parts)
|
|
||||||
|
|
||||||
def handle_lookup(self, parts):
|
def handle_lookup(self, parts):
|
||||||
# Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org
|
# Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org
|
||||||
@@ -111,19 +68,9 @@ class DovecotDictProxy:
|
|||||||
logging.warning(f"lookup ignored: {parts!r}")
|
logging.warning(f"lookup ignored: {parts!r}")
|
||||||
return "N\n"
|
return "N\n"
|
||||||
|
|
||||||
def handle_iterate(self, parts):
|
|
||||||
# Empty line means ITER_FINISHED.
|
|
||||||
# If we don't return empty line Dovecot will timeout.
|
|
||||||
return "\n"
|
|
||||||
|
|
||||||
def handle_begin_transaction(self, transaction_id, parts):
|
|
||||||
addr = parts[1]
|
|
||||||
self.transactions[transaction_id] = dict(addr=addr, res="O\n")
|
|
||||||
|
|
||||||
def handle_set(self, transaction_id, parts):
|
def handle_set(self, transaction_id, parts):
|
||||||
# For documentation on key structure see
|
# For documentation on key structure see
|
||||||
# https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h
|
# https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h
|
||||||
|
|
||||||
keyname = parts[1].split("/")
|
keyname = parts[1].split("/")
|
||||||
value = parts[2] if len(parts) > 2 else ""
|
value = parts[2] if len(parts) > 2 else ""
|
||||||
addr = self.transactions[transaction_id]["addr"]
|
addr = self.transactions[transaction_id]["addr"]
|
||||||
@@ -135,13 +82,6 @@ class DovecotDictProxy:
|
|||||||
# Transaction failed.
|
# Transaction failed.
|
||||||
self.transactions[transaction_id]["res"] = "F\n"
|
self.transactions[transaction_id]["res"] = "F\n"
|
||||||
|
|
||||||
def handle_commit_transaction(self, transaction_id, parts):
|
|
||||||
# each set devicetoken operation persists directly
|
|
||||||
# and does not wait until a "commit" comes
|
|
||||||
# because our dovecot config does not involve
|
|
||||||
# multiple set-operations in a single commit
|
|
||||||
return self.transactions.pop(transaction_id)["res"]
|
|
||||||
|
|
||||||
|
|
||||||
class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
|
class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
|
||||||
request_queue_size = 100
|
request_queue_size = 100
|
||||||
@@ -164,14 +104,14 @@ def main():
|
|||||||
notifier = Notifier(queue_dir)
|
notifier = Notifier(queue_dir)
|
||||||
notifier.start_notification_threads(metadata.remove_token_from_addr)
|
notifier.start_notification_threads(metadata.remove_token_from_addr)
|
||||||
|
|
||||||
dict_proxy = DovecotDictProxy(
|
dictproxy = MetadataDictProxy(
|
||||||
notifier=notifier, metadata=metadata, iroh_relay=iroh_relay
|
notifier=notifier, metadata=metadata, iroh_relay=iroh_relay
|
||||||
)
|
)
|
||||||
|
|
||||||
class Handler(StreamRequestHandler):
|
class Handler(StreamRequestHandler):
|
||||||
def handle(self):
|
def handle(self):
|
||||||
try:
|
try:
|
||||||
dict_proxy.loop_forever(self.rfile, self.wfile)
|
dictproxy.loop_forever(self.rfile, self.wfile)
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception("Exception in the dovecot dictproxy handler")
|
logging.exception("Exception in the dovecot dictproxy handler")
|
||||||
raise
|
raise
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import time
|
|||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
from chatmaild.metadata import (
|
from chatmaild.metadata import (
|
||||||
DovecotDictProxy,
|
|
||||||
Metadata,
|
Metadata,
|
||||||
|
MetadataDictProxy,
|
||||||
)
|
)
|
||||||
from chatmaild.notifier import (
|
from chatmaild.notifier import (
|
||||||
Notifier,
|
Notifier,
|
||||||
@@ -29,8 +29,8 @@ def metadata(tmp_path):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def dict_proxy(notifier, metadata):
|
def dictproxy(notifier, metadata):
|
||||||
return DovecotDictProxy(notifier=notifier, metadata=metadata)
|
return MetadataDictProxy(notifier=notifier, metadata=metadata)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
@@ -92,48 +92,48 @@ def test_notifier_remove_without_set(metadata, testaddr):
|
|||||||
assert not metadata.get_tokens_for_addr(testaddr)
|
assert not metadata.get_tokens_for_addr(testaddr)
|
||||||
|
|
||||||
|
|
||||||
def test_handle_dovecot_request_lookup_fails(dict_proxy, testaddr):
|
def test_handle_dovecot_request_lookup_fails(dictproxy, testaddr):
|
||||||
res = dict_proxy.handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}")
|
res = dictproxy.handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}")
|
||||||
assert res == "N\n"
|
assert res == "N\n"
|
||||||
|
|
||||||
|
|
||||||
def test_handle_dovecot_request_happy_path(dict_proxy, testaddr, token):
|
def test_handle_dovecot_request_happy_path(dictproxy, testaddr, token):
|
||||||
metadata = dict_proxy.metadata
|
metadata = dictproxy.metadata
|
||||||
transactions = dict_proxy.transactions
|
transactions = dictproxy.transactions
|
||||||
notifier = dict_proxy.notifier
|
notifier = dictproxy.notifier
|
||||||
|
|
||||||
# set device token in a transaction
|
# set device token in a transaction
|
||||||
tx = "1111"
|
tx = "1111"
|
||||||
msg = f"B{tx}\t{testaddr}"
|
msg = f"B{tx}\t{testaddr}"
|
||||||
res = dict_proxy.handle_dovecot_request(msg)
|
res = dictproxy.handle_dovecot_request(msg)
|
||||||
assert not res and not metadata.get_tokens_for_addr(testaddr)
|
assert not res and not metadata.get_tokens_for_addr(testaddr)
|
||||||
assert transactions == {tx: dict(addr=testaddr, res="O\n")}
|
assert transactions == {tx: dict(addr=testaddr, res="O\n")}
|
||||||
|
|
||||||
msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}"
|
msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}"
|
||||||
res = dict_proxy.handle_dovecot_request(msg)
|
res = dictproxy.handle_dovecot_request(msg)
|
||||||
assert not res
|
assert not res
|
||||||
assert len(transactions) == 1
|
assert len(transactions) == 1
|
||||||
assert metadata.get_tokens_for_addr(testaddr) == [token]
|
assert metadata.get_tokens_for_addr(testaddr) == [token]
|
||||||
|
|
||||||
msg = f"C{tx}"
|
msg = f"C{tx}"
|
||||||
res = dict_proxy.handle_dovecot_request(msg)
|
res = dictproxy.handle_dovecot_request(msg)
|
||||||
assert res == "O\n"
|
assert res == "O\n"
|
||||||
assert len(transactions) == 0
|
assert len(transactions) == 0
|
||||||
assert metadata.get_tokens_for_addr(testaddr) == [token]
|
assert metadata.get_tokens_for_addr(testaddr) == [token]
|
||||||
|
|
||||||
# trigger notification for incoming message
|
# trigger notification for incoming message
|
||||||
tx2 = "2222"
|
tx2 = "2222"
|
||||||
assert dict_proxy.handle_dovecot_request(f"B{tx2}\t{testaddr}") is None
|
assert dictproxy.handle_dovecot_request(f"B{tx2}\t{testaddr}") is None
|
||||||
msg = f"S{tx2}\tpriv/guid00/messagenew"
|
msg = f"S{tx2}\tpriv/guid00/messagenew"
|
||||||
assert dict_proxy.handle_dovecot_request(msg) is None
|
assert dictproxy.handle_dovecot_request(msg) is None
|
||||||
queue_item = notifier.retry_queues[0].get()[1]
|
queue_item = notifier.retry_queues[0].get()[1]
|
||||||
assert queue_item.token == token
|
assert queue_item.token == token
|
||||||
assert dict_proxy.handle_dovecot_request(f"C{tx2}") == "O\n"
|
assert dictproxy.handle_dovecot_request(f"C{tx2}") == "O\n"
|
||||||
assert not transactions
|
assert not transactions
|
||||||
assert queue_item.path.exists()
|
assert queue_item.path.exists()
|
||||||
|
|
||||||
|
|
||||||
def test_handle_dovecot_protocol_set_devicetoken(dict_proxy):
|
def test_handle_dovecot_protocol_set_devicetoken(dictproxy):
|
||||||
rfile = io.BytesIO(
|
rfile = io.BytesIO(
|
||||||
b"\n".join(
|
b"\n".join(
|
||||||
[
|
[
|
||||||
@@ -145,12 +145,12 @@ def test_handle_dovecot_protocol_set_devicetoken(dict_proxy):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
wfile = io.BytesIO()
|
wfile = io.BytesIO()
|
||||||
dict_proxy.loop_forever(rfile, wfile)
|
dictproxy.loop_forever(rfile, wfile)
|
||||||
assert wfile.getvalue() == b"O\n"
|
assert wfile.getvalue() == b"O\n"
|
||||||
assert dict_proxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"]
|
assert dictproxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"]
|
||||||
|
|
||||||
|
|
||||||
def test_handle_dovecot_protocol_set_get_devicetoken(dict_proxy):
|
def test_handle_dovecot_protocol_set_get_devicetoken(dictproxy):
|
||||||
rfile = io.BytesIO(
|
rfile = io.BytesIO(
|
||||||
b"\n".join(
|
b"\n".join(
|
||||||
[
|
[
|
||||||
@@ -162,19 +162,19 @@ def test_handle_dovecot_protocol_set_get_devicetoken(dict_proxy):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
wfile = io.BytesIO()
|
wfile = io.BytesIO()
|
||||||
dict_proxy.loop_forever(rfile, wfile)
|
dictproxy.loop_forever(rfile, wfile)
|
||||||
assert dict_proxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"]
|
assert dictproxy.metadata.get_tokens_for_addr("user@example.org") == ["01234"]
|
||||||
assert wfile.getvalue() == b"O\n"
|
assert wfile.getvalue() == b"O\n"
|
||||||
|
|
||||||
rfile = io.BytesIO(
|
rfile = io.BytesIO(
|
||||||
b"\n".join([b"HELLO", b"Lpriv/0123/devicetoken\tuser@example.org"])
|
b"\n".join([b"HELLO", b"Lpriv/0123/devicetoken\tuser@example.org"])
|
||||||
)
|
)
|
||||||
wfile = io.BytesIO()
|
wfile = io.BytesIO()
|
||||||
dict_proxy.loop_forever(rfile, wfile)
|
dictproxy.loop_forever(rfile, wfile)
|
||||||
assert wfile.getvalue() == b"O01234\n"
|
assert wfile.getvalue() == b"O01234\n"
|
||||||
|
|
||||||
|
|
||||||
def test_handle_dovecot_protocol_iterate(dict_proxy):
|
def test_handle_dovecot_protocol_iterate(dictproxy):
|
||||||
rfile = io.BytesIO(
|
rfile = io.BytesIO(
|
||||||
b"\n".join(
|
b"\n".join(
|
||||||
[
|
[
|
||||||
@@ -184,7 +184,7 @@ def test_handle_dovecot_protocol_iterate(dict_proxy):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
wfile = io.BytesIO()
|
wfile = io.BytesIO()
|
||||||
dict_proxy.loop_forever(rfile, wfile)
|
dictproxy.loop_forever(rfile, wfile)
|
||||||
assert wfile.getvalue() == b"\n"
|
assert wfile.getvalue() == b"\n"
|
||||||
|
|
||||||
|
|
||||||
@@ -299,7 +299,7 @@ def test_persistent_queue_items(tmp_path, testaddr, token):
|
|||||||
assert not queue_item < item2 and not item2 < queue_item
|
assert not queue_item < item2 and not item2 < queue_item
|
||||||
|
|
||||||
|
|
||||||
def test_iroh_relay(dict_proxy):
|
def test_iroh_relay(dictproxy):
|
||||||
rfile = io.BytesIO(
|
rfile = io.BytesIO(
|
||||||
b"\n".join(
|
b"\n".join(
|
||||||
[
|
[
|
||||||
@@ -309,6 +309,6 @@ def test_iroh_relay(dict_proxy):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
wfile = io.BytesIO()
|
wfile = io.BytesIO()
|
||||||
dict_proxy.iroh_relay = "https://example.org/"
|
dictproxy.iroh_relay = "https://example.org/"
|
||||||
dict_proxy.loop_forever(rfile, wfile)
|
dictproxy.loop_forever(rfile, wfile)
|
||||||
assert wfile.getvalue() == b"Ohttps://example.org/\n"
|
assert wfile.getvalue() == b"Ohttps://example.org/\n"
|
||||||
|
|||||||
Reference in New Issue
Block a user