mirror of
https://github.com/chatmail/relay.git
synced 2026-05-11 16:34:39 +00:00
Compare commits
1 Commits
link2xt/si
...
link2xt/do
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec22cb3202 |
@@ -98,32 +98,6 @@ def lookup_passdb(db, config: Config, user, cleartext_password):
|
||||
)
|
||||
|
||||
|
||||
def split_and_unescape(s):
|
||||
"""Split strings using double quote as a separator and backslash as escape character
|
||||
into parts."""
|
||||
|
||||
out = ""
|
||||
i = 0
|
||||
while i < len(s):
|
||||
c = s[i]
|
||||
if c == "\\":
|
||||
# Skip escape character.
|
||||
i += 1
|
||||
|
||||
# This will raise IndexError if there is no character
|
||||
# after escape character. This is expected
|
||||
# as this is an invalid input.
|
||||
out += s[i]
|
||||
elif c == '"':
|
||||
# Separator
|
||||
yield out
|
||||
out = ""
|
||||
else:
|
||||
out += c
|
||||
i += 1
|
||||
yield out
|
||||
|
||||
|
||||
def handle_dovecot_request(msg, db, config: Config):
|
||||
short_command = msg[0]
|
||||
if short_command == "L": # LOOKUP
|
||||
@@ -133,9 +107,7 @@ def handle_dovecot_request(msg, db, config: Config):
|
||||
# do not attempt to read any other parts for compatibility.
|
||||
keyname = parts[0]
|
||||
|
||||
namespace, type, args = keyname.split("/", 2)
|
||||
args = list(split_and_unescape(args))
|
||||
|
||||
namespace, type, *args = keyname.split("/")
|
||||
reply_command = "F"
|
||||
res = ""
|
||||
if namespace == "shared":
|
||||
|
||||
@@ -52,9 +52,8 @@ def test_too_high_db_version(db):
|
||||
|
||||
|
||||
def test_handle_dovecot_request(db, example_config):
|
||||
# Test that password can contain ", ', \ and /
|
||||
msg = (
|
||||
'Lshared/passdb/laksjdlaksjdlak\\\\sjdlk\\"12j\\\'3l1/k2j3123"'
|
||||
"Lshared/passdb/laksjdlaksjdlaksjdlk12j3l1k2j3123/"
|
||||
"some42123@chat.example.org\tsome42123@chat.example.org"
|
||||
)
|
||||
res = handle_dovecot_request(msg, db, example_config)
|
||||
|
||||
@@ -37,15 +37,21 @@ class DNS:
|
||||
|
||||
def get(self, typ: str, domain: str) -> str | None:
|
||||
"""Get a DNS entry"""
|
||||
dig_result = self.shell(f"dig -r -q {domain} -t {typ} +short")
|
||||
line = dig_result.partition("\n")[0]
|
||||
if line:
|
||||
return line
|
||||
dig_result = self.shell(f"dig {typ} {domain}")
|
||||
line_num = 0
|
||||
for line in dig_result.splitlines():
|
||||
line_num += 1
|
||||
if line.strip() == ";; ANSWER SECTION:":
|
||||
return dig_result.splitlines()[line_num].split("\t")[-1]
|
||||
|
||||
def check_ptr_record(self, ip: str, mail_domain) -> bool:
|
||||
def check_ptr_record(self, ip: str, mail_domain) -> str:
|
||||
"""Check the PTR record for an IPv4 or IPv6 address."""
|
||||
result = self.shell(f"dig -r -x {ip} +short").rstrip()
|
||||
return result == f"{mail_domain}."
|
||||
result = self.get("-x", ip)
|
||||
if result:
|
||||
if ip_address(ip).version == 6:
|
||||
result = result.split()[-1]
|
||||
if result[:-1] == mail_domain:
|
||||
return result
|
||||
|
||||
|
||||
def show_dns(args, out):
|
||||
|
||||
@@ -1,10 +1,5 @@
|
||||
uri = proxy:/run/dovecot/doveauth.socket:auth
|
||||
iterate_disable = yes
|
||||
default_pass_scheme = plain
|
||||
# %E escapes characters " (double quote), ' (single quote) and \ (backslash) with \ (backslash).
|
||||
# See <https://doc.dovecot.org/configuration_manual/config_file/config_variables/#modifiers>
|
||||
# for documentation.
|
||||
#
|
||||
# We escape user-provided input and use double quote as a separator.
|
||||
password_key = passdb/%Ew"%Eu
|
||||
user_key = userdb/%Eu
|
||||
password_key = passdb/%w/%u
|
||||
user_key = userdb/%u
|
||||
|
||||
@@ -14,12 +14,3 @@ def test_fastcgi_working(maildomain, chatmail_config):
|
||||
res = requests.post(url)
|
||||
assert maildomain in res.json().get("email")
|
||||
assert len(res.json().get("password")) > chatmail_config.password_min_length
|
||||
|
||||
|
||||
def test_newemail_configure(maildomain, rpc):
|
||||
"""Test configuring accounts by scanning a QR code works."""
|
||||
url = f"DCACCOUNT:https://{maildomain}/cgi-bin/newemail.py"
|
||||
for i in range(3):
|
||||
account_id = rpc.add_account()
|
||||
rpc.set_config_from_qr(account_id, url)
|
||||
rpc.configure(account_id)
|
||||
|
||||
Reference in New Issue
Block a user