Compare commits

..

1 Commits

Author SHA1 Message Date
holger krekel
a91aa39ec1 prepare 1.1.0 tag 2024-03-28 16:40:08 +01:00
15 changed files with 245 additions and 543 deletions

View File

@@ -4,17 +4,12 @@ on:
push: push:
branches: branches:
- main - main
pull_request: - staging-ci
paths-ignore:
- 'scripts/**'
jobs: jobs:
deploy: deploy:
name: deploy on staging.testrun.org, and run tests name: deploy on staging.testrun.org, and run tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
concurrency:
group: staging-deploy
cancel-in-progress: true
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
@@ -24,57 +19,48 @@ jobs:
echo "${{ secrets.STAGING_SSH_KEY }}" >> ~/.ssh/id_ed25519 echo "${{ secrets.STAGING_SSH_KEY }}" >> ~/.ssh/id_ed25519
chmod 600 ~/.ssh/id_ed25519 chmod 600 ~/.ssh/id_ed25519
ssh-keyscan staging.testrun.org > ~/.ssh/known_hosts ssh-keyscan staging.testrun.org > ~/.ssh/known_hosts
# save previous acme & dkim state # rsync -avz root@staging.testrun.org:/var/lib/acme . || true
rsync -avz root@staging.testrun.org:/var/lib/acme . || true # rsync -avz root@staging.testrun.org:/var/lib/rspamd/dkim . || true
rsync -avz root@staging.testrun.org:/etc/dkimkeys . || true
# store previous acme & dkim state on ns.testrun.org, if it contains useful certs
if [ -f dkimkeys/opendkim.private ]; then rsync -avz -e "ssh -o StrictHostKeyChecking=accept-new" dkimkeys root@ns.testrun.org:/tmp/ || true; fi
if [ -z "$(ls -A acme/certs)" ]; then rsync -avz -e "ssh -o StrictHostKeyChecking=accept-new" acme root@ns.testrun.org:/tmp/ || true; fi
- name: rebuild staging.testrun.org to have a clean VPS #- name: rebuild staging.testrun.org to have a clean VPS
run: | # run: |
curl -X POST \ # curl -X POST \
-H "Authorization: Bearer ${{ secrets.HETZNER_API_TOKEN }}" \ # -H "Authorization: Bearer ${{ secrets.HETZNER_API_TOKEN }}" \
-H "Content-Type: application/json" \ # -H "Content-Type: application/json" \
-d '{"image":"debian-12"}' \ # -d '{"image":"debian-12"}' \
"https://api.hetzner.cloud/v1/servers/${{ secrets.STAGING_SERVER_ID }}/actions/rebuild" # "https://api.hetzner.cloud/v1/servers/${{ secrets.STAGING_SERVER_ID }}/actions/rebuild"
- run: scripts/initenv.sh - run: scripts/initenv.sh
- name: append venv/bin to PATH - name: append venv/bin to PATH
run: echo venv/bin >>$GITHUB_PATH run: echo venv/bin >>$GITHUB_PATH
- name: upload TLS cert after rebuilding
run: |
echo " --- wait until staging.testrun.org VPS is rebuilt --- "
rm ~/.ssh/known_hosts
while ! ssh -o ConnectTimeout=180 -o StrictHostKeyChecking=accept-new -v root@staging.testrun.org id -u ; do sleep 1 ; done
ssh -o StrictHostKeyChecking=accept-new -v root@staging.testrun.org id -u
# download acme & dkim state from ns.testrun.org
rsync -e "ssh -o StrictHostKeyChecking=accept-new" -avz root@ns.testrun.org:/tmp/acme acme-restore || true
rsync -avz root@ns.testrun.org:/tmp/dkimkeys dkimkeys-restore || true
# restore acme & dkim state to staging.testrun.org
rsync -avz acme-restore/acme/ root@staging.testrun.org:/var/lib/acme || true
rsync -avz dkimkeys-restore/dkimkeys/ root@staging.testrun.org:/etc/dkimkeys || true
ssh -o StrictHostKeyChecking=accept-new -v root@staging.testrun.org chown root:root -R /var/lib/acme
- name: run formatting checks - name: run formatting checks
run: cmdeploy fmt -v run: cmdeploy fmt -v
- name: run deploy-chatmail offline tests - name: run deploy-chatmail offline tests
run: pytest --pyargs cmdeploy run: pytest --pyargs cmdeploy
#- name: upload TLS cert after rebuilding
# run: |
# echo " --- wait until staging.testrun.org VPS is rebuilt --- "
# rm ~/.ssh/known_hosts
# while ! ssh -o ConnectTimeout=180 -o StrictHostKeyChecking=accept-new -v root@staging.testrun.org id -u ; do sleep 1 ; done
# ssh -o StrictHostKeyChecking=accept-new -v root@staging.testrun.org id -u
# rsync -avz acme root@staging.testrun.org:/var/lib/ || true
# rsync -avz dkim root@staging.testrun.org:/var/lib/rspamd/ || true
- run: cmdeploy init staging.testrun.org - run: cmdeploy init staging.testrun.org
- run: cmdeploy run - run: cmdeploy run
- name: set DNS entries - name: set DNS entries
run: | run: |
ssh -o StrictHostKeyChecking=accept-new -v root@staging.testrun.org chown opendkim:opendkim -R /etc/dkimkeys #ssh -o StrictHostKeyChecking=accept-new -v root@staging.testrun.org chown _rspamd:_rspamd -R /var/lib/rspamd/dkim
cmdeploy dns --zonefile staging-generated.zone cmdeploy dns --zonefile staging-generated.zone
cat staging-generated.zone >> .github/workflows/staging.testrun.org-default.zone cat staging-generated.zone >> .github/workflows/staging.testrun.org-default.zone
cat .github/workflows/staging.testrun.org-default.zone cat .github/workflows/staging.testrun.org-default.zone
scp .github/workflows/staging.testrun.org-default.zone root@ns.testrun.org:/etc/nsd/staging.testrun.org.zone scp -o StrictHostKeyChecking=accept-new .github/workflows/staging.testrun.org-default.zone root@ns.testrun.org:/etc/nsd/staging.testrun.org.zone
ssh root@ns.testrun.org nsd-checkzone staging.testrun.org /etc/nsd/staging.testrun.org.zone ssh root@ns.testrun.org nsd-checkzone staging.testrun.org /etc/nsd/staging.testrun.org.zone
ssh root@ns.testrun.org systemctl reload nsd ssh root@ns.testrun.org systemctl reload nsd

View File

@@ -1,26 +1,5 @@
# Changelog for chatmail deployment # Changelog for chatmail deployment
## untagged
## 1.2.0 - 2024-04-04
- Install dig on the server to resolve DNS records
([#267](https://github.com/deltachat/chatmail/pull/267))
- preserve notification order and exponentially backoff with
retries for tokens where we didn't get a successful return
([#265](https://github.com/deltachat/chatmail/pull/263))
- Run chatmail-metadata and doveauth as vmail
([#261](https://github.com/deltachat/chatmail/pull/261))
- Apply systemd restrictions to echobot
([#259](https://github.com/deltachat/chatmail/pull/259))
- re-enable running the CI in pull requests, but not concurrently
([#258](https://github.com/deltachat/chatmail/pull/258))
## 1.1.0 - 2024-03-28 ## 1.1.0 - 2024-03-28
### The changelog starts to record changes from March 15th, 2024 ### The changelog starts to record changes from March 15th, 2024

View File

@@ -159,27 +159,4 @@ While this file is present, account creation will be blocked.
Delta Chat apps will, however, discover all ports and configurations Delta Chat apps will, however, discover all ports and configurations
automatically by reading the [autoconfig XML file](https://www.ietf.org/archive/id/draft-bucksch-autoconfig-00.html) from the chatmail service. automatically by reading the [autoconfig XML file](https://www.ietf.org/archive/id/draft-bucksch-autoconfig-00.html) from the chatmail service.
## Email authentication
chatmail servers rely on [DKIM](https://www.rfc-editor.org/rfc/rfc6376)
to authenticate incoming emails.
Incoming emails must have a valid DKIM signature with
Signing Domain Identifier (SDID, `d=` parameter in the DKIM-Signature header)
equal to the `From:` header domain.
This property is checked by OpenDKIM screen policy script
before validating the signatures.
This corresponds to strict [DMARC](https://www.rfc-editor.org/rfc/rfc7489) alignment (`adkim=s`),
but chatmail does not rely on DMARC and does not consult the sender policy published in DMARC records.
Other legacy authentication mechanisms such as [iprev](https://www.rfc-editor.org/rfc/rfc8601#section-2.7.3)
and [SPF](https://www.rfc-editor.org/rfc/rfc7208) are also not taken into account.
If there is no valid DKIM signature on the incoming email,
the sender receives a "5.7.1 No valid DKIM signature found" error.
Outgoing emails must be sent over authenticated connection
with envelope MAIL FROM (return path) corresponding to the login.
This is ensured by Postfix which maps login username
to MAIL FROM with
[`smtpd_sender_login_maps`](https://www.postfix.org/postconf.5.html#smtpd_sender_login_maps)
and rejects incorrectly authenticated emails with [`reject_sender_login_mismatch`](https://www.postfix.org/postconf.5.html#reject_sender_login_mismatch) policy.
`From:` header must correspond to envelope MAIL FROM,
this is ensured by `filtermail` proxy.

View File

@@ -1,3 +1,4 @@
include src/chatmaild/*.f
include src/chatmaild/ini/*.ini.f include src/chatmaild/ini/*.ini.f
include src/chatmaild/ini/*.ini include src/chatmaild/ini/*.ini
include src/chatmaild/tests/mail-data/* include src/chatmaild/tests/mail-data/*

View File

@@ -9,6 +9,7 @@ from socketserver import (
StreamRequestHandler, StreamRequestHandler,
ThreadingMixIn, ThreadingMixIn,
) )
import pwd
from .database import Database from .database import Database
from .config import read_config, Config from .config import read_config, Config
@@ -190,8 +191,9 @@ class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
def main(): def main():
socket = sys.argv[1] socket = sys.argv[1]
db = Database(sys.argv[2]) passwd_entry = pwd.getpwnam(sys.argv[2])
config = read_config(sys.argv[3]) db = Database(sys.argv[3])
config = read_config(sys.argv[4])
class Handler(StreamRequestHandler): class Handler(StreamRequestHandler):
def handle(self): def handle(self):
@@ -207,6 +209,7 @@ def main():
pass pass
with ThreadedUnixStreamServer(socket, Handler) as server: with ThreadedUnixStreamServer(socket, Handler) as server:
os.chown(socket, uid=passwd_entry.pw_uid, gid=passwd_entry.pw_gid)
try: try:
server.serve_forever() server.serve_forever()
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@@ -1,4 +1,7 @@
import pwd
from pathlib import Path from pathlib import Path
from threading import Thread, Event
from socketserver import ( from socketserver import (
UnixStreamServer, UnixStreamServer,
StreamRequestHandler, StreamRequestHandler,
@@ -7,9 +10,9 @@ from socketserver import (
import sys import sys
import logging import logging
import os import os
import requests
from .filedict import FileDict from .filedict import FileDict
from .notifier import Notifier
DICTPROXY_HELLO_CHAR = "H" DICTPROXY_HELLO_CHAR = "H"
@@ -20,49 +23,84 @@ DICTPROXY_SET_CHAR = "S"
DICTPROXY_COMMIT_TRANSACTION_CHAR = "C" DICTPROXY_COMMIT_TRANSACTION_CHAR = "C"
DICTPROXY_TRANSACTION_CHARS = "BSC" DICTPROXY_TRANSACTION_CHARS = "BSC"
# each SETMETADATA on this key appends to a list of unique device tokens
# which only ever get removed if the upstream indicates the token is invalid
METADATA_TOKEN_KEY = "devicetoken"
class Metadata:
# each SETMETADATA on this key appends to a list of unique device tokens
# which only ever get removed if the upstream indicates the token is invalid
DEVICETOKEN_KEY = "devicetoken"
class Notifier:
def __init__(self, vmail_dir): def __init__(self, vmail_dir):
self.vmail_dir = vmail_dir self.vmail_dir = vmail_dir
self.notification_dir = vmail_dir / "pending_notifications"
if not self.notification_dir.exists():
self.notification_dir.mkdir()
self.message_arrived_event = Event()
def get_metadata_dict(self, addr): def get_metadata_dict(self, addr):
return FileDict(self.vmail_dir / addr / "metadata.json") return FileDict(self.vmail_dir / addr / "metadata.json")
def add_token_to_addr(self, addr, token): def add_token(self, addr, token):
with self.get_metadata_dict(addr).modify() as data: with self.get_metadata_dict(addr).modify() as data:
tokens = data.setdefault(self.DEVICETOKEN_KEY, []) tokens = data.get(METADATA_TOKEN_KEY)
if token not in tokens: if tokens is None:
data[METADATA_TOKEN_KEY] = [token]
elif token not in tokens:
tokens.append(token) tokens.append(token)
def remove_token_from_addr(self, addr, token): def remove_token(self, addr, token):
with self.get_metadata_dict(addr).modify() as data: with self.get_metadata_dict(addr).modify() as data:
tokens = data.get(self.DEVICETOKEN_KEY, []) tokens = data.get(METADATA_TOKEN_KEY, [])
if token in tokens: try:
tokens.remove(token) tokens.remove(token)
except ValueError:
pass
def get_tokens_for_addr(self, addr): def get_tokens(self, addr):
mdict = self.get_metadata_dict(addr).read() return self.get_metadata_dict(addr).read().get(METADATA_TOKEN_KEY, [])
return mdict.get(self.DEVICETOKEN_KEY, [])
def new_message_for_addr(self, addr):
self.notification_dir.joinpath(addr).touch()
self.message_arrived_event.set()
def thread_run_loop(self):
requests_session = requests.Session()
while 1:
self.message_arrived_event.wait()
self.message_arrived_event.clear()
self.thread_run_one(requests_session)
def thread_run_one(self, requests_session):
for addr_path in self.notification_dir.iterdir():
addr = addr_path.name
if "@" not in addr:
continue
for token in self.get_tokens(addr):
response = requests_session.post(
"https://notifications.delta.chat/notify",
data=token,
timeout=60,
)
if response.status_code == 410:
# 410 Gone status code
# means the token is no longer valid.
self.remove_token(addr, token)
addr_path.unlink()
def handle_dovecot_protocol(rfile, wfile, notifier, metadata): def handle_dovecot_protocol(rfile, wfile, notifier):
transactions = {} transactions = {}
while True: while True:
msg = rfile.readline().strip().decode() msg = rfile.readline().strip().decode()
if not msg: if not msg:
break break
res = handle_dovecot_request(msg, transactions, notifier, metadata) res = handle_dovecot_request(msg, transactions, notifier)
if res: if res:
wfile.write(res.encode("ascii")) wfile.write(res.encode("ascii"))
wfile.flush() wfile.flush()
def handle_dovecot_request(msg, transactions, notifier, metadata): def handle_dovecot_request(msg, transactions, notifier):
# see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/ # see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/
short_command = msg[0] short_command = msg[0]
parts = msg[1:].split("\t") parts = msg[1:].split("\t")
@@ -72,8 +110,8 @@ def handle_dovecot_request(msg, transactions, notifier, metadata):
if keyparts[0] == "priv": if keyparts[0] == "priv":
keyname = keyparts[2] keyname = keyparts[2]
addr = parts[1] addr = parts[1]
if keyname == metadata.DEVICETOKEN_KEY: if keyname == METADATA_TOKEN_KEY:
res = " ".join(metadata.get_tokens_for_addr(addr)) res = " ".join(notifier.get_tokens(addr))
return f"O{res}\n" return f"O{res}\n"
logging.warning("lookup ignored: %r", msg) logging.warning("lookup ignored: %r", msg)
return "N\n" return "N\n"
@@ -106,10 +144,10 @@ def handle_dovecot_request(msg, transactions, notifier, metadata):
keyname = parts[1].split("/") keyname = parts[1].split("/")
value = parts[2] if len(parts) > 2 else "" value = parts[2] if len(parts) > 2 else ""
addr = transactions[transaction_id]["addr"] addr = transactions[transaction_id]["addr"]
if keyname[0] == "priv" and keyname[2] == metadata.DEVICETOKEN_KEY: if keyname[0] == "priv" and keyname[2] == METADATA_TOKEN_KEY:
metadata.add_token_to_addr(addr, value) notifier.add_token(addr, value)
elif keyname[0] == "priv" and keyname[2] == "messagenew": elif keyname[0] == "priv" and keyname[2] == "messagenew":
notifier.new_message_for_addr(addr, metadata) notifier.new_message_for_addr(addr)
else: else:
# Transaction failed. # Transaction failed.
transactions[transaction_id]["res"] = "F\n" transactions[transaction_id]["res"] = "F\n"
@@ -120,23 +158,21 @@ class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
def main(): def main():
socket, vmail_dir = sys.argv[1:] socket, username, vmail_dir = sys.argv[1:]
passwd_entry = pwd.getpwnam(username)
vmail_dir = Path(vmail_dir) vmail_dir = Path(vmail_dir)
if not vmail_dir.exists(): if not vmail_dir.exists():
logging.error("vmail dir does not exist: %r", vmail_dir) logging.error("vmail dir does not exist: %r", vmail_dir)
return 1 return 1
queue_dir = vmail_dir / "pending_notifications" notifier = Notifier(vmail_dir)
queue_dir.mkdir(exist_ok=True)
metadata = Metadata(vmail_dir)
notifier = Notifier(queue_dir)
notifier.start_notification_threads(metadata.remove_token_from_addr)
class Handler(StreamRequestHandler): class Handler(StreamRequestHandler):
def handle(self): def handle(self):
try: try:
handle_dovecot_protocol(self.rfile, self.wfile, notifier, metadata) handle_dovecot_protocol(self.rfile, self.wfile, notifier)
except Exception: except Exception:
logging.exception("Exception in the dovecot dictproxy handler") logging.exception("Exception in the dovecot dictproxy handler")
raise raise
@@ -146,7 +182,17 @@ def main():
except FileNotFoundError: except FileNotFoundError:
pass pass
# start notifier thread for signalling new messages to
# Delta Chat notification server
t = Thread(target=notifier.thread_run_loop)
t.setDaemon(True)
t.start()
# let notifier thread run once for any pending notifications from last run
notifier.message_arrived_event.set()
with ThreadedUnixStreamServer(socket, Handler) as server: with ThreadedUnixStreamServer(socket, Handler) as server:
os.chown(socket, uid=passwd_entry.pw_uid, gid=passwd_entry.pw_gid)
try: try:
server.serve_forever() server.serve_forever()
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@@ -1,165 +0,0 @@
"""
This modules provides notification machinery for transmitting device tokens to
a central notification server which in turn contacts a phone provider's notification server
to trigger Delta Chat apps to retrieve messages and provide instant notifications to users.
The Notifier class arranges the queuing of tokens in separate PriorityQueues
from which NotifyThreads take and transmit them via HTTPS
to the `notifications.delta.chat` service.
The current lack of proper HTTP/2-support in Python leads us
to use multiple threads and connections to the Rust-implemented `notifications.delta.chat`
which itself uses HTTP/2 and thus only a single connection to phone-notification providers.
If a token fails to cause a successful notification
it is moved to a retry-number specific PriorityQueue
which handles all tokens that failed a particular number of times
and which are scheduled for retry using exponential back-off timing.
If a token notification would be scheduled more than DROP_DEADLINE seconds
after its first attempt, it is dropped with a log error.
Note that tokens are completely opaque to the notification machinery here
and will in the future be encrypted foreclosing all ability to distinguish
which device token ultimately goes to which phone-provider notification service,
or to understand the relation of "device tokens" and chatmail addresses.
The meaning and format of tokens is basically a matter of Delta-Chat Core and
the `notification.delta.chat` service.
"""
import os
import time
import math
import logging
from uuid import uuid4
from threading import Thread
from pathlib import Path
from queue import PriorityQueue
from dataclasses import dataclass
import requests
@dataclass
class PersistentQueueItem:
path: Path
addr: str
start_ts: int
token: str
def delete(self):
self.path.unlink(missing_ok=True)
@classmethod
def create(cls, queue_dir, addr, start_ts, token):
queue_id = uuid4().hex
start_ts = int(start_ts)
path = queue_dir.joinpath(queue_id)
tmp_path = path.with_name(path.name + ".tmp")
tmp_path.write_text(f"{addr}\n{start_ts}\n{token}")
os.rename(tmp_path, path)
return cls(path, addr, start_ts, token)
@classmethod
def read_from_path(cls, path):
addr, start_ts, token = path.read_text().split("\n", maxsplit=2)
return cls(path, addr, int(start_ts), token)
def __lt__(self, other):
return self.start_ts < other.start_ts
class Notifier:
URL = "https://notifications.delta.chat/notify"
CONNECTION_TIMEOUT = 60.0 # seconds until http-request is given up
BASE_DELAY = 8.0 # base seconds for exponential back-off delay
DROP_DEADLINE = 5 * 60 * 60 # drop notifications after 5 hours
def __init__(self, queue_dir):
self.queue_dir = queue_dir
max_tries = int(math.log(self.DROP_DEADLINE, self.BASE_DELAY)) + 1
self.retry_queues = [PriorityQueue() for _ in range(max_tries)]
def compute_delay(self, retry_num):
return 0 if retry_num == 0 else pow(self.BASE_DELAY, retry_num)
def new_message_for_addr(self, addr, metadata):
start_ts = int(time.time())
for token in metadata.get_tokens_for_addr(addr):
queue_item = PersistentQueueItem.create(
self.queue_dir, addr, start_ts, token
)
self.queue_for_retry(queue_item)
def requeue_persistent_queue_items(self):
for queue_path in self.queue_dir.iterdir():
if queue_path.name.endswith(".tmp"):
logging.warning("removing spurious queue item: %r", queue_path)
queue_path.unlink()
continue
queue_item = PersistentQueueItem.read_from_path(queue_path)
self.queue_for_retry(queue_item)
def queue_for_retry(self, queue_item, retry_num=0):
delay = self.compute_delay(retry_num)
when = int(time.time()) + delay
deadline = queue_item.start_ts + self.DROP_DEADLINE
if retry_num >= len(self.retry_queues) or when > deadline:
queue_item.delete()
logging.error("notification exceeded deadline: %r", queue_item.token)
return
self.retry_queues[retry_num].put((when, queue_item))
def start_notification_threads(self, remove_token_from_addr):
self.requeue_persistent_queue_items()
threads = {}
for retry_num in range(len(self.retry_queues)):
# use 4 threads for first-try tokens and less for subsequent tries
num_threads = 4 if retry_num == 0 else 2
threads[retry_num] = []
for _ in range(num_threads):
thread = NotifyThread(self, retry_num, remove_token_from_addr)
threads[retry_num].append(thread)
thread.start()
return threads
class NotifyThread(Thread):
def __init__(self, notifier, retry_num, remove_token_from_addr):
super().__init__(daemon=True)
self.notifier = notifier
self.retry_num = retry_num
self.remove_token_from_addr = remove_token_from_addr
def stop(self):
self.notifier.retry_queues[self.retry_num].put((None, None))
def run(self):
requests_session = requests.Session()
while self.retry_one(requests_session):
pass
def retry_one(self, requests_session, sleep=time.sleep):
when, queue_item = self.notifier.retry_queues[self.retry_num].get()
if when is None:
return False
wait_time = when - int(time.time())
if wait_time > 0:
sleep(wait_time)
self.perform_request_to_notification_server(requests_session, queue_item)
return True
def perform_request_to_notification_server(self, requests_session, queue_item):
timeout = self.notifier.CONNECTION_TIMEOUT
token = queue_item.token
try:
res = requests_session.post(self.notifier.URL, data=token, timeout=timeout)
except requests.exceptions.RequestException as e:
res = e
else:
if res.status_code in (200, 410):
if res.status_code == 410:
self.remove_token_from_addr(queue_item.addr, token)
queue_item.delete()
return
logging.warning("Notification request failed: %r", res)
self.notifier.queue_for_retry(queue_item, retry_num=self.retry_num + 1)

View File

@@ -1,32 +1,18 @@
import io import io
import pytest import pytest
import requests
import time
from chatmaild.metadata import ( from chatmaild.metadata import (
handle_dovecot_request, handle_dovecot_request,
handle_dovecot_protocol, handle_dovecot_protocol,
Metadata,
)
from chatmaild.notifier import (
Notifier, Notifier,
NotifyThread,
PersistentQueueItem,
) )
@pytest.fixture @pytest.fixture
def notifier(metadata): def notifier(tmp_path):
queue_dir = metadata.vmail_dir.joinpath("pending_notifications")
queue_dir.mkdir()
return Notifier(queue_dir)
@pytest.fixture
def metadata(tmp_path):
vmail_dir = tmp_path.joinpath("vmaildir") vmail_dir = tmp_path.joinpath("vmaildir")
vmail_dir.mkdir() vmail_dir.mkdir()
return Metadata(vmail_dir) return Notifier(vmail_dir)
@pytest.fixture @pytest.fixture
@@ -39,100 +25,72 @@ def testaddr2():
return "user2@example.org" return "user2@example.org"
@pytest.fixture def test_notifier_persistence(tmp_path, testaddr, testaddr2):
def token(): notifier1 = Notifier(tmp_path)
return "01234" notifier2 = Notifier(tmp_path)
assert not notifier1.get_tokens(testaddr)
assert not notifier2.get_tokens(testaddr)
notifier1.add_token(testaddr, "01234")
notifier1.add_token(testaddr2, "456")
assert notifier2.get_tokens(testaddr) == ["01234"]
assert notifier2.get_tokens(testaddr2) == ["456"]
notifier2.remove_token(testaddr, "01234")
assert not notifier1.get_tokens(testaddr)
assert notifier1.get_tokens(testaddr2) == ["456"]
def get_mocked_requests(statuslist): def test_remove_nonexisting(tmp_path, testaddr):
class ReqMock: notifier1 = Notifier(tmp_path)
requests = [] notifier1.add_token(testaddr, "123")
notifier1.remove_token(testaddr, "1l23k1l2k3")
def post(self, url, data, timeout): assert notifier1.get_tokens(testaddr) == ["123"]
self.requests.append((url, data, timeout))
res = statuslist.pop(0)
if isinstance(res, Exception):
raise res
class Result:
status_code = res
return Result()
return ReqMock()
def test_metadata_persistence(tmp_path, testaddr, testaddr2): def test_notifier_delete_without_set(notifier, testaddr):
metadata1 = Metadata(tmp_path) notifier.remove_token(testaddr, "123")
metadata2 = Metadata(tmp_path) assert not notifier.get_tokens(testaddr)
assert not metadata1.get_tokens_for_addr(testaddr)
assert not metadata2.get_tokens_for_addr(testaddr)
metadata1.add_token_to_addr(testaddr, "01234")
metadata1.add_token_to_addr(testaddr2, "456")
assert metadata2.get_tokens_for_addr(testaddr) == ["01234"]
assert metadata2.get_tokens_for_addr(testaddr2) == ["456"]
metadata2.remove_token_from_addr(testaddr, "01234")
assert not metadata1.get_tokens_for_addr(testaddr)
assert metadata1.get_tokens_for_addr(testaddr2) == ["456"]
def test_remove_nonexisting(metadata, tmp_path, testaddr): def test_handle_dovecot_request_lookup_fails(notifier, testaddr):
metadata.add_token_to_addr(testaddr, "123") res = handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}", {}, notifier)
metadata.remove_token_from_addr(testaddr, "1l23k1l2k3")
assert metadata.get_tokens_for_addr(testaddr) == ["123"]
def test_notifier_remove_without_set(metadata, testaddr):
metadata.remove_token_from_addr(testaddr, "123")
assert not metadata.get_tokens_for_addr(testaddr)
def test_handle_dovecot_request_lookup_fails(notifier, metadata, testaddr):
res = handle_dovecot_request(
f"Lpriv/123/chatmail\t{testaddr}", {}, notifier, metadata
)
assert res == "N\n" assert res == "N\n"
def test_handle_dovecot_request_happy_path(notifier, metadata, testaddr, token): def test_handle_dovecot_request_happy_path(notifier, testaddr):
transactions = {} transactions = {}
# set device token in a transaction # set device token in a transaction
tx = "1111" tx = "1111"
msg = f"B{tx}\t{testaddr}" msg = f"B{tx}\t{testaddr}"
res = handle_dovecot_request(msg, transactions, notifier, metadata) res = handle_dovecot_request(msg, transactions, notifier)
assert not res and not metadata.get_tokens_for_addr(testaddr) assert not res and not notifier.get_tokens(testaddr)
assert transactions == {tx: dict(addr=testaddr, res="O\n")} assert transactions == {tx: dict(addr=testaddr, res="O\n")}
msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}" msg = f"S{tx}\tpriv/guid00/devicetoken\t01234"
res = handle_dovecot_request(msg, transactions, notifier, metadata) res = handle_dovecot_request(msg, transactions, notifier)
assert not res assert not res
assert len(transactions) == 1 assert len(transactions) == 1
assert metadata.get_tokens_for_addr(testaddr) == [token] assert notifier.get_tokens(testaddr) == ["01234"]
msg = f"C{tx}" msg = f"C{tx}"
res = handle_dovecot_request(msg, transactions, notifier, metadata) res = handle_dovecot_request(msg, transactions, notifier)
assert res == "O\n" assert res == "O\n"
assert len(transactions) == 0 assert len(transactions) == 0
assert metadata.get_tokens_for_addr(testaddr) == [token] assert notifier.get_tokens(testaddr) == ["01234"]
# trigger notification for incoming message # trigger notification for incoming message
tx2 = "2222" tx2 = "2222"
assert ( assert handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions, notifier) is None
handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions, notifier, metadata)
is None
)
msg = f"S{tx2}\tpriv/guid00/messagenew" msg = f"S{tx2}\tpriv/guid00/messagenew"
assert handle_dovecot_request(msg, transactions, notifier, metadata) is None assert handle_dovecot_request(msg, transactions, notifier) is None
queue_item = notifier.retry_queues[0].get()[1] assert notifier.message_arrived_event.is_set()
assert queue_item.token == token assert handle_dovecot_request(f"C{tx2}", transactions, notifier) == "O\n"
assert handle_dovecot_request(f"C{tx2}", transactions, notifier, metadata) == "O\n"
assert not transactions assert not transactions
assert queue_item.path.exists() assert notifier.notification_dir.joinpath(testaddr).exists()
def test_handle_dovecot_protocol_set_devicetoken(metadata, notifier): def test_handle_dovecot_protocol_set_devicetoken(notifier):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(
[ [
@@ -144,12 +102,12 @@ def test_handle_dovecot_protocol_set_devicetoken(metadata, notifier):
) )
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) handle_dovecot_protocol(rfile, wfile, notifier)
assert wfile.getvalue() == b"O\n" assert wfile.getvalue() == b"O\n"
assert metadata.get_tokens_for_addr("user@example.org") == ["01234"] assert notifier.get_tokens("user@example.org") == ["01234"]
def test_handle_dovecot_protocol_set_get_devicetoken(metadata, notifier): def test_handle_dovecot_protocol_set_get_devicetoken(notifier):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(
[ [
@@ -161,19 +119,19 @@ def test_handle_dovecot_protocol_set_get_devicetoken(metadata, notifier):
) )
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) handle_dovecot_protocol(rfile, wfile, notifier)
assert metadata.get_tokens_for_addr("user@example.org") == ["01234"] assert notifier.get_tokens("user@example.org") == ["01234"]
assert wfile.getvalue() == b"O\n" assert wfile.getvalue() == b"O\n"
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join([b"HELLO", b"Lpriv/0123/devicetoken\tuser@example.org"]) b"\n".join([b"HELLO", b"Lpriv/0123/devicetoken\tuser@example.org"])
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) handle_dovecot_protocol(rfile, wfile, notifier)
assert wfile.getvalue() == b"O01234\n" assert wfile.getvalue() == b"O01234\n"
def test_handle_dovecot_protocol_iterate(metadata, notifier): def test_handle_dovecot_protocol_iterate(notifier):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(
[ [
@@ -183,116 +141,90 @@ def test_handle_dovecot_protocol_iterate(metadata, notifier):
) )
) )
wfile = io.BytesIO() wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier, metadata) handle_dovecot_protocol(rfile, wfile, notifier)
assert wfile.getvalue() == b"\n" assert wfile.getvalue() == b"\n"
def test_notifier_thread_deletes_persistent_file(metadata, notifier, testaddr): def test_handle_dovecot_protocol_messagenew(notifier):
reqmock = get_mocked_requests([200]) rfile = io.BytesIO(
metadata.add_token_to_addr(testaddr, "01234") b"\n".join(
notifier.new_message_for_addr(testaddr, metadata) [
NotifyThread(notifier, 0, None).retry_one(reqmock) b"HELLO",
url, data, timeout = reqmock.requests[0] b"Btx01\tuser@example.org",
b"Stx01\tpriv/guid00/messagenew",
b"Ctx01",
]
)
)
wfile = io.BytesIO()
handle_dovecot_protocol(rfile, wfile, notifier)
assert wfile.getvalue() == b"O\n"
assert notifier.message_arrived_event.is_set()
assert notifier.notification_dir.joinpath("user@example.org").exists()
def test_notifier_thread_run(notifier, testaddr):
requests = []
class ReqMock:
def post(self, url, data, timeout):
requests.append((url, data, timeout))
class Result:
status_code = 200
return Result()
notifier.add_token(testaddr, "01234")
notifier.new_message_for_addr(testaddr)
notifier.thread_run_one(ReqMock())
url, data, timeout = requests[0]
assert data == "01234" assert data == "01234"
assert metadata.get_tokens_for_addr(testaddr) == ["01234"] assert notifier.get_tokens(testaddr) == ["01234"]
notifier.requeue_persistent_queue_items()
assert notifier.retry_queues[0].qsize() == 0
@pytest.mark.parametrize("status", [requests.exceptions.RequestException(), 404, 500]) def test_multi_device_notifier(notifier, testaddr):
def test_notifier_thread_connection_failures( requests = []
metadata, notifier, testaddr, status, caplog
):
"""test that tokens keep getting retried until they are given up."""
metadata.add_token_to_addr(testaddr, "01234")
notifier.new_message_for_addr(testaddr, metadata)
notifier.NOTIFICATION_RETRY_DELAY = 5
max_tries = len(notifier.retry_queues)
for i in range(max_tries):
caplog.clear()
reqmock = get_mocked_requests([status])
sleep_calls = []
NotifyThread(notifier, i, None).retry_one(reqmock, sleep=sleep_calls.append)
assert notifier.retry_queues[i].qsize() == 0
assert "request failed" in caplog.records[0].msg
if i > 0:
assert len(sleep_calls) == 1
if i + 1 < max_tries:
assert notifier.retry_queues[i + 1].qsize() == 1
assert len(caplog.records) == 1
else:
assert len(caplog.records) == 2
assert "deadline" in caplog.records[1].msg
notifier.requeue_persistent_queue_items()
assert notifier.retry_queues[0].qsize() == 0
class ReqMock:
def post(self, url, data, timeout):
requests.append((url, data, timeout))
def test_requeue_removes_tmp_files(notifier, metadata, testaddr, caplog): class Result:
metadata.add_token_to_addr(testaddr, "01234") status_code = 200
notifier.new_message_for_addr(testaddr, metadata)
p = notifier.queue_dir.joinpath("1203981203.tmp")
p.touch()
notifier2 = notifier.__class__(notifier.queue_dir)
notifier2.requeue_persistent_queue_items()
assert "spurious" in caplog.records[0].msg
assert not p.exists()
assert notifier2.retry_queues[0].qsize() == 1
when, queue_item = notifier2.retry_queues[0].get()
assert when <= int(time.time())
assert queue_item.addr == testaddr
return Result()
def test_start_and_stop_notification_threads(notifier, testaddr): notifier.add_token(testaddr, "01234")
threads = notifier.start_notification_threads(None) notifier.add_token(testaddr, "56789")
for retry_num, threadlist in threads.items(): notifier.new_message_for_addr(testaddr)
for t in threadlist: notifier.thread_run_one(ReqMock())
t.stop() url, data, timeout = requests[0]
t.join()
def test_multi_device_notifier(metadata, notifier, testaddr):
metadata.add_token_to_addr(testaddr, "01234")
metadata.add_token_to_addr(testaddr, "56789")
notifier.new_message_for_addr(testaddr, metadata)
reqmock = get_mocked_requests([200, 200])
NotifyThread(notifier, 0, None).retry_one(reqmock)
NotifyThread(notifier, 0, None).retry_one(reqmock)
assert notifier.retry_queues[0].qsize() == 0
assert notifier.retry_queues[1].qsize() == 0
url, data, timeout = reqmock.requests[0]
assert data == "01234" assert data == "01234"
url, data, timeout = reqmock.requests[1] url, data, timeout = requests[1]
assert data == "56789" assert data == "56789"
assert metadata.get_tokens_for_addr(testaddr) == ["01234", "56789"] assert notifier.get_tokens(testaddr) == ["01234", "56789"]
def test_notifier_thread_run_gone_removes_token(metadata, notifier, testaddr): def test_notifier_thread_run_gone_removes_token(notifier, testaddr):
metadata.add_token_to_addr(testaddr, "01234") requests = []
metadata.add_token_to_addr(testaddr, "45678")
notifier.new_message_for_addr(testaddr, metadata)
reqmock = get_mocked_requests([410, 200]) class ReqMock:
NotifyThread(notifier, 0, metadata.remove_token_from_addr).retry_one(reqmock) def post(self, url, data, timeout):
NotifyThread(notifier, 0, None).retry_one(reqmock) requests.append((url, data, timeout))
url, data, timeout = reqmock.requests[0]
class Result:
status_code = 410 if data == "01234" else 200
return Result()
notifier.add_token(testaddr, "01234")
notifier.new_message_for_addr(testaddr)
assert notifier.get_tokens(testaddr) == ["01234"]
notifier.add_token(testaddr, "45678")
notifier.thread_run_one(ReqMock())
url, data, timeout = requests[0]
assert data == "01234" assert data == "01234"
url, data, timeout = reqmock.requests[1] url, data, timeout = requests[1]
assert data == "45678" assert data == "45678"
assert metadata.get_tokens_for_addr(testaddr) == ["45678"] assert notifier.get_tokens(testaddr) == ["45678"]
assert notifier.retry_queues[0].qsize() == 0
assert notifier.retry_queues[1].qsize() == 0
def test_persistent_queue_items(tmp_path, testaddr, token):
queue_item = PersistentQueueItem.create(tmp_path, testaddr, 432, token)
assert queue_item.addr == testaddr
assert queue_item.start_ts == 432
assert queue_item.token == token
item2 = PersistentQueueItem.read_from_path(queue_item.path)
assert item2.addr == testaddr
assert item2.start_ts == 432
assert item2.token == token
assert item2 == queue_item
item2.delete()
assert not item2.path.exists()
assert not queue_item < item2 and not item2 < queue_item

View File

@@ -135,6 +135,20 @@ def _configure_opendkim(domain: str, dkim_selector: str = "dkim") -> bool:
"""Configures OpenDKIM""" """Configures OpenDKIM"""
need_restart = False need_restart = False
server.group(name="Create opendkim group", group="opendkim", system=True)
server.user(
name="Create opendkim user",
user="opendkim",
groups=["opendkim"],
system=True,
)
server.user(
name="Add postfix user to opendkim group for socket access",
user="postfix",
groups=["opendkim"],
system=True,
)
main_config = files.template( main_config = files.template(
src=importlib.resources.files(__package__).joinpath("opendkim/opendkim.conf"), src=importlib.resources.files(__package__).joinpath("opendkim/opendkim.conf"),
dest="/etc/opendkim.conf", dest="/etc/opendkim.conf",
@@ -462,29 +476,9 @@ def deploy_chatmail(config_path: Path) -> None:
from .www import build_webpages from .www import build_webpages
apt.update(name="apt update", cache_time=24 * 3600)
server.group(name="Create vmail group", group="vmail", system=True) server.group(name="Create vmail group", group="vmail", system=True)
server.user(name="Create vmail user", user="vmail", group="vmail", system=True) server.user(name="Create vmail user", user="vmail", group="vmail", system=True)
server.group(name="Create opendkim group", group="opendkim", system=True)
server.user(
name="Create opendkim user",
user="opendkim",
groups=["opendkim"],
system=True,
)
server.user(
name="Add postfix user to opendkim group for socket access",
user="postfix",
groups=["opendkim"],
system=True,
)
server.shell(
name="Fix file owner in /home/vmail",
commands=["test -d /home/vmail && chown -R vmail:vmail /home/vmail"],
)
apt.update(name="apt update", cache_time=24 * 3600)
apt.packages( apt.packages(
name="Install rsync", name="Install rsync",
packages=["rsync"], packages=["rsync"],
@@ -571,17 +565,6 @@ def deploy_chatmail(config_path: Path) -> None:
restarted=mta_sts_need_restart, restarted=mta_sts_need_restart,
) )
# Dovecot should be started before Postfix
# because it creates authentication socket
# required by Postfix.
systemd.service(
name="Start and enable Dovecot",
service="dovecot.service",
running=True,
enabled=True,
restarted=dovecot_need_restart,
)
systemd.service( systemd.service(
name="Start and enable Postfix", name="Start and enable Postfix",
service="postfix.service", service="postfix.service",
@@ -590,6 +573,14 @@ def deploy_chatmail(config_path: Path) -> None:
restarted=postfix_need_restart, restarted=postfix_need_restart,
) )
systemd.service(
name="Start and enable Dovecot",
service="dovecot.service",
running=True,
enabled=True,
restarted=dovecot_need_restart,
)
systemd.service( systemd.service(
name="Start and enable nginx", name="Start and enable nginx",
service="nginx.service", service="nginx.service",

View File

@@ -11,11 +11,6 @@ class DNS:
self.session = requests.Session() self.session = requests.Session()
self.out = out self.out = out
self.ssh = f"ssh root@{mail_domain} -- " self.ssh = f"ssh root@{mail_domain} -- "
self.out.shell_output(
f"{ self.ssh }'apt-get update && apt-get install -y dnsutils'",
timeout=60,
no_print=True,
)
try: try:
self.shell(f"unbound-control flush_zone {mail_domain}") self.shell(f"unbound-control flush_zone {mail_domain}")
except subprocess.CalledProcessError: except subprocess.CalledProcessError:

View File

@@ -1,4 +1,4 @@
uri = proxy:/run/doveauth/doveauth.socket:auth uri = proxy:/run/dovecot/doveauth.socket:auth
iterate_disable = yes iterate_disable = yes
default_pass_scheme = plain default_pass_scheme = plain
# %E escapes characters " (double quote), ' (single quote) and \ (backslash) with \ (backslash). # %E escapes characters " (double quote), ' (single quote) and \ (backslash) with \ (backslash).

View File

@@ -78,7 +78,7 @@ mail_privileged_group = vmail
## ##
# Pass all IMAP METADATA requests to the server implementing Dovecot's dict protocol. # Pass all IMAP METADATA requests to the server implementing Dovecot's dict protocol.
mail_attribute_dict = proxy:/run/chatmail-metadata/metadata.socket:metadata mail_attribute_dict = proxy:/run/dovecot/metadata.socket:metadata
# Enable IMAP COMPRESS (RFC 4978). # Enable IMAP COMPRESS (RFC 4978).
# <https://datatracker.ietf.org/doc/html/rfc4978.html> # <https://datatracker.ietf.org/doc/html/rfc4978.html>

View File

@@ -2,11 +2,9 @@
Description=Chatmail dict proxy for IMAP METADATA Description=Chatmail dict proxy for IMAP METADATA
[Service] [Service]
ExecStart={execpath} /run/chatmail-metadata/metadata.socket /home/vmail/mail/{mail_domain} ExecStart={execpath} /run/dovecot/metadata.socket vmail /home/vmail/mail/{mail_domain}
Restart=always Restart=always
RestartSec=30 RestartSec=30
User=vmail
RuntimeDirectory=chatmail-metadata
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target

View File

@@ -2,11 +2,9 @@
Description=Chatmail dict authentication proxy for dovecot Description=Chatmail dict authentication proxy for dovecot
[Service] [Service]
ExecStart={execpath} /run/doveauth/doveauth.socket /home/vmail/passdb.sqlite {config_path} ExecStart={execpath} /run/dovecot/doveauth.socket vmail /home/vmail/passdb.sqlite {config_path}
Restart=always Restart=always
RestartSec=30 RestartSec=30
User=vmail
RuntimeDirectory=doveauth
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target

View File

@@ -7,44 +7,5 @@ Environment="PATH={remote_venv_dir}:$PATH"
Restart=always Restart=always
RestartSec=30 RestartSec=30
# Apply security restrictions suggested by
# systemd-analyze security echobot.service
CapabilityBoundingSet=
LockPersonality=true
MemoryDenyWriteExecute=true
NoNewPrivileges=true
PrivateDevices=true
PrivateMounts=true
PrivateTmp=true
PrivateUsers=true
ProtectClock=true
ProtectControlGroups=true
ProtectHostname=true
ProtectKernelLogs=true
ProtectKernelModules=true
ProtectKernelTunables=true
ProtectProc=noaccess
# Should be "strict", but we currently write /accounts folder in a protected path
ProtectSystem=full
RemoveIPC=true
RestrictAddressFamilies=AF_INET AF_INET6
RestrictNamespaces=true
RestrictRealtime=true
RestrictSUIDSGID=true
SystemCallArchitectures=native
SystemCallFilter=~@clock
SystemCallFilter=~@cpu-emulation
SystemCallFilter=~@debug
SystemCallFilter=~@module
SystemCallFilter=~@mount
SystemCallFilter=~@obsolete
SystemCallFilter=~@raw-io
SystemCallFilter=~@reboot
SystemCallFilter=~@resources
SystemCallFilter=~@swap
UMask=0077
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target