implemented suggestion fopr using an absolute deadline instead of retrying but choose 5 hours for now because if our own notification server is down/buggy we have at least a bit of time to fix it

This commit is contained in:
holger krekel
2024-04-02 21:25:59 +02:00
parent bf18905e02
commit 60d7e516dd
2 changed files with 32 additions and 21 deletions

View File

@@ -14,7 +14,8 @@ If a token fails to cause a successful notification
it is moved to a retry-number specific PriorityQueue it is moved to a retry-number specific PriorityQueue
which handles all tokens that failed a particular number of times which handles all tokens that failed a particular number of times
and which are scheduled for retry using exponential back-off timing. and which are scheduled for retry using exponential back-off timing.
If a token exceeds MAX_NUMBER_OF_TRIES it is dropped with a log warning. If a token notification would be scheduled more than DROP_DEADLINE seconds
after its first attempt, it is dropped with a log error.
Note that tokens are completely opaque to the notification machinery here Note that tokens are completely opaque to the notification machinery here
and will in the future be encrypted foreclosing all ability to distinguish and will in the future be encrypted foreclosing all ability to distinguish
@@ -25,6 +26,7 @@ the `notification.delta.chat` service.
""" """
import time import time
import math
import logging import logging
from uuid import uuid4 from uuid import uuid4
from threading import Thread from threading import Thread
@@ -38,38 +40,45 @@ import requests
class PersistentQueueItem: class PersistentQueueItem:
path: Path path: Path
addr: str addr: str
start_ts: float
token: str token: str
def delete(self): def delete(self):
self.path.unlink(missing_ok=True) self.path.unlink(missing_ok=True)
@classmethod @classmethod
def create(cls, queue_dir, addr, token): def create(cls, queue_dir, addr, start_ts, token):
queue_id = uuid4().hex queue_id = uuid4().hex
path = queue_dir.joinpath(queue_id) path = queue_dir.joinpath(queue_id)
path.write_text(f"{addr}\n{token}") path.write_text(f"{addr}\n{start_ts}\n{token}")
return cls(path, addr, token) return cls(path, addr, start_ts, token)
@classmethod @classmethod
def read_from_path(cls, path): def read_from_path(cls, path):
addr, token = path.read_text().split("\n", maxsplit=1) addr, start_ts, token = path.read_text().split("\n", maxsplit=2)
return cls(path, addr, token) return cls(path, addr, float(start_ts), token)
class Notifier: class Notifier:
URL = "https://notifications.delta.chat/notify" URL = "https://notifications.delta.chat/notify"
CONNECTION_TIMEOUT = 60.0 # seconds until http-request is given up CONNECTION_TIMEOUT = 60.0 # seconds until http-request is given up
NOTIFICATION_RETRY_DELAY = 8.0 # seconds with exponential backoff BASE_DELAY = 8.0 # base seconds for exponential back-off delay
MAX_NUMBER_OF_TRIES = 6 DROP_DEADLINE = 5 * 60 * 60 # drop notifications after 5 hours
# exponential backoff means we try for 8^5 seconds, approximately 10 hours
def __init__(self, notification_dir): def __init__(self, notification_dir):
self.notification_dir = notification_dir self.notification_dir = notification_dir
self.retry_queues = [PriorityQueue() for _ in range(self.MAX_NUMBER_OF_TRIES)] max_tries = int(math.log(self.DROP_DEADLINE, self.BASE_DELAY)) + 1
self.retry_queues = [PriorityQueue() for _ in range(max_tries)]
def compute_delay(self, retry_num):
return 0 if retry_num == 0 else pow(self.BASE_DELAY, retry_num)
def new_message_for_addr(self, addr, metadata): def new_message_for_addr(self, addr, metadata):
start_ts = time.time()
for token in metadata.get_tokens_for_addr(addr): for token in metadata.get_tokens_for_addr(addr):
queue_item = PersistentQueueItem.create(self.notification_dir, addr, token) queue_item = PersistentQueueItem.create(
self.notification_dir, addr, start_ts, token
)
self.queue_for_retry(queue_item) self.queue_for_retry(queue_item)
def requeue_persistent_queue_items(self): def requeue_persistent_queue_items(self):
@@ -78,15 +87,14 @@ class Notifier:
self.queue_for_retry(queue_item) self.queue_for_retry(queue_item)
def queue_for_retry(self, queue_item, retry_num=0): def queue_for_retry(self, queue_item, retry_num=0):
if retry_num >= self.MAX_NUMBER_OF_TRIES: delay = self.compute_delay(retry_num)
when = time.time() + delay
deadline = queue_item.start_ts + self.DROP_DEADLINE
if retry_num >= len(self.retry_queues) or when > deadline:
queue_item.delete() queue_item.delete()
logging.warning("dropping after %d tries: %r", retry_num, queue_item.token) logging.error("notification exceeded deadline: %r", queue_item.token)
return return
when = time.time()
if retry_num > 0:
# back off exponentially with number of retries
when += pow(self.NOTIFICATION_RETRY_DELAY, retry_num)
self.retry_queues[retry_num].put((when, queue_item)) self.retry_queues[retry_num].put((when, queue_item))
def start_notification_threads(self, remove_token_from_addr): def start_notification_threads(self, remove_token_from_addr):

