mirror of
https://github.com/chatmail/relay.git
synced 2026-05-10 16:04:37 +00:00
Improve filtermail checks for encrypted messages
Ensure that first part only contains "Version: 1" and second part only contains base64 payload enclosed in "-----BEGIN PGP MESSAGE-----" and "-----END PGP MESSAGE-----".
This commit is contained in:
@@ -2,6 +2,9 @@
|
||||
|
||||
## untagged
|
||||
|
||||
- improve filtermail checks for encrypted messages
|
||||
([#320](https://github.com/deltachat/chatmail/pull/320))
|
||||
|
||||
## 1.3.0 - 2024-06-06
|
||||
|
||||
- don't check necessary DNS records on cmdeploy init anymore
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from email import policy
|
||||
@@ -12,9 +13,15 @@ from aiosmtpd.controller import Controller
|
||||
|
||||
from .config import read_config
|
||||
|
||||
# Regular expression that matches multi-line base64.
|
||||
BASE64_REGEX = re.compile(r"([A-Za-z0-9+/=]+\r\n)+")
|
||||
|
||||
|
||||
def check_encrypted(message):
|
||||
"""Check that the message is an OpenPGP-encrypted message."""
|
||||
"""Check that the message is an OpenPGP-encrypted message.
|
||||
|
||||
MIME structure of the message must correspond to <https://www.rfc-editor.org/rfc/rfc3156>.
|
||||
"""
|
||||
if not message.is_multipart():
|
||||
return False
|
||||
if message.get("subject") != "...":
|
||||
@@ -23,12 +30,36 @@ def check_encrypted(message):
|
||||
return False
|
||||
parts_count = 0
|
||||
for part in message.iter_parts():
|
||||
# We explicitly check Content-Type of each part later,
|
||||
# but this is to be absolutely sure `get_payload()` returns string and not list.
|
||||
if part.is_multipart():
|
||||
return False
|
||||
|
||||
if parts_count == 0:
|
||||
if part.get_content_type() != "application/pgp-encrypted":
|
||||
return False
|
||||
|
||||
payload = part.get_payload()
|
||||
if payload.strip() != "Version: 1":
|
||||
return False
|
||||
elif parts_count == 1:
|
||||
if part.get_content_type() != "application/octet-stream":
|
||||
return False
|
||||
|
||||
payload = part.get_payload()
|
||||
|
||||
prefix = "-----BEGIN PGP MESSAGE-----\r\n\r\n"
|
||||
if not payload.startswith(prefix):
|
||||
return False
|
||||
payload = payload.removeprefix(prefix)
|
||||
|
||||
suffix = "-----END PGP MESSAGE-----\r\n\r\n"
|
||||
if not payload.endswith(suffix):
|
||||
return False
|
||||
payload = payload.removesuffix(suffix)
|
||||
|
||||
if not BASE64_REGEX.fullmatch(payload):
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
parts_count += 1
|
||||
|
||||
@@ -68,7 +68,9 @@ def maildata(request):
|
||||
assert datadir.exists(), datadir
|
||||
|
||||
def maildata(name, from_addr, to_addr):
|
||||
data = datadir.joinpath(name).read_text()
|
||||
# Using `.read_bytes().decode()` instead of `.read_text()` to preserve newlines.
|
||||
data = datadir.joinpath(name).read_bytes().decode()
|
||||
|
||||
text = data.format(from_addr=from_addr, to_addr=to_addr)
|
||||
return BytesParser(policy=policy.default).parsebytes(text.encode())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user