|
|
|
|
@@ -2,7 +2,6 @@
|
|
|
|
|
import asyncio
|
|
|
|
|
import base64
|
|
|
|
|
import binascii
|
|
|
|
|
import logging
|
|
|
|
|
import sys
|
|
|
|
|
import time
|
|
|
|
|
from email import policy
|
|
|
|
|
@@ -105,8 +104,8 @@ def check_armored_payload(payload: str, outgoing: bool):
|
|
|
|
|
# Disallow comments in outgoing messages
|
|
|
|
|
version_comment = "Version: "
|
|
|
|
|
if payload.startswith(version_comment):
|
|
|
|
|
version_line = payload.splitlines()[0]
|
|
|
|
|
payload = payload.removeprefix(version_line)
|
|
|
|
|
splitindex = payload.find("\r\n") + 2
|
|
|
|
|
payload = payload[splitindex:]
|
|
|
|
|
if outgoing:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
@@ -229,7 +228,7 @@ class OutgoingBeforeQueueHandler:
|
|
|
|
|
self.send_rate_limiter = SendRateLimiter()
|
|
|
|
|
|
|
|
|
|
async def handle_MAIL(self, server, session, envelope, address, mail_options):
|
|
|
|
|
logging.info(f"handle_MAIL from {address}")
|
|
|
|
|
log_info(f"handle_MAIL from {address}")
|
|
|
|
|
envelope.mail_from = address
|
|
|
|
|
max_sent = self.config.max_user_send_per_minute
|
|
|
|
|
if not self.send_rate_limiter.is_sending_allowed(address, max_sent):
|
|
|
|
|
@@ -242,11 +241,15 @@ class OutgoingBeforeQueueHandler:
|
|
|
|
|
return "250 OK"
|
|
|
|
|
|
|
|
|
|
async def handle_DATA(self, server, session, envelope):
|
|
|
|
|
logging.info("handle_DATA before-queue")
|
|
|
|
|
loop = asyncio.get_running_loop()
|
|
|
|
|
return await loop.run_in_executor(None, self.sync_handle_DATA, envelope)
|
|
|
|
|
|
|
|
|
|
def sync_handle_DATA(self, envelope):
|
|
|
|
|
log_info("handle_DATA before-queue")
|
|
|
|
|
error = self.check_DATA(envelope)
|
|
|
|
|
if error:
|
|
|
|
|
return error
|
|
|
|
|
logging.info("re-injecting the mail that passed checks")
|
|
|
|
|
log_info("re-injecting the mail that passed checks")
|
|
|
|
|
client = SMTPClient("localhost", self.config.postfix_reinject_port)
|
|
|
|
|
client.sendmail(
|
|
|
|
|
envelope.mail_from, envelope.rcpt_tos, envelope.original_content
|
|
|
|
|
@@ -255,7 +258,7 @@ class OutgoingBeforeQueueHandler:
|
|
|
|
|
|
|
|
|
|
def check_DATA(self, envelope):
|
|
|
|
|
"""the central filtering function for e-mails."""
|
|
|
|
|
logging.info(f"Processing DATA message from {envelope.mail_from}")
|
|
|
|
|
log_info(f"Processing DATA message from {envelope.mail_from}")
|
|
|
|
|
|
|
|
|
|
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
|
|
|
|
|
mail_encrypted = check_encrypted(message, outgoing=True)
|
|
|
|
|
@@ -295,11 +298,15 @@ class IncomingBeforeQueueHandler:
|
|
|
|
|
self.config = config
|
|
|
|
|
|
|
|
|
|
async def handle_DATA(self, server, session, envelope):
|
|
|
|
|
logging.info("handle_DATA before-queue")
|
|
|
|
|
loop = asyncio.get_running_loop()
|
|
|
|
|
return await loop.run_in_executor(None, self.sync_handle_DATA, envelope)
|
|
|
|
|
|
|
|
|
|
def sync_handle_DATA(self, envelope):
|
|
|
|
|
log_info("handle_DATA before-queue")
|
|
|
|
|
error = self.check_DATA(envelope)
|
|
|
|
|
if error:
|
|
|
|
|
return error
|
|
|
|
|
logging.info("re-injecting the mail that passed checks")
|
|
|
|
|
log_info("re-injecting the mail that passed checks")
|
|
|
|
|
|
|
|
|
|
# the smtp daemon on reinject_port_incoming gives it to dkim milter
|
|
|
|
|
# which looks at source address to determine whether to verify or sign
|
|
|
|
|
@@ -315,7 +322,7 @@ class IncomingBeforeQueueHandler:
|
|
|
|
|
|
|
|
|
|
def check_DATA(self, envelope):
|
|
|
|
|
"""the central filtering function for e-mails."""
|
|
|
|
|
logging.info(f"Processing DATA message from {envelope.mail_from}")
|
|
|
|
|
log_info(f"Processing DATA message from {envelope.mail_from}")
|
|
|
|
|
|
|
|
|
|
message = BytesParser(policy=policy.default).parsebytes(envelope.content)
|
|
|
|
|
mail_encrypted = check_encrypted(message, outgoing=False)
|
|
|
|
|
@@ -357,16 +364,19 @@ class SendRateLimiter:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def log_info(msg):
|
|
|
|
|
print(msg, file=sys.stderr)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
args = sys.argv[1:]
|
|
|
|
|
assert len(args) == 2
|
|
|
|
|
config = read_config(args[0])
|
|
|
|
|
mode = args[1]
|
|
|
|
|
logging.basicConfig(level=logging.WARN)
|
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
|
assert mode in ["incoming", "outgoing"]
|
|
|
|
|
task = asyncmain_beforequeue(config, mode)
|
|
|
|
|
loop.create_task(task)
|
|
|
|
|
logging.info("entering serving loop")
|
|
|
|
|
log_info("entering serving loop")
|
|
|
|
|
loop.run_forever()
|
|
|
|
|
|