From 1ce0a2b0ba9f4d4176f1e6b5e2b3f65091356cb5 Mon Sep 17 00:00:00 2001 From: link2xt Date: Tue, 11 Jun 2024 00:34:27 +0000 Subject: [PATCH] Improve filtermail checks for encrypted messages Ensure that first part only contains "Version: 1" and second part only contains base64 payload enclosed in "-----BEGIN PGP MESSAGE-----" and "-----END PGP MESSAGE-----". --- CHANGELOG.md | 3 +++ chatmaild/src/chatmaild/filtermail.py | 33 ++++++++++++++++++++++++- chatmaild/src/chatmaild/tests/plugin.py | 4 ++- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 716dd269..5b7a72fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## untagged +- improve filtermail checks for encrypted messages + ([#320](https://github.com/deltachat/chatmail/pull/320)) + ## 1.3.0 - 2024-06-06 - don't check necessary DNS records on cmdeploy init anymore diff --git a/chatmaild/src/chatmaild/filtermail.py b/chatmaild/src/chatmaild/filtermail.py index 87231fd2..d65b0e57 100644 --- a/chatmaild/src/chatmaild/filtermail.py +++ b/chatmaild/src/chatmaild/filtermail.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import asyncio import logging +import re import sys import time from email import policy @@ -12,9 +13,15 @@ from aiosmtpd.controller import Controller from .config import read_config +# Regular expression that matches multi-line base64. +BASE64_REGEX = re.compile(r"([A-Za-z0-9+/=]+\r\n)+") + def check_encrypted(message): - """Check that the message is an OpenPGP-encrypted message.""" + """Check that the message is an OpenPGP-encrypted message. + + MIME structure of the message must correspond to . + """ if not message.is_multipart(): return False if message.get("subject") != "...": @@ -23,12 +30,36 @@ def check_encrypted(message): return False parts_count = 0 for part in message.iter_parts(): + # We explicitly check Content-Type of each part later, + # but this is to be absolutely sure `get_payload()` returns string and not list. + if part.is_multipart(): + return False + if parts_count == 0: if part.get_content_type() != "application/pgp-encrypted": return False + + payload = part.get_payload() + if payload.strip() != "Version: 1": + return False elif parts_count == 1: if part.get_content_type() != "application/octet-stream": return False + + payload = part.get_payload() + + prefix = "-----BEGIN PGP MESSAGE-----\r\n\r\n" + if not payload.startswith(prefix): + return False + payload = payload.removeprefix(prefix) + + suffix = "-----END PGP MESSAGE-----\r\n\r\n" + if not payload.endswith(suffix): + return False + payload = payload.removesuffix(suffix) + + if not BASE64_REGEX.fullmatch(payload): + return False else: return False parts_count += 1 diff --git a/chatmaild/src/chatmaild/tests/plugin.py b/chatmaild/src/chatmaild/tests/plugin.py index 7f34fa7f..061067c6 100644 --- a/chatmaild/src/chatmaild/tests/plugin.py +++ b/chatmaild/src/chatmaild/tests/plugin.py @@ -68,7 +68,9 @@ def maildata(request): assert datadir.exists(), datadir def maildata(name, from_addr, to_addr): - data = datadir.joinpath(name).read_text() + # Using `.read_bytes().decode()` instead of `.read_text()` to preserve newlines. + data = datadir.joinpath(name).read_bytes().decode() + text = data.format(from_addr=from_addr, to_addr=to_addr) return BytesParser(policy=policy.default).parsebytes(text.encode())