mirror of
https://github.com/chatmail/relay.git
synced 2026-05-18 00:38:57 +00:00
get tests working on IPv4 only machine
This commit is contained in:
@@ -22,10 +22,7 @@ class Config:
|
|||||||
self._inipath = inipath
|
self._inipath = inipath
|
||||||
self.mail_domain = params["mail_domain"]
|
self.mail_domain = params["mail_domain"]
|
||||||
self.mail_domain_hostname = format_arpa_address(params["mail_domain"])
|
self.mail_domain_hostname = format_arpa_address(params["mail_domain"])
|
||||||
if is_valid_ipv4(params["mail_domain"]):
|
self.mail_domain_deliverable = format_deliverable_domain(params["mail_domain"])
|
||||||
self.mail_domain_deliverable = f"[{params['mail_domain']}]"
|
|
||||||
else:
|
|
||||||
self.mail_domain_deliverable = params["mail_domain"]
|
|
||||||
self.max_user_send_per_minute = int(params.get("max_user_send_per_minute", 60))
|
self.max_user_send_per_minute = int(params.get("max_user_send_per_minute", 60))
|
||||||
self.max_user_send_burst_size = int(params.get("max_user_send_burst_size", 10))
|
self.max_user_send_burst_size = int(params.get("max_user_send_burst_size", 10))
|
||||||
self.max_mailbox_size = params["max_mailbox_size"]
|
self.max_mailbox_size = params["max_mailbox_size"]
|
||||||
@@ -198,3 +195,9 @@ def format_arpa_address(address: str) -> str:
|
|||||||
if is_valid_ipv4(address):
|
if is_valid_ipv4(address):
|
||||||
return ipaddress.IPv4Address(address).reverse_pointer
|
return ipaddress.IPv4Address(address).reverse_pointer
|
||||||
return address
|
return address
|
||||||
|
|
||||||
|
|
||||||
|
def format_deliverable_domain(mail_domain: str) -> str:
|
||||||
|
if is_valid_ipv4(mail_domain):
|
||||||
|
return f"[{mail_domain}]"
|
||||||
|
return mail_domain
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ def test_one_mail(
|
|||||||
print(line.decode("ascii"), file=sys.stderr)
|
print(line.decode("ascii"), file=sys.stderr)
|
||||||
pytest.fail("starting filtermail failed")
|
pytest.fail("starting filtermail failed")
|
||||||
|
|
||||||
addr = f"user1@{config.mail_domain}"
|
addr = f"user1@{config.mail_domain_deliverable}"
|
||||||
config.get_user(addr).set_password("l1k2j3l1k2j3l")
|
config.get_user(addr).set_password("l1k2j3l1k2j3l")
|
||||||
|
|
||||||
# send encrypted mail
|
# send encrypted mail
|
||||||
|
|||||||
@@ -80,10 +80,9 @@ filter unix - n n - - lmtp
|
|||||||
127.0.0.1:{{ config.postfix_reinject_port }} inet n - n - 100 smtpd
|
127.0.0.1:{{ config.postfix_reinject_port }} inet n - n - 100 smtpd
|
||||||
-o syslog_name=postfix/reinject
|
-o syslog_name=postfix/reinject
|
||||||
-o milter_macro_daemon_name=ORIGINATING
|
-o milter_macro_daemon_name=ORIGINATING
|
||||||
{% if config.mail_domain == config.mail_domain_deliverable %}
|
|
||||||
-o smtpd_milters=unix:opendkim/opendkim.sock
|
|
||||||
{% endif %}
|
|
||||||
-o cleanup_service_name=authclean
|
-o cleanup_service_name=authclean
|
||||||
|
{% if config.mail_domain == config.mail_domain_deliverable %} -o smtpd_milters=unix:opendkim/opendkim.sock
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
# Local SMTP server for reinjecting incoming filtered mail
|
# Local SMTP server for reinjecting incoming filtered mail
|
||||||
127.0.0.1:{{ config.postfix_reinject_port_incoming }} inet n - n - 100 smtpd
|
127.0.0.1:{{ config.postfix_reinject_port_incoming }} inet n - n - 100 smtpd
|
||||||
|
|||||||
@@ -92,12 +92,11 @@ def test_concurrent_logins_same_account(
|
|||||||
def test_no_vrfy(cmfactory, chatmail_config):
|
def test_no_vrfy(cmfactory, chatmail_config):
|
||||||
ac = cmfactory.get_online_account()
|
ac = cmfactory.get_online_account()
|
||||||
addr = ac.get_config("addr")
|
addr = ac.get_config("addr")
|
||||||
domain = chatmail_config.mail_domain
|
|
||||||
|
|
||||||
s = smtplib.SMTP(domain)
|
s = smtplib.SMTP(chatmail_config.mail_domain)
|
||||||
s.starttls()
|
s.starttls()
|
||||||
|
|
||||||
s.putcmd("vrfy", f"wrongaddress@{chatmail_config.mail_domain}")
|
s.putcmd("vrfy", f"wrongaddress@{chatmail_config.mail_domain_deliverable}")
|
||||||
result = s.getreply()
|
result = s.getreply()
|
||||||
print(result)
|
print(result)
|
||||||
s.putcmd("vrfy", addr)
|
s.putcmd("vrfy", addr)
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import pytest
|
|||||||
|
|
||||||
from cmdeploy import remote
|
from cmdeploy import remote
|
||||||
from cmdeploy.cmdeploy import get_sshexec
|
from cmdeploy.cmdeploy import get_sshexec
|
||||||
|
from chatmaild.config import is_valid_ipv4
|
||||||
|
|
||||||
|
|
||||||
class TestSSHExecutor:
|
class TestSSHExecutor:
|
||||||
@@ -21,6 +22,8 @@ class TestSSHExecutor:
|
|||||||
assert out == out2
|
assert out == out2
|
||||||
|
|
||||||
def test_perform_initial(self, sshexec, maildomain):
|
def test_perform_initial(self, sshexec, maildomain):
|
||||||
|
if is_valid_ipv4(maildomain):
|
||||||
|
pytest.skip(f"{maildomain} is not a domain")
|
||||||
res = sshexec(
|
res = sshexec(
|
||||||
remote.rdns.perform_initial_checks, kwargs=dict(mail_domain=maildomain)
|
remote.rdns.perform_initial_checks, kwargs=dict(mail_domain=maildomain)
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ def imap_mailbox(cmfactory, ssl_context):
|
|||||||
(ac1,) = cmfactory.get_online_accounts(1)
|
(ac1,) = cmfactory.get_online_accounts(1)
|
||||||
user = ac1.get_config("addr")
|
user = ac1.get_config("addr")
|
||||||
password = ac1.get_config("mail_pw")
|
password = ac1.get_config("mail_pw")
|
||||||
host = user.split("@")[1]
|
host = user.split("@")[1].strip("[").strip("]")
|
||||||
mailbox = imap_tools.MailBox(host, ssl_context=ssl_context)
|
mailbox = imap_tools.MailBox(host, ssl_context=ssl_context)
|
||||||
mailbox.login(user, password)
|
mailbox.login(user, password)
|
||||||
mailbox.dc_ac = ac1
|
mailbox.dc_ac = ac1
|
||||||
@@ -178,7 +178,7 @@ def test_hide_senders_ip_address(cmfactory, ssl_context):
|
|||||||
chat.send_text("testing submission header cleanup")
|
chat.send_text("testing submission header cleanup")
|
||||||
user2.wait_for_incoming_msg()
|
user2.wait_for_incoming_msg()
|
||||||
addr = user2.get_config("addr")
|
addr = user2.get_config("addr")
|
||||||
host = addr.split("@")[1]
|
host = addr.split("@")[1].strip("[").strip("]")
|
||||||
pw = user2.get_config("mail_pw")
|
pw = user2.get_config("mail_pw")
|
||||||
mailbox = imap_tools.MailBox(host, ssl_context=ssl_context)
|
mailbox = imap_tools.MailBox(host, ssl_context=ssl_context)
|
||||||
mailbox.login(addr, pw)
|
mailbox.login(addr, pw)
|
||||||
|
|||||||
@@ -10,7 +10,8 @@ import time
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from chatmaild.config import read_config
|
from chatmaild.config import read_config, format_deliverable_domain, is_valid_ipv4
|
||||||
|
|
||||||
|
|
||||||
conftestdir = Path(__file__).parent
|
conftestdir = Path(__file__).parent
|
||||||
|
|
||||||
@@ -61,6 +62,11 @@ def maildomain(chatmail_config):
|
|||||||
return chatmail_config.mail_domain
|
return chatmail_config.mail_domain
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def maildomain_deliverable(maildomain):
|
||||||
|
return format_deliverable_domain(maildomain)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def sshdomain(maildomain):
|
def sshdomain(maildomain):
|
||||||
return os.environ.get("CHATMAIL_SSH", maildomain)
|
return os.environ.get("CHATMAIL_SSH", maildomain)
|
||||||
@@ -277,8 +283,7 @@ def gencreds(chatmail_config):
|
|||||||
next(count)
|
next(count)
|
||||||
|
|
||||||
def gen(domain=None):
|
def gen(domain=None):
|
||||||
domain = domain if domain else chatmail_config.mail_domain
|
domain = domain if domain else chatmail_config.mail_domain_deliverable
|
||||||
addr_domain = f"[{domain}]" if _is_ip(domain) else domain
|
|
||||||
while 1:
|
while 1:
|
||||||
num = next(count)
|
num = next(count)
|
||||||
alphanumeric = "abcdefghijklmnopqrstuvwxyz1234567890"
|
alphanumeric = "abcdefghijklmnopqrstuvwxyz1234567890"
|
||||||
@@ -317,7 +322,8 @@ class ChatmailACFactory:
|
|||||||
|
|
||||||
def _make_transport(self, domain):
|
def _make_transport(self, domain):
|
||||||
"""Build a transport config dict for the given domain."""
|
"""Build a transport config dict for the given domain."""
|
||||||
addr, password = self.gencreds(domain)
|
domain_deliverable = format_deliverable_domain(domain)
|
||||||
|
addr, password = self.gencreds(domain_deliverable)
|
||||||
transport = {
|
transport = {
|
||||||
"addr": addr,
|
"addr": addr,
|
||||||
"password": password,
|
"password": password,
|
||||||
@@ -326,7 +332,7 @@ class ChatmailACFactory:
|
|||||||
"imapServer": domain,
|
"imapServer": domain,
|
||||||
"smtpServer": domain,
|
"smtpServer": domain,
|
||||||
}
|
}
|
||||||
if self.chatmail_config.tls_cert_mode == "self":
|
if domain.startswith("_") or is_valid_ipv4(domain):
|
||||||
transport["certificateChecks"] = "acceptInvalidCertificates"
|
transport["certificateChecks"] = "acceptInvalidCertificates"
|
||||||
return transport
|
return transport
|
||||||
|
|
||||||
@@ -341,7 +347,8 @@ class ChatmailACFactory:
|
|||||||
accounts = []
|
accounts = []
|
||||||
for _ in range(num):
|
for _ in range(num):
|
||||||
account = self.dc.add_account()
|
account = self.dc.add_account()
|
||||||
addr, password = self.gencreds(domain)
|
domain_deliverable = format_deliverable_domain(domain)
|
||||||
|
addr, password = self.gencreds(domain_deliverable)
|
||||||
if _is_ip(domain):
|
if _is_ip(domain):
|
||||||
# Use DCLOGIN scheme with explicit server hosts,
|
# Use DCLOGIN scheme with explicit server hosts,
|
||||||
# matching how madmail presents its addresses to users.
|
# matching how madmail presents its addresses to users.
|
||||||
|
|||||||
Reference in New Issue
Block a user