mirror of
https://github.com/chatmail/relay.git
synced 2026-05-10 16:04:37 +00:00
basic remove-users functionality and tests
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
from pathlib import Path
|
||||
|
||||
import iniconfig
|
||||
|
||||
|
||||
@@ -25,10 +27,16 @@ class Config:
|
||||
self.privacy_mail = params.get("privacy_mail")
|
||||
self.privacy_pdo = params.get("privacy_pdo")
|
||||
self.privacy_supervisor = params.get("privacy_supervisor")
|
||||
self.mail_basedir = Path(f"/home/vmail/mail/{self.mail_domain}")
|
||||
|
||||
def _getbytefile(self):
|
||||
return open(self._inipath, "rb")
|
||||
|
||||
def get_user_maildir(self, addr):
|
||||
if not addr or "/" in addr:
|
||||
raise ValueError(addr)
|
||||
return self.mail_basedir.joinpath(addr)
|
||||
|
||||
|
||||
def write_initial_config(inipath, mail_domain):
|
||||
from importlib.resources import files
|
||||
|
||||
@@ -86,7 +86,7 @@ def lookup_userdb(db, config: Config, user):
|
||||
return get_user_data(db, config, user)
|
||||
|
||||
|
||||
def lookup_passdb(db, config: Config, user, cleartext_password):
|
||||
def lookup_passdb(db, config: Config, user, cleartext_password, last_login=None):
|
||||
if user == f"echo@{config.mail_domain}":
|
||||
# Echobot writes password it wants to log in with into /run/echobot/password
|
||||
try:
|
||||
@@ -102,12 +102,15 @@ def lookup_passdb(db, config: Config, user, cleartext_password):
|
||||
password=encrypt_password(password),
|
||||
)
|
||||
|
||||
if last_login is None:
|
||||
last_login = int(time.time())
|
||||
|
||||
with db.write_transaction() as conn:
|
||||
userdata = conn.get_user(user)
|
||||
if userdata:
|
||||
# Update last login time.
|
||||
conn.execute(
|
||||
"UPDATE users SET last_login=? WHERE addr=?", (int(time.time()), user)
|
||||
"UPDATE users SET last_login=? WHERE addr=?", (last_login, user)
|
||||
)
|
||||
|
||||
userdata["home"] = f"/home/vmail/mail/{config.mail_domain}/{user}"
|
||||
@@ -120,7 +123,7 @@ def lookup_passdb(db, config: Config, user, cleartext_password):
|
||||
encrypted_password = encrypt_password(cleartext_password)
|
||||
q = """INSERT INTO users (addr, password, last_login)
|
||||
VALUES (?, ?, ?)"""
|
||||
conn.execute(q, (user, encrypted_password, int(time.time())))
|
||||
conn.execute(q, (user, encrypted_password, last_login))
|
||||
print(f"Created account {user}", file=sys.stderr)
|
||||
return dict(
|
||||
home=f"/home/vmail/mail/{config.mail_domain}/{user}",
|
||||
@@ -130,7 +133,7 @@ def lookup_passdb(db, config: Config, user, cleartext_password):
|
||||
)
|
||||
|
||||
|
||||
def iter_userdb(db, config: Config) -> list:
|
||||
def iter_userdb(db) -> list:
|
||||
"""Get a list of all user addresses."""
|
||||
with db.read_connection() as conn:
|
||||
rows = conn.execute(
|
||||
@@ -139,6 +142,15 @@ def iter_userdb(db, config: Config) -> list:
|
||||
return [x[0] for x in rows]
|
||||
|
||||
|
||||
def iter_userdb_lastlogin_before(db, cutoff_date):
|
||||
"""Get a list of users where last login was before cutoff_date."""
|
||||
with db.read_connection() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT addr FROM users WHERE last_login <?", (cutoff_date,)
|
||||
).fetchall()
|
||||
return [x[0] for x in rows]
|
||||
|
||||
|
||||
def split_and_unescape(s):
|
||||
"""Split strings using double quote as a separator and backslash as escape character
|
||||
into parts."""
|
||||
@@ -206,9 +218,7 @@ def handle_dovecot_request(msg, db, config: Config):
|
||||
# example: I0\t0\tshared/userdb/
|
||||
parts = msg[1:].split("\t")
|
||||
if parts[2] == "shared/userdb/":
|
||||
result = "".join(
|
||||
f"Oshared/userdb/{user}\t\n" for user in iter_userdb(db, config)
|
||||
)
|
||||
result = "".join(f"Oshared/userdb/{user}\t\n" for user in iter_userdb(db))
|
||||
return f"{result}\n"
|
||||
|
||||
raise UnknownCommand(msg)
|
||||
|
||||
28
chatmaild/src/chatmaild/remove_stale_users.py
Normal file
28
chatmaild/src/chatmaild/remove_stale_users.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""
|
||||
Remove old user accounts
|
||||
"""
|
||||
|
||||
import shutil
|
||||
import sys
|
||||
import time
|
||||
|
||||
from .config import read_config
|
||||
from .database import Database
|
||||
from .doveauth import get_stale_users
|
||||
|
||||
|
||||
def remove_user(db, config, user):
|
||||
user_mail_dir = config.get_user_maildir(user)
|
||||
shutil.rmtree(user_mail_dir, ignore_errors=True)
|
||||
with db.write_transaction() as conn:
|
||||
conn.execute("DELETE FROM users WHERE addr = ?", (user,))
|
||||
|
||||
|
||||
def main():
|
||||
db = Database(sys.argv[1])
|
||||
config = read_config(sys.argv[2])
|
||||
cutoff_date = time.time() - config.delete_accounts_after * 86400
|
||||
|
||||
for user in get_stale_users(db, cutoff_date):
|
||||
remove_user(db, config, user)
|
||||
print(f"Deleted user: {user}")
|
||||
@@ -1,3 +1,4 @@
|
||||
import pytest
|
||||
from chatmaild.config import read_config
|
||||
|
||||
|
||||
@@ -30,3 +31,19 @@ def test_read_config_testrun(make_config):
|
||||
assert config.password_min_length == 9
|
||||
assert "privacy@testrun.org" in config.passthrough_recipients
|
||||
assert config.passthrough_senders == []
|
||||
|
||||
|
||||
def test_get_user_maildir(make_config):
|
||||
config = make_config("something.testrun.org")
|
||||
assert config.mail_basedir.name == "something.testrun.org"
|
||||
assert config.mail_domain == "something.testrun.org"
|
||||
path = config.get_user_maildir("user1@something.testrun.org")
|
||||
assert not path.exists()
|
||||
assert path == config.mail_basedir.joinpath("user1@something.testrun.org")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
config.get_user_maildir("")
|
||||
with pytest.raises(ValueError):
|
||||
config.get_user_maildir(None)
|
||||
with pytest.raises(ValueError):
|
||||
config.get_user_maildir("../some@something.testrun.org")
|
||||
|
||||
@@ -13,6 +13,7 @@ from chatmaild.doveauth import (
|
||||
handle_dovecot_request,
|
||||
is_allowed_to_create,
|
||||
iter_userdb,
|
||||
iter_userdb_lastlogin_before,
|
||||
lookup_passdb,
|
||||
)
|
||||
from chatmaild.newemail import create_newemail_dict
|
||||
@@ -34,10 +35,29 @@ def test_iterate_addresses(db, example_config):
|
||||
for i in range(10):
|
||||
addresses.append(f"asdf1234{i}@chat.example.org")
|
||||
lookup_passdb(db, example_config, addresses[-1], "q9mr3faue")
|
||||
res = iter_userdb(db, example_config)
|
||||
res = iter_userdb(db)
|
||||
assert res == addresses
|
||||
|
||||
|
||||
def test_iterate_addresses_lastlogin_before(db, example_config):
|
||||
addresses = []
|
||||
|
||||
cutoff_date = 1000
|
||||
for i in range(10):
|
||||
addr = f"oldold{i:03}@chat.example.org"
|
||||
lookup_passdb(
|
||||
db, example_config, addr, "q9mr3faue", last_login=cutoff_date - 10
|
||||
)
|
||||
addresses.append(addr)
|
||||
|
||||
for i in range(5):
|
||||
addr = f"newnew{i:03}@chat.example.org"
|
||||
lookup_passdb(db, example_config, addr, "q9mr3faue", last_login=cutoff_date + i)
|
||||
|
||||
res = iter_userdb_lastlogin_before(db, cutoff_date)
|
||||
assert sorted(res) == sorted(addresses)
|
||||
|
||||
|
||||
def test_invalid_username_length(example_config):
|
||||
config = example_config
|
||||
config.username_min_length = 6
|
||||
|
||||
Reference in New Issue
Block a user