Compare commits

...

3 Commits

Author SHA1 Message Date
holger krekel
31e08832a6 shift functions to a DictProxy class 2023-10-21 01:40:58 +02:00
holger krekel
9d175316ff formatting and fixture move 2023-10-21 01:32:36 +02:00
holger krekel
fdd528841f fix nocreate tests 2023-10-21 01:17:09 +02:00
5 changed files with 90 additions and 84 deletions

View File

@@ -21,66 +21,68 @@ def encrypt_password(password: str):
return "{SHA512-CRYPT}" + passhash return "{SHA512-CRYPT}" + passhash
def create_user(db, user, password): class DictProxy:
if os.path.exists(NOCREATE_FILE): def __init__(self, db, mail_domain):
logging.warning( self.db = db
f"Didn't create account: {NOCREATE_FILE} exists. Delete the file to enable account creation." self.mail_domain = mail_domain
)
return def create_user(self, user, password):
with db.write_transaction() as conn: if os.path.exists(NOCREATE_FILE):
conn.create_user(user, password) logging.warning(f"Didn't create account: {NOCREATE_FILE} exists.")
return dict(home=f"/home/vmail/{user}", uid="vmail", gid="vmail", password=password) return
with self.db.write_transaction() as conn:
conn.create_user(user, password)
return dict(home=f"/home/vmail/{user}", uid="vmail", gid="vmail", password=password)
def get_user_data(self, user):
with self.db.read_connection() as conn:
result = conn.get_user(user)
if result:
result["uid"] = "vmail"
result["gid"] = "vmail"
return result
def get_user_data(db, user): def lookup_userdb(self, user):
with db.read_connection() as conn: return self.get_user_data(user)
result = conn.get_user(user)
if result:
result["uid"] = "vmail"
result["gid"] = "vmail"
return result
def lookup_userdb(db, user): def lookup_passdb(self, user, password):
return get_user_data(db, user) userdata = self.get_user_data(user)
if not userdata:
return self.create_user(user, encrypt_password(password))
userdata["password"] = userdata["password"].strip()
return userdata
def lookup_passdb(db, user, password): def handle_dovecot_request(self, msg):
userdata = get_user_data(db, user) print(f"received msg: {msg!r}", file=sys.stderr)
if not userdata: short_command = msg[0]
return create_user(db, user, encrypt_password(password)) if short_command == "L": # LOOKUP
userdata["password"] = userdata["password"].strip() parts = msg[1:].split("\t")
return userdata keyname, user = parts[:2]
namespace, type, *args = keyname.split("/")
reply_command = "F"
def handle_dovecot_request(msg, db, mail_domain): res = ""
print(f"received msg: {msg!r}", file=sys.stderr) if namespace == "shared":
short_command = msg[0] if type == "userdb":
if short_command == "L": # LOOKUP if user.endswith(f"@{self.mail_domain}"):
parts = msg[1:].split("\t") res = lookup_userdb(db, user)
keyname, user = parts[:2] if res:
namespace, type, *args = keyname.split("/") reply_command = "O"
reply_command = "F" else:
res = "" reply_command = "N"
if namespace == "shared": elif type == "passdb":
if type == "userdb": if user.endswith(f"@{self.mail_domain}"):
if user.endswith(f"@{mail_domain}"): res = lookup_passdb(db, user, password=args[0])
res = lookup_userdb(db, user) if res:
if res: reply_command = "O"
reply_command = "O" else:
else: reply_command = "N"
reply_command = "N" print(f"res: {res!r}", file=sys.stderr)
elif type == "passdb": json_res = json.dumps(res) if res else ""
if user.endswith(f"@{mail_domain}"): return f"{reply_command}{json_res}\n"
res = lookup_passdb(db, user, password=args[0]) return None
if res:
reply_command = "O"
else:
reply_command = "N"
print(f"res: {res!r}", file=sys.stderr)
json_res = json.dumps(res) if res else ""
return f"{reply_command}{json_res}\n"
return None
class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer): class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
@@ -90,17 +92,18 @@ class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
def main(): def main():
socket = sys.argv[1] socket = sys.argv[1]
passwd_entry = pwd.getpwnam(sys.argv[2]) passwd_entry = pwd.getpwnam(sys.argv[2])
db = Database(sys.argv[3])
with open("/etc/mailname", "r") as fp: with open("/etc/mailname", "r") as fp:
mail_domain = fp.read().strip() mail_domain = fp.read().strip()
db = Database(sys.argv[3])
dictproxy = DictProxy(db, mail_domain)
class Handler(StreamRequestHandler): class Handler(StreamRequestHandler):
def handle(self): def handle(self):
while True: while True:
msg = self.rfile.readline().strip().decode() msg = self.rfile.readline().strip().decode()
if not msg: if not msg:
break break
res = handle_dovecot_request(msg, db, mail_domain) res = dictproxy.handle_dovecot_request(msg)
if res: if res:
print(f"sending result: {res!r}", file=sys.stderr) print(f"sending result: {res!r}", file=sys.stderr)
self.wfile.write(res.encode("ascii")) self.wfile.write(res.encode("ascii"))

View File

@@ -11,4 +11,3 @@ conn.login(f"imapcapa", "pass")
status, res = conn.capability() status, res = conn.capability()
for capa in sorted(res[0].decode().split()): for capa in sorted(res[0].decode().split()):
print(capa) print(capa)

View File

@@ -5,7 +5,7 @@ import imaplib
domain = os.environ.get("CHATMAIL_DOMAIN", "c3.testrun.org") domain = os.environ.get("CHATMAIL_DOMAIN", "c3.testrun.org")
NUM_CONNECTIONS=10 NUM_CONNECTIONS = 10
conns = [] conns = []
@@ -16,7 +16,7 @@ for i in range(NUM_CONNECTIONS):
conns.append(conn) conns.append(conn)
tlsdone = time.time() tlsdone = time.time()
duration = tlsdone-start duration = tlsdone - start
print(f"{duration}: TLS connections opening TLS connections") print(f"{duration}: TLS connections opening TLS connections")
for i, conn in enumerate(conns): for i, conn in enumerate(conns):

View File

@@ -0,0 +1,9 @@
import pytest
from chatmaild.database import Database
@pytest.fixture()
def db(tmpdir):
db_path = tmpdir / "passdb.sqlite"
print("database path:", db_path)
return Database(db_path)

View File

@@ -3,43 +3,38 @@ import os
import pytest import pytest
import chatmaild.dictproxy import chatmaild.dictproxy
from chatmaild.dictproxy import get_user_data, lookup_passdb from chatmaild.dictproxy import DictProxy
from chatmaild.database import Database, DBError from chatmaild.database import DBError
@pytest.fixture() @pytest.fixture
def db(tmpdir): def dictproxy(db, maildomain):
db_path = tmpdir / "passdb.sqlite" return DictProxy(db, maildomain)
print("database path:", db_path)
return Database(db_path)
def test_basic(db): def test_basic(dictproxy, tmpdir, monkeypatch):
chatmaild.dictproxy.NOCREATE_FILE = "/tmp/nocreate" monkeypatch.setattr(
if os.path.exists(chatmaild.dictproxy.NOCREATE_FILE): chatmaild.dictproxy, "NOCREATE_FILE", tmpdir.join("nocreate").strpath
os.remove(chatmaild.dictproxy.NOCREATE_FILE) )
lookup_passdb(db, "link2xt@c1.testrun.org", "asdf") dictproxy.lookup_passdb("link2xt@c1.testrun.org", "asdf")
data = get_user_data(db, "link2xt@c1.testrun.org") assert dictproxy.get_user_data("link2xt@c1.testrun.org")
assert data
def test_dont_overwrite_password_on_wrong_login(db): def test_dont_overwrite_password_on_wrong_login(dictproxy):
"""Test that logging in with a different password doesn't create a new user""" """Test that logging in with a different password doesn't create a new user"""
res = lookup_passdb(db, "newuser1@something.org", "kajdlkajsldk12l3kj1983") res = dictproxy.lookup_passdb("newuser1@something.org", "kajdlkajsldk12l3kj1983")
assert res["password"] assert res["password"]
res2 = lookup_passdb(db, "newuser1@something.org", "kajdlqweqwe") res2 = dictproxy.lookup_passdb("newuser1@something.org", "kajdlqweqwe")
# this function always returns a password hash, which is actually compared by dovecot. # this function always returns a password hash, which is actually compared by dovecot.
assert res["password"] == res2["password"] assert res["password"] == res2["password"]
def test_nocreate_file(db): def test_nocreate_file(dictproxy, tmpdir, monkeypatch):
chatmaild.dictproxy.NOCREATE_FILE = "/tmp/nocreate" nocreate = tmpdir.join("nocreate")
with open(chatmaild.dictproxy.NOCREATE_FILE, "w+") as f: monkeypatch.setattr(chatmaild.dictproxy, "NOCREATE_FILE", str(nocreate))
f.write("") nocreate.write("")
assert os.path.exists(chatmaild.dictproxy.NOCREATE_FILE) dictproxy.lookup_passdb("newuser1@something.org", "kajdlqweqwe")
lookup_passdb(db, "newuser1@something.org", "kajdlqweqwe") assert not dictproxy.get_user_data("newuser1@something.org")
assert not get_user_data(db, "newuser1@something.org")
os.remove(chatmaild.dictproxy.NOCREATE_FILE)
def test_db_version(db): def test_db_version(db):