From e3ff82544a2bce9143a4c621ca64d721a196e18b Mon Sep 17 00:00:00 2001 From: holger krekel Date: Tue, 23 Jul 2024 10:21:49 +0200 Subject: [PATCH] shift lookup methods to class for consistency --- chatmaild/src/chatmaild/doveauth.py | 46 +++++++++-------- .../tests/test_delete_inactive_users.py | 5 +- .../src/chatmaild/tests/test_doveauth.py | 49 +++++++++---------- 3 files changed, 49 insertions(+), 51 deletions(-) diff --git a/chatmaild/src/chatmaild/doveauth.py b/chatmaild/src/chatmaild/doveauth.py index eecfad3c..cc34f17e 100644 --- a/chatmaild/src/chatmaild/doveauth.py +++ b/chatmaild/src/chatmaild/doveauth.py @@ -54,28 +54,6 @@ def is_allowed_to_create(config: Config, user, cleartext_password) -> bool: return True -def lookup_userdb(config: Config, user): - userdir = config.get_user_maildir(user) - password_path = userdir.joinpath("password") - result = {} - if password_path.exists(): - result = dict(addr=user, password=password_path.read_text()) - result.update(config.get_user_dict(user)) - return result - - -def lookup_passdb(config: Config, user, cleartext_password): - userdata = lookup_userdb(config, user) - if userdata: - return userdata - if not is_allowed_to_create(config, user, cleartext_password): - return - - enc_password = encrypt_password(cleartext_password) - config.set_user_password(user, enc_password=enc_password) - return config.get_user_dict(user, enc_password=enc_password) - - def split_and_unescape(s): """Split strings using double quote as a separator and backslash as escape character into parts.""" @@ -122,7 +100,7 @@ class AuthDictProxy(DictProxy): if type == "userdb": user = args[0] if user.endswith(f"@{config.mail_domain}"): - res = lookup_userdb(config, user) + res = self.lookup_userdb(user) if res: reply_command = "O" else: @@ -130,7 +108,7 @@ class AuthDictProxy(DictProxy): elif type == "passdb": user = args[1] if user.endswith(f"@{config.mail_domain}"): - res = lookup_passdb(config, user, cleartext_password=args[0]) + res = self.lookup_passdb(user, cleartext_password=args[0]) if res: reply_command = "O" else: @@ -151,6 +129,26 @@ class AuthDictProxy(DictProxy): getuserpaths = self.config.mailboxes_dir.iterdir() return [x.name for x in getuserpaths if "@" in x.name] + def lookup_userdb(self, user): + userdir = self.config.get_user_maildir(user) + password_path = userdir.joinpath("password") + result = {} + if password_path.exists(): + result = dict(addr=user, password=password_path.read_text()) + result.update(self.config.get_user_dict(user)) + return result + + def lookup_passdb(self, user, cleartext_password): + userdata = self.lookup_userdb(user) + if userdata: + return userdata + if not is_allowed_to_create(self.config, user, cleartext_password): + return + + enc_password = encrypt_password(cleartext_password) + self.config.set_user_password(user, enc_password=enc_password) + return self.config.get_user_dict(user, enc_password=enc_password) + def main(): socket, cfgpath = sys.argv[1:] diff --git a/chatmaild/src/chatmaild/tests/test_delete_inactive_users.py b/chatmaild/src/chatmaild/tests/test_delete_inactive_users.py index ae1bf8eb..df39db18 100644 --- a/chatmaild/src/chatmaild/tests/test_delete_inactive_users.py +++ b/chatmaild/src/chatmaild/tests/test_delete_inactive_users.py @@ -1,7 +1,7 @@ import time from chatmaild.delete_inactive_users import delete_inactive_users -from chatmaild.doveauth import lookup_passdb +from chatmaild.doveauth import AuthDictProxy from chatmaild.lastlogin import get_last_login_from_userdir, write_last_login_to_userdir @@ -28,9 +28,10 @@ def test_delete_skips_non_email_dir(db, example_config): def test_delete_inactive_users(example_config): new = time.time() old = new - (example_config.delete_inactive_users_after * 86400) - 1 + dictproxy = AuthDictProxy(example_config) def create_user(addr, last_login): - lookup_passdb(example_config, addr, "q9mr3faue") + dictproxy.lookup_passdb(addr, "q9mr3faue") md = example_config.get_user_maildir(addr) md.joinpath("cur").mkdir() md.joinpath("cur", "something").mkdir() diff --git a/chatmaild/src/chatmaild/tests/test_doveauth.py b/chatmaild/src/chatmaild/tests/test_doveauth.py index d5a2e314..f62c4c70 100644 --- a/chatmaild/src/chatmaild/tests/test_doveauth.py +++ b/chatmaild/src/chatmaild/tests/test_doveauth.py @@ -9,30 +9,31 @@ import pytest from chatmaild.doveauth import ( AuthDictProxy, is_allowed_to_create, - lookup_passdb, - lookup_userdb, ) from chatmaild.newemail import create_newemail_dict -def test_basic(example_config): - lookup_passdb(example_config, "asdf12345@chat.example.org", "q9mr3faue") - data = lookup_userdb(example_config, "asdf12345@chat.example.org") +@pytest.fixture +def dictproxy(example_config): + return AuthDictProxy(config=example_config) + + +def test_basic(dictproxy, gencreds): + addr, password = gencreds() + dictproxy.lookup_passdb(addr, password) + data = dictproxy.lookup_userdb(addr) assert data - data2 = lookup_passdb( - example_config, "asdf12345@chat.example.org", "q9mr3jewvadsfaue" - ) + data2 = dictproxy.lookup_passdb(addr, password) assert data == data2 -def test_iterate_addresses(example_config): +def test_iterate_addresses(dictproxy): addresses = [] for i in range(10): addresses.append(f"asdf1234{i}@chat.example.org") - lookup_passdb(example_config, addresses[-1], "q9mr3faue") + dictproxy.lookup_passdb(addresses[-1], "q9mr3faue") - dictproxy = AuthDictProxy(config=example_config) res = dictproxy.iter_userdb() assert set(res) == set(addresses) @@ -51,28 +52,26 @@ def test_invalid_username_length(example_config): ) -def test_dont_overwrite_password_on_wrong_login(example_config): +def test_dont_overwrite_password_on_wrong_login(dictproxy): """Test that logging in with a different password doesn't create a new user""" - res = lookup_passdb( - example_config, "newuser12@chat.example.org", "kajdlkajsldk12l3kj1983" + res = dictproxy.lookup_passdb( + "newuser12@chat.example.org", "kajdlkajsldk12l3kj1983" ) assert res["password"] - res2 = lookup_passdb(example_config, "newuser12@chat.example.org", "kajdslqwe") + res2 = dictproxy.lookup_passdb("newuser12@chat.example.org", "kajdslqwe") # this function always returns a password hash, which is actually compared by dovecot. assert res["password"] == res2["password"] -def test_nocreate_file(monkeypatch, tmpdir, example_config): +def test_nocreate_file(monkeypatch, tmpdir, dictproxy): p = tmpdir.join("nocreate") p.write("") monkeypatch.setattr(chatmaild.doveauth, "NOCREATE_FILE", str(p)) - lookup_passdb(example_config, "newuser12@chat.example.org", "zequ0Aimuchoodaechik") - assert not lookup_userdb(example_config, "newuser12@chat.example.org") + dictproxy.lookup_passdb("newuser12@chat.example.org", "zequ0Aimuchoodaechik") + assert not dictproxy.lookup_userdb("newuser12@chat.example.org") -def test_handle_dovecot_request(example_config): - dictproxy = AuthDictProxy(config=example_config) - +def test_handle_dovecot_request(dictproxy): # Test that password can contain ", ', \ and / msg = ( 'Lshared/passdb/laksjdlaksjdlak\\\\sjdlk\\"12j\\\'3l1/k2j3123"' @@ -108,8 +107,8 @@ def test_handle_dovecot_protocol_user_not_exists(example_config): def test_handle_dovecot_protocol_iterate(gencreds, example_config): dictproxy = AuthDictProxy(config=example_config) - lookup_passdb(example_config, "asdf00000@chat.example.org", "q9mr3faue") - lookup_passdb(example_config, "asdf11111@chat.example.org", "q9mr3faue") + dictproxy.lookup_passdb("asdf00000@chat.example.org", "q9mr3faue") + dictproxy.lookup_passdb("asdf11111@chat.example.org", "q9mr3faue") rfile = io.BytesIO(b"H3\t2\t0\t\tauth\nI0\t0\tshared/userdb/") wfile = io.BytesIO() dictproxy.loop_forever(rfile, wfile) @@ -119,7 +118,7 @@ def test_handle_dovecot_protocol_iterate(gencreds, example_config): assert not lines[2] -def test_50_concurrent_lookups_different_accounts(gencreds, example_config): +def test_50_concurrent_lookups_different_accounts(gencreds, dictproxy): num_threads = 50 req_per_thread = 5 results = queue.Queue() @@ -128,7 +127,7 @@ def test_50_concurrent_lookups_different_accounts(gencreds, example_config): for i in range(req_per_thread): addr, password = gencreds() try: - lookup_passdb(example_config, addr, password) + dictproxy.lookup_passdb(addr, password) except Exception: results.put(traceback.format_exc()) else: