mirror of
https://github.com/chatmail/relay.git
synced 2026-05-16 03:24:48 +00:00
retain "config.mail_domain" as the domain part of @ email addresses, so for ipv4 relays "[1.2.3.4]" and introduce config.ipv4_relay and config.mail_domain_bare helpers.
This commit is contained in:
@@ -21,9 +21,19 @@ def read_config(inipath):
|
||||
class Config:
|
||||
def __init__(self, inipath, params):
|
||||
self._inipath = inipath
|
||||
self.mail_domain = params["mail_domain"]
|
||||
self.mail_domain_hostname = format_arpa_address(params["mail_domain"])
|
||||
self.mail_domain_deliverable = format_deliverable_domain(params["mail_domain"])
|
||||
raw_domain = params["mail_domain"]
|
||||
self.mail_domain_bare = raw_domain
|
||||
|
||||
if is_valid_ipv4(raw_domain):
|
||||
self.ipv4_relay = raw_domain
|
||||
self.mail_domain = f"[{raw_domain}]"
|
||||
self.postfix_myhostname = ipaddress.IPv4Address(raw_domain).reverse_pointer
|
||||
else:
|
||||
DomainValidator().validate_domain_re(raw_domain)
|
||||
self.ipv4_relay = None
|
||||
self.mail_domain = raw_domain
|
||||
self.postfix_myhostname = raw_domain
|
||||
|
||||
self.max_user_send_per_minute = int(params.get("max_user_send_per_minute", 60))
|
||||
self.max_user_send_burst_size = int(params.get("max_user_send_burst_size", 10))
|
||||
self.max_mailbox_size = params["max_mailbox_size"]
|
||||
@@ -57,7 +67,7 @@ class Config:
|
||||
self.imap_rawlog = params.get("imap_rawlog", "false").lower() == "true"
|
||||
self.imap_compress = params.get("imap_compress", "false").lower() == "true"
|
||||
if "iroh_relay" not in params:
|
||||
self.iroh_relay = "https://" + params["mail_domain"]
|
||||
self.iroh_relay = "https://" + raw_domain
|
||||
self.enable_iroh_relay = True
|
||||
else:
|
||||
self.iroh_relay = params["iroh_relay"].strip()
|
||||
@@ -83,19 +93,17 @@ class Config:
|
||||
)
|
||||
self.tls_cert_mode = "external"
|
||||
self.tls_cert_path, self.tls_key_path = parts
|
||||
elif self.mail_domain.startswith("_") or is_valid_ipv4(params["mail_domain"]):
|
||||
elif raw_domain.startswith("_") or self.ipv4_relay:
|
||||
self.tls_cert_mode = "self"
|
||||
self.tls_cert_path = "/etc/ssl/certs/mailserver.pem"
|
||||
self.tls_key_path = "/etc/ssl/private/mailserver.key"
|
||||
else:
|
||||
self.tls_cert_mode = "acme"
|
||||
self.tls_cert_path = f"/var/lib/acme/live/{self.mail_domain}/fullchain"
|
||||
self.tls_key_path = f"/var/lib/acme/live/{self.mail_domain}/privkey"
|
||||
self.tls_cert_path = f"/var/lib/acme/live/{raw_domain}/fullchain"
|
||||
self.tls_key_path = f"/var/lib/acme/live/{raw_domain}/privkey"
|
||||
|
||||
# deprecated option
|
||||
mbdir = params.get(
|
||||
"mailboxes_dir", f"/home/vmail/mail/{self.mail_domain_deliverable}"
|
||||
)
|
||||
mbdir = params.get("mailboxes_dir", f"/home/vmail/mail/{raw_domain}")
|
||||
self.mailboxes_dir = Path(mbdir.strip())
|
||||
|
||||
# old unused option (except for first migration from sqlite to maildir store)
|
||||
@@ -192,6 +200,7 @@ def is_valid_ipv4(address: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
|
||||
def format_arpa_address(address: str) -> str:
|
||||
if is_valid_ipv4(address):
|
||||
return ipaddress.IPv4Address(address).reverse_pointer
|
||||
@@ -199,8 +208,8 @@ def format_arpa_address(address: str) -> str:
|
||||
return address
|
||||
|
||||
|
||||
def format_deliverable_domain(mail_domain: str) -> str:
|
||||
if is_valid_ipv4(mail_domain):
|
||||
return f"[{mail_domain}]"
|
||||
DomainValidator().validate_domain_re(mail_domain)
|
||||
return mail_domain
|
||||
def format_mail_domain(raw_domain: str) -> str:
|
||||
if is_valid_ipv4(raw_domain):
|
||||
return f"[{raw_domain}]"
|
||||
DomainValidator().validate_domain_re(raw_domain)
|
||||
return raw_domain
|
||||
|
||||
@@ -108,7 +108,7 @@ class AuthDictProxy(DictProxy):
|
||||
if namespace == "shared":
|
||||
if type == "userdb":
|
||||
user = args[0]
|
||||
if user.endswith(f"@{config.mail_domain_deliverable}"):
|
||||
if user.endswith(f"@{config.mail_domain}"):
|
||||
res = self.lookup_userdb(user)
|
||||
if res:
|
||||
reply_command = "O"
|
||||
@@ -116,7 +116,7 @@ class AuthDictProxy(DictProxy):
|
||||
reply_command = "N"
|
||||
elif type == "passdb":
|
||||
user = args[1]
|
||||
if user.endswith(f"@{config.mail_domain_deliverable}"):
|
||||
if user.endswith(f"@{config.mail_domain}"):
|
||||
res = self.lookup_passdb(user, cleartext_password=args[0])
|
||||
if res:
|
||||
reply_command = "O"
|
||||
|
||||
@@ -22,9 +22,7 @@ def create_newemail_dict(config: Config):
|
||||
secrets.choice(ALPHANUMERIC_PUNCT)
|
||||
for _ in range(config.password_min_length + 3)
|
||||
)
|
||||
return dict(
|
||||
email=f"{user}@{config.mail_domain_deliverable}", password=f"{password}"
|
||||
)
|
||||
return dict(email=f"{user}@{config.mail_domain}", password=f"{password}")
|
||||
|
||||
|
||||
def create_dclogin_url(config, email, password):
|
||||
@@ -33,9 +31,9 @@ def create_dclogin_url(config, email, password):
|
||||
Uses ic=3 (AcceptInvalidCertificates) so chatmail clients
|
||||
can connect to servers with self-signed TLS certificates.
|
||||
"""
|
||||
if config.mail_domain != config.mail_domain_deliverable:
|
||||
imap_host = "&ih=" + config.mail_domain
|
||||
smtp_host = "&sh=" + config.mail_domain
|
||||
if config.ipv4_relay:
|
||||
imap_host = "&ih=" + config.ipv4_relay
|
||||
smtp_host = "&sh=" + config.ipv4_relay
|
||||
else:
|
||||
imap_host = ""
|
||||
smtp_host = ""
|
||||
|
||||
@@ -4,7 +4,7 @@ import pytest
|
||||
|
||||
from chatmaild.config import (
|
||||
format_arpa_address,
|
||||
format_deliverable_domain,
|
||||
format_mail_domain,
|
||||
is_valid_ipv4,
|
||||
parse_size_mb,
|
||||
read_config,
|
||||
@@ -21,12 +21,12 @@ def test_read_config_basic(example_config):
|
||||
example_config = read_config(inipath)
|
||||
assert example_config.max_user_send_per_minute == 37
|
||||
assert example_config.mail_domain == "chat.example.org"
|
||||
assert example_config.mail_domain_deliverable == "chat.example.org"
|
||||
assert example_config.ipv4_relay is None
|
||||
|
||||
|
||||
def test_read_config_deliverable(ipv4_config):
|
||||
assert ipv4_config.mail_domain == "1.3.3.7"
|
||||
assert ipv4_config.mail_domain_deliverable == "[1.3.3.7]"
|
||||
def test_read_config_ipv4(ipv4_config):
|
||||
assert ipv4_config.ipv4_relay == "1.3.3.7"
|
||||
assert ipv4_config.mail_domain == "[1.3.3.7]"
|
||||
|
||||
|
||||
def test_read_config_basic_using_defaults(tmp_path, maildomain):
|
||||
@@ -188,6 +188,6 @@ def test_format_arpa_address(input, result, exception):
|
||||
("12394142", None, pytest.raises(ValueError)),
|
||||
],
|
||||
)
|
||||
def test_format_deliverable_domain(input, result, exception):
|
||||
def test_format_mail_domain(input, result, exception):
|
||||
with exception:
|
||||
assert result == format_deliverable_domain(input)
|
||||
assert result == format_mail_domain(input)
|
||||
|
||||
@@ -44,20 +44,12 @@ def test_invalid_username_length(example_config):
|
||||
config.username_min_length = 6
|
||||
config.username_max_length = 10
|
||||
password = create_newemail_dict(config)["password"]
|
||||
assert not is_allowed_to_create(config, f"a1234@{config.mail_domain}", password)
|
||||
assert is_allowed_to_create(config, f"012345@{config.mail_domain}", password)
|
||||
assert is_allowed_to_create(config, f"0123456@{config.mail_domain}", password)
|
||||
assert is_allowed_to_create(config, f"0123456789@{config.mail_domain}", password)
|
||||
assert not is_allowed_to_create(
|
||||
config, f"a1234@{config.mail_domain_deliverable}", password
|
||||
)
|
||||
assert is_allowed_to_create(
|
||||
config, f"012345@{config.mail_domain_deliverable}", password
|
||||
)
|
||||
assert is_allowed_to_create(
|
||||
config, f"0123456@{config.mail_domain_deliverable}", password
|
||||
)
|
||||
assert is_allowed_to_create(
|
||||
config, f"0123456789@{config.mail_domain_deliverable}", password
|
||||
)
|
||||
assert not is_allowed_to_create(
|
||||
config, f"0123456789x@{config.mail_domain_deliverable}", password
|
||||
config, f"0123456789x@{config.mail_domain}", password
|
||||
)
|
||||
|
||||
|
||||
@@ -132,7 +124,7 @@ def test_invalid_localpart_characters(make_config):
|
||||
"""Test that is_allowed_to_create rejects localparts with invalid characters."""
|
||||
config = make_config("chat.example.org", {"username_min_length": "3"})
|
||||
password = "zequ0Aimuchoodaechik"
|
||||
domain = config.mail_domain_deliverable
|
||||
domain = config.mail_domain
|
||||
|
||||
# valid localparts
|
||||
assert is_allowed_to_create(config, f"abc123@{domain}", password)
|
||||
|
||||
@@ -74,7 +74,7 @@ def test_one_mail(
|
||||
print(line.decode("ascii"), file=sys.stderr)
|
||||
pytest.fail("starting filtermail failed")
|
||||
|
||||
addr = f"user1@{config.mail_domain_deliverable}"
|
||||
addr = f"user1@{config.mail_domain}"
|
||||
config.get_user(addr).set_password("l1k2j3l1k2j3l")
|
||||
|
||||
# send encrypted mail
|
||||
|
||||
@@ -56,7 +56,7 @@ def test_print_new_account(capsys, monkeypatch, maildomain, tmpdir, example_conf
|
||||
assert lines[0] == "Content-Type: application/json"
|
||||
assert not lines[1]
|
||||
dic = json.loads(lines[2])
|
||||
assert dic["email"].endswith(f"@{example_config.mail_domain_deliverable}")
|
||||
assert dic["email"].endswith(f"@{example_config.mail_domain}")
|
||||
assert len(dic["password"]) >= 10
|
||||
# default tls_cert=acme should not include dclogin_url
|
||||
assert "dclogin_url" not in dic
|
||||
|
||||
Reference in New Issue
Block a user