From 1e229ad2dee4331d32714c911bd3e566c93f7d2b Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 8 Mar 2024 02:56:33 +0100 Subject: [PATCH] Add tests to metadata/token handling and post notifications in background thread (#224) --- chatmaild/src/chatmaild/metadata.py | 149 +++++++++++------- .../src/chatmaild/tests/test_metadata.py | 132 ++++++++++++++++ 2 files changed, 228 insertions(+), 53 deletions(-) create mode 100644 chatmaild/src/chatmaild/tests/test_metadata.py diff --git a/chatmaild/src/chatmaild/metadata.py b/chatmaild/src/chatmaild/metadata.py index 52152cc2..63f9d878 100644 --- a/chatmaild/src/chatmaild/metadata.py +++ b/chatmaild/src/chatmaild/metadata.py @@ -1,75 +1,112 @@ import pwd + +from queue import Queue +from threading import Thread from socketserver import ( UnixStreamServer, StreamRequestHandler, ThreadingMixIn, ) -from .config import read_config, Config +from .config import read_config import sys import logging import os import requests -def handle_dovecot_protocol(rfile, wfile, tokens, requests_session, config: Config): +DICTPROXY_LOOKUP_CHAR = "L" +DICTPROXY_SET_CHAR = "S" +DICTPROXY_BEGIN_TRANSACTION_CHAR = "B" +DICTPROXY_COMMIT_TRANSACTION_CHAR = "C" +DICTPROXY_TRANSACTION_CHARS = "SBC" + + +class Notifier: + def __init__(self): + self.guid2token = {} + self.to_notify_queue = Queue() + + def set_token(self, guid, token): + self.guid2token[guid] = token + + def new_message_for_guid(self, guid): + self.to_notify_queue.put(guid) + + def thread_run_loop(self): + requests_session = requests.Session() + while 1: + self.thread_run_one(requests_session) + + def thread_run_one(self, requests_session): + guid = self.to_notify_queue.get() + token = self.guid2token.get(guid) + if token: + response = requests_session.post( + "https://notifications.delta.chat/notify", + data=token, + timeout=60, + ) + if response.status_code == 410: + # 410 Gone status code + # means the token is no longer valid. + del self.guid2token[guid] + + +def handle_dovecot_protocol(rfile, wfile, notifier): # HELLO message, ignored. msg = rfile.readline().strip().decode() transactions = {} - while True: msg = rfile.readline().strip().decode() if not msg: break - short_command = msg[0] - if short_command == "L": - wfile.write(b"N\n") - elif short_command == "S": - # See header of - # - # for the documentation on the structure of the key. + res = handle_dovecot_request(msg, transactions, notifier) + if res: + wfile.write(res.encode("ascii")) + wfile.flush() - # Request GETMETADATA "INBOX" /private/chatmail - # results in a query for - # priv/dd72550f05eadc65542a1200cac67ad7/chatmail - # - # Request GETMETADATA "" /private/chatmail - # results in - # priv/dd72550f05eadc65542a1200cac67ad7/vendor/vendor.dovecot/pvt/server/chatmail - parts = msg[1:].split("\t") - transaction_id = parts[0] - keyname = parts[1].split("/") - value = parts[2] if len(parts) > 2 else "" - if keyname[0] == "priv" and keyname[2] == "devicetoken": - tokens[keyname[1]] = value - elif keyname[0] == "priv" and keyname[2] == "messagenew": - guid = keyname[1] - token = tokens.get(guid) - if token: - response = requests_session.post( - "https://notifications.delta.chat/notify", - data=token, - timeout=60, - ) - if response.status_code == 410: - # 410 Gone status code - # means the token is no longer valid. - del tokens[guid] - else: - # Transaction failed. - transactions[transaction_id] = b"F\n" - elif short_command == "B": - # Begin transaction. - transaction_id = msg[1:].split("\t")[0] - transactions[transaction_id] = b"O\n" - elif short_command == "C": - # Commit transaction. - transaction_id = msg[1:].split("\t")[0] - wfile.write(transactions.pop(transaction_id, b"N\n")) +def handle_dovecot_request(msg, transactions, notifier): + # see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/ + short_command = msg[0] + parts = msg[1:].split("\t") + if short_command == DICTPROXY_LOOKUP_CHAR: + return "N\n" - wfile.flush() + if short_command not in (DICTPROXY_TRANSACTION_CHARS): + return + + transaction_id = parts[0] + + if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR: + transactions[transaction_id] = "O\n" + elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR: + # returns whether it failed or succeeded. + return transactions.pop(transaction_id, "N\n") + elif short_command == DICTPROXY_SET_CHAR: + # See header of + # + # for the documentation on the structure of the key. + + # Request GETMETADATA "INBOX" /private/chatmail + # results in a query for + # priv/dd72550f05eadc65542a1200cac67ad7/chatmail + # + # Request GETMETADATA "" /private/chatmail + # results in + # priv/dd72550f05eadc65542a1200cac67ad7/vendor/vendor.dovecot/pvt/server/chatmail + + keyname = parts[1].split("/") + value = parts[2] if len(parts) > 2 else "" + if keyname[0] == "priv" and keyname[2] == "devicetoken": + notifier.set_token(keyname[1], value) + elif keyname[0] == "priv" and keyname[2] == "messagenew": + notifier.new_message_for_guid(keyname[1]) + else: + # Transaction failed. + transactions[transaction_id] = "F\n" class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer): @@ -79,16 +116,15 @@ class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer): def main(): socket, username, config = sys.argv[1:] passwd_entry = pwd.getpwnam(username) + + # XXX config is not currently used config = read_config(config) - tokens = {} - requests_session = requests.Session() + notifier = Notifier() class Handler(StreamRequestHandler): def handle(self): try: - handle_dovecot_protocol( - self.rfile, self.wfile, tokens, requests_session, config - ) + handle_dovecot_protocol(self.rfile, self.wfile, notifier) except Exception: logging.exception("Exception in the handler") raise @@ -98,6 +134,13 @@ def main(): except FileNotFoundError: pass + # start notifier thread for signalling new messages to + # Delta Chat notification server + + t = Thread(target=notifier.thread_run_loop) + t.setDaemon(True) + t.start() + with ThreadedUnixStreamServer(socket, Handler) as server: os.chown(socket, uid=passwd_entry.pw_uid, gid=passwd_entry.pw_gid) try: diff --git a/chatmaild/src/chatmaild/tests/test_metadata.py b/chatmaild/src/chatmaild/tests/test_metadata.py new file mode 100644 index 00000000..8b3389bb --- /dev/null +++ b/chatmaild/src/chatmaild/tests/test_metadata.py @@ -0,0 +1,132 @@ +import io + +from chatmaild.metadata import ( + handle_dovecot_request, + handle_dovecot_protocol, + Notifier, +) + + +def test_handle_dovecot_request_lookup_fails(): + notifier = Notifier() + res = handle_dovecot_request("Lpriv/123/chatmail", {}, notifier) + assert res == "N\n" + + +def test_handle_dovecot_request_happy_path(): + notifier = Notifier() + transactions = {} + + # lookups return the same NOTFOUND result + res = handle_dovecot_request("Lpriv/123/chatmail", transactions, notifier) + assert res == "N\n" + assert not notifier.guid2token and not transactions + + # set device token in a transaction + tx = "1111" + msg = f"B{tx}\tuser" + res = handle_dovecot_request(msg, transactions, notifier) + assert not res and not notifier.guid2token + assert transactions == {tx: "O\n"} + + msg = f"S{tx}\tpriv/guid00/devicetoken\t01234" + res = handle_dovecot_request(msg, transactions, notifier) + assert not res + assert len(transactions) == 1 + assert len(notifier.guid2token) == 1 + assert notifier.guid2token["guid00"] == "01234" + + msg = f"C{tx}" + res = handle_dovecot_request(msg, transactions, notifier) + assert res == "O\n" + assert len(transactions) == 0 + assert notifier.guid2token["guid00"] == "01234" + + # trigger notification for incoming message + assert handle_dovecot_request(f"B{tx}\tuser", transactions, notifier) is None + msg = f"S{tx}\tpriv/guid00/messagenew" + assert handle_dovecot_request(msg, transactions, notifier) is None + assert notifier.to_notify_queue.get() == "guid00" + assert notifier.to_notify_queue.qsize() == 0 + assert handle_dovecot_request(f"C{tx}\tuser", transactions, notifier) == "O\n" + assert not transactions + + +def test_handle_dovecot_protocol_set_devicetoken(): + rfile = io.BytesIO( + b"\n".join( + [ + b"HELLO", + b"Btx00\tuser", + b"Stx00\tpriv/guid00/devicetoken\t01234", + b"Ctx00", + ] + ) + ) + wfile = io.BytesIO() + notifier = Notifier() + handle_dovecot_protocol(rfile, wfile, notifier) + assert notifier.guid2token["guid00"] == "01234" + assert wfile.getvalue() == b"O\n" + + +def test_handle_dovecot_protocol_messagenew(): + rfile = io.BytesIO( + b"\n".join( + [ + b"HELLO", + b"Btx01\tuser", + b"Stx01\tpriv/guid00/messagenew", + b"Ctx01", + ] + ) + ) + wfile = io.BytesIO() + notifier = Notifier() + handle_dovecot_protocol(rfile, wfile, notifier) + assert wfile.getvalue() == b"O\n" + assert notifier.to_notify_queue.get() == "guid00" + assert notifier.to_notify_queue.qsize() == 0 + + +def test_notifier_thread_run(): + requests = [] + + class ReqMock: + def post(self, url, data, timeout): + requests.append((url, data, timeout)) + + class Result: + status_code = 200 + + return Result() + + notifier = Notifier() + notifier.set_token("guid00", "01234") + notifier.new_message_for_guid("guid00") + notifier.thread_run_one(ReqMock()) + url, data, timeout = requests[0] + assert data == "01234" + assert len(notifier.guid2token) == 1 + + +def test_notifier_thread_run_gone_removes_token(): + requests = [] + + class ReqMock: + def post(self, url, data, timeout): + requests.append((url, data, timeout)) + + class Result: + status_code = 410 + + return Result() + + notifier = Notifier() + notifier.set_token("guid00", "01234") + notifier.new_message_for_guid("guid00") + assert notifier.guid2token["guid00"] == "01234" + notifier.thread_run_one(ReqMock()) + url, data, timeout = requests[0] + assert data == "01234" + assert len(notifier.guid2token) == 0