View File

@@ -206,7 +206,8 @@ def test_notifier_thread_connection_failures(
metadata.add_token_to_addr(testaddr, "01234") metadata.add_token_to_addr(testaddr, "01234")
notifier.new_message_for_addr(testaddr, metadata) notifier.new_message_for_addr(testaddr, metadata)
notifier.NOTIFICATION_RETRY_DELAY = 5 notifier.NOTIFICATION_RETRY_DELAY = 5
for i in range(notifier.MAX_NUMBER_OF_TRIES): max_tries = len(notifier.retry_queues)
for i in range(max_tries):
caplog.clear() caplog.clear()
reqmock = get_mocked_requests([status]) reqmock = get_mocked_requests([status])
sleep_calls = [] sleep_calls = []
@@ -215,12 +216,12 @@ def test_notifier_thread_connection_failures(
assert "request failed" in caplog.records[0].msg assert "request failed" in caplog.records[0].msg
if i > 0: if i > 0:
assert len(sleep_calls) == 1 assert len(sleep_calls) == 1
if i + 1 < notifier.MAX_NUMBER_OF_TRIES: if i + 1 < max_tries:
assert notifier.retry_queues[i + 1].qsize() == 1 assert notifier.retry_queues[i + 1].qsize() == 1
assert len(caplog.records) == 1 assert len(caplog.records) == 1
else: else:
assert len(caplog.records) == 2 assert len(caplog.records) == 2
assert "dropping" in caplog.records[1].msg assert "deadline" in caplog.records[1].msg
notifier.requeue_persistent_queue_items() notifier.requeue_persistent_queue_items()
assert notifier.retry_queues[0].qsize() == 0 assert notifier.retry_queues[0].qsize() == 0
@@ -267,11 +268,13 @@ def test_notifier_thread_run_gone_removes_token(metadata, notifier, testaddr):
def test_persistent_queue_items(tmp_path, testaddr, token): def test_persistent_queue_items(tmp_path, testaddr, token):
queue_item = PersistentQueueItem.create(tmp_path, testaddr, token) queue_item = PersistentQueueItem.create(tmp_path, testaddr, 432.0, token)
assert queue_item.addr == testaddr assert queue_item.addr == testaddr
assert queue_item.start_ts == 432.0
assert queue_item.token == token assert queue_item.token == token
item2 = PersistentQueueItem.read_from_path(queue_item.path) item2 = PersistentQueueItem.read_from_path(queue_item.path)
assert item2.addr == testaddr assert item2.addr == testaddr
assert item2.start_ts == 432.0
assert item2.token == token assert item2.token == token
assert item2 == queue_item assert item2 == queue_item
item2.delete() item2.delete()