shift lookup methods to class for consistency

This commit is contained in:
holger krekel
2024-07-23 10:21:49 +02:00
parent eddfadaf7f
commit e3ff82544a
3 changed files with 49 additions and 51 deletions

View File

@@ -54,28 +54,6 @@ def is_allowed_to_create(config: Config, user, cleartext_password) -> bool:
return True return True
def lookup_userdb(config: Config, user):
userdir = config.get_user_maildir(user)
password_path = userdir.joinpath("password")
result = {}
if password_path.exists():
result = dict(addr=user, password=password_path.read_text())
result.update(config.get_user_dict(user))
return result
def lookup_passdb(config: Config, user, cleartext_password):
userdata = lookup_userdb(config, user)
if userdata:
return userdata
if not is_allowed_to_create(config, user, cleartext_password):
return
enc_password = encrypt_password(cleartext_password)
config.set_user_password(user, enc_password=enc_password)
return config.get_user_dict(user, enc_password=enc_password)
def split_and_unescape(s): def split_and_unescape(s):
"""Split strings using double quote as a separator and backslash as escape character """Split strings using double quote as a separator and backslash as escape character
into parts.""" into parts."""
@@ -122,7 +100,7 @@ class AuthDictProxy(DictProxy):
if type == "userdb": if type == "userdb":
user = args[0] user = args[0]
if user.endswith(f"@{config.mail_domain}"): if user.endswith(f"@{config.mail_domain}"):
res = lookup_userdb(config, user) res = self.lookup_userdb(user)
if res: if res:
reply_command = "O" reply_command = "O"
else: else:
@@ -130,7 +108,7 @@ class AuthDictProxy(DictProxy):
elif type == "passdb": elif type == "passdb":
user = args[1] user = args[1]
if user.endswith(f"@{config.mail_domain}"): if user.endswith(f"@{config.mail_domain}"):
res = lookup_passdb(config, user, cleartext_password=args[0]) res = self.lookup_passdb(user, cleartext_password=args[0])
if res: if res:
reply_command = "O" reply_command = "O"
else: else:
@@ -151,6 +129,26 @@ class AuthDictProxy(DictProxy):
getuserpaths = self.config.mailboxes_dir.iterdir() getuserpaths = self.config.mailboxes_dir.iterdir()
return [x.name for x in getuserpaths if "@" in x.name] return [x.name for x in getuserpaths if "@" in x.name]
def lookup_userdb(self, user):
userdir = self.config.get_user_maildir(user)
password_path = userdir.joinpath("password")
result = {}
if password_path.exists():
result = dict(addr=user, password=password_path.read_text())
result.update(self.config.get_user_dict(user))
return result
def lookup_passdb(self, user, cleartext_password):
userdata = self.lookup_userdb(user)
if userdata:
return userdata
if not is_allowed_to_create(self.config, user, cleartext_password):
return
enc_password = encrypt_password(cleartext_password)
self.config.set_user_password(user, enc_password=enc_password)
return self.config.get_user_dict(user, enc_password=enc_password)
def main(): def main():
socket, cfgpath = sys.argv[1:] socket, cfgpath = sys.argv[1:]

View File

@@ -1,7 +1,7 @@
import time import time
from chatmaild.delete_inactive_users import delete_inactive_users from chatmaild.delete_inactive_users import delete_inactive_users
from chatmaild.doveauth import lookup_passdb from chatmaild.doveauth import AuthDictProxy
from chatmaild.lastlogin import get_last_login_from_userdir, write_last_login_to_userdir from chatmaild.lastlogin import get_last_login_from_userdir, write_last_login_to_userdir
@@ -28,9 +28,10 @@ def test_delete_skips_non_email_dir(db, example_config):
def test_delete_inactive_users(example_config): def test_delete_inactive_users(example_config):
new = time.time() new = time.time()
old = new - (example_config.delete_inactive_users_after * 86400) - 1 old = new - (example_config.delete_inactive_users_after * 86400) - 1
dictproxy = AuthDictProxy(example_config)
def create_user(addr, last_login): def create_user(addr, last_login):
lookup_passdb(example_config, addr, "q9mr3faue") dictproxy.lookup_passdb(addr, "q9mr3faue")
md = example_config.get_user_maildir(addr) md = example_config.get_user_maildir(addr)
md.joinpath("cur").mkdir() md.joinpath("cur").mkdir()
md.joinpath("cur", "something").mkdir() md.joinpath("cur", "something").mkdir()

View File

@@ -9,30 +9,31 @@ import pytest
from chatmaild.doveauth import ( from chatmaild.doveauth import (
AuthDictProxy, AuthDictProxy,
is_allowed_to_create, is_allowed_to_create,
lookup_passdb,
lookup_userdb,
) )
from chatmaild.newemail import create_newemail_dict from chatmaild.newemail import create_newemail_dict
def test_basic(example_config): @pytest.fixture
lookup_passdb(example_config, "asdf12345@chat.example.org", "q9mr3faue") def dictproxy(example_config):
data = lookup_userdb(example_config, "asdf12345@chat.example.org") return AuthDictProxy(config=example_config)
def test_basic(dictproxy, gencreds):
addr, password = gencreds()
dictproxy.lookup_passdb(addr, password)
data = dictproxy.lookup_userdb(addr)
assert data assert data
data2 = lookup_passdb( data2 = dictproxy.lookup_passdb(addr, password)
example_config, "asdf12345@chat.example.org", "q9mr3jewvadsfaue"
)
assert data == data2 assert data == data2
def test_iterate_addresses(example_config): def test_iterate_addresses(dictproxy):
addresses = [] addresses = []
for i in range(10): for i in range(10):
addresses.append(f"asdf1234{i}@chat.example.org") addresses.append(f"asdf1234{i}@chat.example.org")
lookup_passdb(example_config, addresses[-1], "q9mr3faue") dictproxy.lookup_passdb(addresses[-1], "q9mr3faue")
dictproxy = AuthDictProxy(config=example_config)
res = dictproxy.iter_userdb() res = dictproxy.iter_userdb()
assert set(res) == set(addresses) assert set(res) == set(addresses)
@@ -51,28 +52,26 @@ def test_invalid_username_length(example_config):
) )
def test_dont_overwrite_password_on_wrong_login(example_config): def test_dont_overwrite_password_on_wrong_login(dictproxy):
"""Test that logging in with a different password doesn't create a new user""" """Test that logging in with a different password doesn't create a new user"""
res = lookup_passdb( res = dictproxy.lookup_passdb(
example_config, "newuser12@chat.example.org", "kajdlkajsldk12l3kj1983" "newuser12@chat.example.org", "kajdlkajsldk12l3kj1983"
) )
assert res["password"] assert res["password"]
res2 = lookup_passdb(example_config, "newuser12@chat.example.org", "kajdslqwe") res2 = dictproxy.lookup_passdb("newuser12@chat.example.org", "kajdslqwe")
# this function always returns a password hash, which is actually compared by dovecot. # this function always returns a password hash, which is actually compared by dovecot.
assert res["password"] == res2["password"] assert res["password"] == res2["password"]
def test_nocreate_file(monkeypatch, tmpdir, example_config): def test_nocreate_file(monkeypatch, tmpdir, dictproxy):
p = tmpdir.join("nocreate") p = tmpdir.join("nocreate")
p.write("") p.write("")
monkeypatch.setattr(chatmaild.doveauth, "NOCREATE_FILE", str(p)) monkeypatch.setattr(chatmaild.doveauth, "NOCREATE_FILE", str(p))
lookup_passdb(example_config, "newuser12@chat.example.org", "zequ0Aimuchoodaechik") dictproxy.lookup_passdb("newuser12@chat.example.org", "zequ0Aimuchoodaechik")
assert not lookup_userdb(example_config, "newuser12@chat.example.org") assert not dictproxy.lookup_userdb("newuser12@chat.example.org")
def test_handle_dovecot_request(example_config): def test_handle_dovecot_request(dictproxy):
dictproxy = AuthDictProxy(config=example_config)
# Test that password can contain ", ', \ and / # Test that password can contain ", ', \ and /
msg = ( msg = (
'Lshared/passdb/laksjdlaksjdlak\\\\sjdlk\\"12j\\\'3l1/k2j3123"' 'Lshared/passdb/laksjdlaksjdlak\\\\sjdlk\\"12j\\\'3l1/k2j3123"'
@@ -108,8 +107,8 @@ def test_handle_dovecot_protocol_user_not_exists(example_config):
def test_handle_dovecot_protocol_iterate(gencreds, example_config): def test_handle_dovecot_protocol_iterate(gencreds, example_config):
dictproxy = AuthDictProxy(config=example_config) dictproxy = AuthDictProxy(config=example_config)
lookup_passdb(example_config, "asdf00000@chat.example.org", "q9mr3faue") dictproxy.lookup_passdb("asdf00000@chat.example.org", "q9mr3faue")
lookup_passdb(example_config, "asdf11111@chat.example.org", "q9mr3faue") dictproxy.lookup_passdb("asdf11111@chat.example.org", "q9mr3faue")
rfile = io.BytesIO(b"H3\t2\t0\t\tauth\nI0\t0\tshared/userdb/") rfile = io.BytesIO(b"H3\t2\t0\t\tauth\nI0\t0\tshared/userdb/")
wfile = io.BytesIO() wfile = io.BytesIO()
dictproxy.loop_forever(rfile, wfile) dictproxy.loop_forever(rfile, wfile)
@@ -119,7 +118,7 @@ def test_handle_dovecot_protocol_iterate(gencreds, example_config):
assert not lines[2] assert not lines[2]
def test_50_concurrent_lookups_different_accounts(gencreds, example_config): def test_50_concurrent_lookups_different_accounts(gencreds, dictproxy):
num_threads = 50 num_threads = 50
req_per_thread = 5 req_per_thread = 5
results = queue.Queue() results = queue.Queue()
@@ -128,7 +127,7 @@ def test_50_concurrent_lookups_different_accounts(gencreds, example_config):
for i in range(req_per_thread): for i in range(req_per_thread):
addr, password = gencreds() addr, password = gencreds()
try: try:
lookup_passdb(example_config, addr, password) dictproxy.lookup_passdb(addr, password)
except Exception: except Exception:
results.put(traceback.format_exc()) results.put(traceback.format_exc())
else: else: