mirror of
https://github.com/chatmail/relay.git
synced 2026-05-10 16:04:37 +00:00
Compare commits
5 Commits
link2xt/py
...
fix221
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e313bc3707 | ||
|
|
21778fa4f3 | ||
|
|
14342383cf | ||
|
|
926de76010 | ||
|
|
ee25d35db1 |
@@ -36,12 +36,11 @@ class DNS:
|
||||
cmd = "ip a | grep inet6 | grep 'scope global' | sed -e 's#/64 scope global##' | sed -e 's#inet6##'"
|
||||
return self.shell(cmd).strip()
|
||||
|
||||
def get(self, typ: str, domain: str) -> Optional[str]:
|
||||
"""Get a DNS entry"""
|
||||
def get(self, typ: str, domain: str) -> str:
|
||||
"""Get a DNS entry or empty string if there is none."""
|
||||
dig_result = self.shell(f"dig -r -q {domain} -t {typ} +short")
|
||||
line = dig_result.partition("\n")[0]
|
||||
if line:
|
||||
return line
|
||||
return line
|
||||
|
||||
def check_ptr_record(self, ip: str, mail_domain) -> bool:
|
||||
"""Check the PTR record for an IPv4 or IPv6 address."""
|
||||
@@ -56,22 +55,25 @@ def show_dns(args, out) -> int:
|
||||
ssh = f"ssh root@{mail_domain}"
|
||||
dns = DNS(out, mail_domain)
|
||||
|
||||
def read_dkim_entries(entry):
|
||||
lines = []
|
||||
for line in entry.split("\n"):
|
||||
if line.startswith(";") or not line.strip():
|
||||
continue
|
||||
line = line.replace("\t", " ")
|
||||
lines.append(line)
|
||||
return "\n".join(lines)
|
||||
|
||||
print("Checking your DKIM keys and DNS entries...")
|
||||
try:
|
||||
acme_account_url = out.shell_output(f"{ssh} -- acmetool account-url")
|
||||
except subprocess.CalledProcessError:
|
||||
print("Please run `cmdeploy run` first.")
|
||||
return 1
|
||||
dkim_entry = read_dkim_entries(out.shell_output(f"{ssh} -- opendkim-genzone -F"))
|
||||
|
||||
dkim_selector = "opendkim"
|
||||
dkim_pubkey = out.shell_output(
|
||||
ssh + f" -- openssl rsa -in /etc/dkimkeys/{dkim_selector}.private"
|
||||
" -pubout 2>/dev/null | awk '/-/{next}{printf(\"%s\",$0)}'"
|
||||
)
|
||||
dkim_entry_value = f"v=DKIM1;k=rsa;p={dkim_pubkey};s=email;t=s"
|
||||
dkim_entry_str = ""
|
||||
while len(dkim_entry_value) >= 255:
|
||||
dkim_entry_str += '"' + dkim_entry_value[:255] + '" '
|
||||
dkim_entry_value = dkim_entry_value[255:]
|
||||
dkim_entry_str += '"' + dkim_entry_value + '"'
|
||||
dkim_entry = f"{dkim_selector}._domainkey.{mail_domain}. TXT {dkim_entry_str}"
|
||||
|
||||
ipv6 = dns.get_ipv6()
|
||||
reverse_ipv6 = dns.check_ptr_record(ipv6, mail_domain)
|
||||
@@ -99,7 +101,6 @@ def show_dns(args, out) -> int:
|
||||
return 0
|
||||
except TypeError:
|
||||
pass
|
||||
started_dkim_parsing = False
|
||||
for line in zonefile.splitlines():
|
||||
line = line.format(
|
||||
acme_account_url=acme_account_url,
|
||||
@@ -126,28 +127,23 @@ def show_dns(args, out) -> int:
|
||||
current = dns.get("SRV", domain[:-1])
|
||||
if current != f"{prio} {weight} {port} {value}":
|
||||
to_print.append(line)
|
||||
if " TXT " in line:
|
||||
if " TXT " in line:
|
||||
domain, value = line.split(" TXT ")
|
||||
current = dns.get("TXT", domain.strip()[:-1])
|
||||
if domain.startswith("_mta-sts."):
|
||||
if current:
|
||||
if current.split("id=")[0] == value.split("id=")[0]:
|
||||
continue
|
||||
if current != value:
|
||||
|
||||
# TXT records longer than 255 bytes
|
||||
# are split into multiple <character-string>s.
|
||||
# This typically happens with DKIM record
|
||||
# which contains long RSA key.
|
||||
#
|
||||
# Removing `" "` before comparison
|
||||
# to get back a single string.
|
||||
if current.replace('" "', "") != value.replace('" "', ""):
|
||||
to_print.append(line)
|
||||
if "IN TXT ( " in line:
|
||||
started_dkim_parsing = True
|
||||
dkim_lines = [line]
|
||||
if started_dkim_parsing and line.startswith('"'):
|
||||
dkim_lines.append(" " + line)
|
||||
domain, data = "\n".join(dkim_lines).split(" IN TXT ")
|
||||
current = dns.get("TXT", domain.strip()[:-1])
|
||||
if current:
|
||||
current = "( %s )" % (current.replace('" "', '"\n "'))
|
||||
if current.replace(";", "\\;") != data:
|
||||
to_print.append(dkim_entry)
|
||||
else:
|
||||
to_print.append(dkim_entry)
|
||||
|
||||
exit_code = 0
|
||||
if to_print:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
import threading
|
||||
import queue
|
||||
import socket
|
||||
|
||||
from chatmaild.config import read_config
|
||||
from cmdeploy.cmdeploy import main
|
||||
@@ -78,3 +79,24 @@ def test_concurrent_logins_same_account(
|
||||
|
||||
for _ in conns:
|
||||
assert login_results.get()
|
||||
|
||||
|
||||
def test_no_vrfy(chatmail_config):
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect((chatmail_config.mail_domain, 25))
|
||||
banner = sock.recv(1024)
|
||||
print(banner)
|
||||
sock.send(b"VRFY wrongaddress@%s\r\n" % (chatmail_config.mail_domain.encode(),))
|
||||
result = sock.recv(1024)
|
||||
print(result)
|
||||
sock.send(b"VRFY echo@%s\r\n" % (chatmail_config.mail_domain.encode(),))
|
||||
result2 = sock.recv(1024)
|
||||
print(result2)
|
||||
assert result[0:10] == result2[0:10]
|
||||
sock.send(b"VRFY wrongaddress\r\n")
|
||||
result = sock.recv(1024)
|
||||
print(result)
|
||||
sock.send(b"VRFY echo\r\n")
|
||||
result2 = sock.recv(1024)
|
||||
print(result2)
|
||||
assert result[0:10] == result2[0:10] == b"252 2.0.0 "
|
||||
|
||||
Reference in New Issue
Block a user