Compare commits

..

1 Commits

Author SHA1 Message Date
Mark Felder
0b21b83199 feat: metadata service: make turnserver socket path configurable
also add tests for the turnserver metadata
2026-02-17 11:55:30 -08:00
21 changed files with 138 additions and 329 deletions

View File

@@ -46,6 +46,7 @@ class Config:
self.acme_email = params.get("acme_email", "") self.acme_email = params.get("acme_email", "")
self.imap_rawlog = params.get("imap_rawlog", "false").lower() == "true" self.imap_rawlog = params.get("imap_rawlog", "false").lower() == "true"
self.imap_compress = params.get("imap_compress", "false").lower() == "true" self.imap_compress = params.get("imap_compress", "false").lower() == "true"
self.turn_socket_path = params.get("turn_socket_path", "/run/chatmail-turn/turn.socket")
if "iroh_relay" not in params: if "iroh_relay" not in params:
self.iroh_relay = "https://" + params["mail_domain"] self.iroh_relay = "https://" + params["mail_domain"]
self.enable_iroh_relay = True self.enable_iroh_relay = True

View File

@@ -1,11 +1,8 @@
import json import json
import logging import logging
import os import os
import re
import sys import sys
import filelock
try: try:
import crypt_r import crypt_r
except ImportError: except ImportError:
@@ -16,7 +13,6 @@ from .dictproxy import DictProxy
from .migrate_db import migrate_from_db_to_maildir from .migrate_db import migrate_from_db_to_maildir
NOCREATE_FILE = "/etc/chatmail-nocreate" NOCREATE_FILE = "/etc/chatmail-nocreate"
VALID_LOCALPART_RE = re.compile(r"^[a-z0-9._-]+$")
def encrypt_password(password: str): def encrypt_password(password: str):
@@ -56,10 +52,6 @@ def is_allowed_to_create(config: Config, user, cleartext_password) -> bool:
) )
return False return False
if not VALID_LOCALPART_RE.match(localpart):
logging.warning("localpart %r contains invalid characters", localpart)
return False
return True return True
@@ -148,11 +140,6 @@ class AuthDictProxy(DictProxy):
if not is_allowed_to_create(self.config, addr, cleartext_password): if not is_allowed_to_create(self.config, addr, cleartext_password):
return return
lock = filelock.FileLock(str(user.password_path) + ".lock", timeout=5)
with lock:
userdata = user.get_userdb_dict()
if userdata:
return userdata
user.set_password(encrypt_password(cleartext_password)) user.set_password(encrypt_password(cleartext_password))
print(f"Created address: {addr}", file=sys.stderr) print(f"Created address: {addr}", file=sys.stderr)
return user.get_userdb_dict() return user.get_userdb_dict()

View File

@@ -68,7 +68,7 @@ class Report:
for size in self.message_buckets: for size in self.message_buckets:
for msg in mailbox.messages: for msg in mailbox.messages:
if msg.size >= size: if msg.size >= size:
if self.mdir and f"/{self.mdir}/" not in msg.path: if self.mdir and not msg.relpath.startswith(self.mdir):
continue continue
self.message_buckets[size] += msg.size self.message_buckets[size] += msg.size

View File

@@ -55,6 +55,9 @@ passthrough_recipients =
# Deployment Details # Deployment Details
# #
# Path to the TURN server Unix socket
turn_socket_path = /run/chatmail-turn/turn.socket
# SMTP outgoing filtermail and reinjection # SMTP outgoing filtermail and reinjection
filtermail_smtp_port = 10080 filtermail_smtp_port = 10080
postfix_reinject_port = 10025 postfix_reinject_port = 10025

View File

@@ -76,12 +76,13 @@ class Metadata:
class MetadataDictProxy(DictProxy): class MetadataDictProxy(DictProxy):
def __init__(self, notifier, metadata, iroh_relay=None, turn_hostname=None): def __init__(self, notifier, metadata, iroh_relay=None, turn_hostname=None, config=None):
super().__init__() super().__init__()
self.notifier = notifier self.notifier = notifier
self.metadata = metadata self.metadata = metadata
self.iroh_relay = iroh_relay self.iroh_relay = iroh_relay
self.turn_hostname = turn_hostname self.turn_hostname = turn_hostname
self.config = config
def handle_lookup(self, parts): def handle_lookup(self, parts):
# Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org # Lpriv/43f5f508a7ea0366dff30200c15250e3/devicetoken\tlkj123poi@c2.testrun.org
@@ -101,11 +102,7 @@ class MetadataDictProxy(DictProxy):
# Handle `GETMETADATA "" /shared/vendor/deltachat/irohrelay` # Handle `GETMETADATA "" /shared/vendor/deltachat/irohrelay`
return f"O{self.iroh_relay}\n" return f"O{self.iroh_relay}\n"
elif keyname == "vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn": elif keyname == "vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn":
try: res = turn_credentials(self.config)
res = turn_credentials()
except Exception:
logging.exception("failed to get TURN credentials")
return "N\n"
port = 3478 port = 3478
return f"O{self.turn_hostname}:{port}:{res}\n" return f"O{self.turn_hostname}:{port}:{res}\n"
@@ -150,6 +147,7 @@ def main():
metadata=metadata, metadata=metadata,
iroh_relay=iroh_relay, iroh_relay=iroh_relay,
turn_hostname=mail_domain, turn_hostname=mail_domain,
config=config,
) )
dictproxy.serve_forever_from_socket(socket) dictproxy.serve_forever_from_socket(socket)

View File

@@ -3,6 +3,7 @@
"""CGI script for creating new accounts.""" """CGI script for creating new accounts."""
import json import json
import random
import secrets import secrets
import string import string
@@ -14,9 +15,7 @@ ALPHANUMERIC_PUNCT = string.ascii_letters + string.digits + string.punctuation
def create_newemail_dict(config: Config): def create_newemail_dict(config: Config):
user = "".join( user = "".join(random.choices(ALPHANUMERIC, k=config.username_max_length))
secrets.choice(ALPHANUMERIC) for _ in range(config.username_max_length)
)
password = "".join( password = "".join(
secrets.choice(ALPHANUMERIC_PUNCT) secrets.choice(ALPHANUMERIC_PUNCT)
for _ in range(config.password_min_length + 3) for _ in range(config.password_min_length + 3)

View File

@@ -120,60 +120,6 @@ def test_handle_dovecot_protocol_iterate(gencreds, example_config):
assert not lines[2] assert not lines[2]
def test_invalid_localpart_characters(make_config):
"""Test that is_allowed_to_create rejects localparts with invalid characters."""
config = make_config("chat.example.org", {"username_min_length": "3"})
password = "zequ0Aimuchoodaechik"
domain = config.mail_domain
# valid localparts
assert is_allowed_to_create(config, f"abc123@{domain}", password)
assert is_allowed_to_create(config, f"a.b-c_d@{domain}", password)
# uppercase rejected
assert not is_allowed_to_create(config, f"Abc123@{domain}", password)
assert not is_allowed_to_create(config, f"ABCDEFG@{domain}", password)
# spaces and special chars rejected
assert not is_allowed_to_create(config, f"a b cde@{domain}", password)
assert not is_allowed_to_create(config, f"abc+def@{domain}", password)
assert not is_allowed_to_create(config, f"abc!def@{domain}", password)
assert not is_allowed_to_create(config, f"ab@cdef@{domain}", password)
assert not is_allowed_to_create(config, f"abc/def@{domain}", password)
assert not is_allowed_to_create(config, f"abc\\def@{domain}", password)
def test_concurrent_creation_same_account(dictproxy):
"""Test that concurrent creation of the same account doesn't corrupt password."""
addr = "racetest1@chat.example.org"
password = "zequ0Aimuchoodaechik"
num_threads = 10
results = queue.Queue()
def create():
try:
res = dictproxy.lookup_passdb(addr, password)
results.put(("ok", res))
except Exception:
results.put(("err", traceback.format_exc()))
threads = [threading.Thread(target=create, daemon=True) for _ in range(num_threads)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
passwords_seen = set()
for _ in range(num_threads):
status, res = results.get()
if status == "err":
pytest.fail(f"concurrent creation failed\n{res}")
passwords_seen.add(res["password"])
# all threads must see the same password hash
assert len(passwords_seen) == 1
def test_50_concurrent_lookups_different_accounts(gencreds, dictproxy): def test_50_concurrent_lookups_different_accounts(gencreds, dictproxy):
num_threads = 50 num_threads = 50
req_per_thread = 5 req_per_thread = 5

View File

@@ -112,43 +112,6 @@ def test_report(mbox1, example_config):
report_main(args) report_main(args)
def test_report_mdir_filters_by_path(mbox1, example_config):
"""Test that Report with mdir='cur' only counts messages in cur/ subdirectory."""
from chatmaild.fsreport import Report
now = datetime.utcnow().timestamp()
# Set password mtime to old enough so min_login_age check passes
password = Path(mbox1.basedir).joinpath("password")
old_time = now - 86400 * 10 # 10 days ago
os.utime(password, (old_time, old_time))
# Reload mailbox with updated mtime
from chatmaild.expire import MailboxStat
mbox = MailboxStat(mbox1.basedir)
# Report without mdir — should count all messages
rep_all = Report(now=now, min_login_age=1, mdir=None)
rep_all.process_mailbox_stat(mbox)
total_all = rep_all.message_buckets[0]
# Report with mdir='cur' — should only count cur/ messages
rep_cur = Report(now=now, min_login_age=1, mdir="cur")
rep_cur.process_mailbox_stat(mbox)
total_cur = rep_cur.message_buckets[0]
# Report with mdir='new' — should only count new/ messages
rep_new = Report(now=now, min_login_age=1, mdir="new")
rep_new.process_mailbox_stat(mbox)
total_new = rep_new.message_buckets[0]
# cur has 500-byte msg, new has 600-byte msg (from fill_mbox)
assert total_cur == 500
assert total_new == 600
assert total_all == 500 + 600
def test_expiry_cli_basic(example_config, mbox1): def test_expiry_cli_basic(example_config, mbox1):
args = (str(example_config._inipath),) args = (str(example_config._inipath),)
expiry_main(args) expiry_main(args)

View File

@@ -314,51 +314,6 @@ def test_persistent_queue_items(tmp_path, testaddr, token):
assert not queue_item < item2 and not item2 < queue_item assert not queue_item < item2 and not item2 < queue_item
def test_turn_credentials_exception_returns_N(notifier, metadata, monkeypatch):
"""Test that turn_credentials() failure returns N\\n instead of crashing."""
import chatmaild.metadata
dictproxy = MetadataDictProxy(
notifier=notifier,
metadata=metadata,
turn_hostname="turn.example.org",
)
def mock_turn_credentials():
raise ConnectionRefusedError("socket not available")
monkeypatch.setattr(chatmaild.metadata, "turn_credentials", mock_turn_credentials)
transactions = {}
res = dictproxy.handle_dovecot_request(
"Lshared/0123/vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn"
"\tuser@example.org",
transactions,
)
assert res == "N\n"
def test_turn_credentials_success(notifier, metadata, monkeypatch):
"""Test that valid turn_credentials() returns TURN URI."""
import chatmaild.metadata
dictproxy = MetadataDictProxy(
notifier=notifier,
metadata=metadata,
turn_hostname="turn.example.org",
)
monkeypatch.setattr(chatmaild.metadata, "turn_credentials", lambda: "user:pass")
transactions = {}
res = dictproxy.handle_dovecot_request(
"Lshared/0123/vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn"
"\tuser@example.org",
transactions,
)
assert res == "Oturn.example.org:3478:user:pass\n"
def test_iroh_relay(dictproxy): def test_iroh_relay(dictproxy):
rfile = io.BytesIO( rfile = io.BytesIO(
b"\n".join( b"\n".join(

View File

@@ -1,73 +1,120 @@
"""Tests for turnserver functionality, particularly metadata integration."""
import socket import socket
import tempfile
import threading import threading
import time from pathlib import Path
from unittest.mock import patch
import pytest
from chatmaild.config import read_config, write_initial_config
from chatmaild.metadata import MetadataDictProxy, Metadata
from chatmaild.notifier import Notifier
from chatmaild.turnserver import turn_credentials from chatmaild.turnserver import turn_credentials
SOCKET_PATH = "/run/chatmail-turn/turn.socket"
def test_turn_credentials_function_with_custom_socket():
"""Test that turn_credentials function works with a custom socket path from config."""
# Create a temporary directory and socket file
temp_dir = Path(tempfile.mkdtemp())
temp_socket_path = temp_dir / "test_turn.socket"
# Create a mock TURN credentials server
def mock_server():
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_sock.bind(str(temp_socket_path))
server_sock.listen(1)
# Accept connection and send mock credentials
conn, addr = server_sock.accept()
with conn:
conn.send(b"mock_turn_credentials_abc123\n")
server_sock.close()
# Start server in a background thread
server_thread = threading.Thread(target=mock_server, daemon=True)
server_thread.start()
# Create a config with custom socket path
config_path = temp_dir / "chatmail.ini"
write_initial_config(config_path, "test.example.org", {
"turn_socket_path": str(temp_socket_path)
})
config = read_config(config_path)
# Allow time for server to start
import time
time.sleep(0.01)
# Test that turn_credentials can connect using the config
credentials = turn_credentials(config)
assert credentials == "mock_turn_credentials_abc123"
server_thread.join(timeout=1) # Clean up thread
@pytest.fixture def test_metadata_turn_lookup_integration(tmp_path):
def turn_socket(tmp_path): """Test that metadata service properly handles TURN metadata lookups."""
"""Create a real Unix socket server at a temp path.""" # Create mock config with custom turn socket path
sock_path = str(tmp_path / "turn.socket") config_path = tmp_path / "chatmail.ini"
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) socket_path = tmp_path / "test_turn.socket"
server.bind(sock_path) write_initial_config(config_path, "example.org", {
server.listen(1) "turn_socket_path": str(socket_path)
yield sock_path, server })
server.close() config = read_config(config_path)
# Create mock TURN server to return credentials
def mock_turn_server():
import os
os.makedirs(socket_path.parent, exist_ok=True) # Ensure parent directory exists
def _call_turn_credentials(sock_path): server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
"""Call turn_credentials but connect to sock_path instead of hardcoded path.""" server_sock.bind(str(socket_path))
original_connect = socket.socket.connect server_sock.listen(1)
def patched_connect(self, address): # Accept connection and send mock credentials
if address == SOCKET_PATH: conn, addr = server_sock.accept()
address = sock_path with conn:
return original_connect(self, address) conn.send(b"test_creds_12345\n")
server_sock.close()
with patch.object(socket.socket, "connect", patched_connect): server_thread = threading.Thread(target=mock_turn_server, daemon=True)
return turn_credentials() server_thread.start()
import time
time.sleep(0.01) # Allow server to start
def test_turn_credentials_timeout(turn_socket): # Create a MetadataDictProxy with config
"""Server accepts but never responds — must raise socket.timeout.""" queue_dir = tmp_path / "queue"
sock_path, server = turn_socket queue_dir.mkdir()
notifier = Notifier(queue_dir)
metadata = Metadata(tmp_path / "vmail")
def accept_and_hang(): dict_proxy = MetadataDictProxy(
conn, _ = server.accept() notifier=notifier,
time.sleep(30) metadata=metadata,
conn.close() iroh_relay="https://example.org",
turn_hostname="example.org",
config=config
)
t = threading.Thread(target=accept_and_hang, daemon=True) # Simulate a lookup for TURN credentials using the correct format
t.start() # Input: "shared/0123/vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn"
# After parts[0].split("/", 2):
# - keyparts[0] = "shared"
# - keyparts[1] = "0123"
# - keyparts[2] = "vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn"
# So keyname = keyparts[2] should match "vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn"
parts = [
"shared/0123/vendor/vendor.dovecot/pvt/server/vendor/deltachat/turn",
"dummy@user.org"
]
with pytest.raises(socket.timeout): # Call handle_lookup directly
_call_turn_credentials(sock_path) result = dict_proxy.handle_lookup(parts)
# Verify the response format is correct for TURN credentials
assert result.startswith("O") # Output response starts with 'O'
assert ":3478:" in result # Contains port 3478
assert "test_creds_12345" in result # Contains credentials returned by mock server
assert "example.org:3478:test_creds_12345" in result
def test_turn_credentials_connection_refused(tmp_path): server_thread.join(timeout=1) # Clean up thread
"""Socket file doesn't exist — must raise ConnectionRefusedError or FileNotFoundError."""
missing = str(tmp_path / "nonexistent.socket")
with pytest.raises((ConnectionRefusedError, FileNotFoundError)):
_call_turn_credentials(missing)
def test_turn_credentials_success(turn_socket):
"""Server responds with credentials — must return stripped string."""
sock_path, server = turn_socket
def respond():
conn, _ = server.accept()
conn.sendall(b"testuser:testpass\n")
conn.close()
t = threading.Thread(target=respond, daemon=True)
t.start()
result = _call_turn_credentials(sock_path)
assert result == "testuser:testpass"

View File

@@ -2,9 +2,8 @@
import socket import socket
def turn_credentials() -> str: def turn_credentials(config) -> str:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client_socket: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client_socket:
client_socket.settimeout(5) client_socket.connect(config.turn_socket_path)
client_socket.connect("/run/chatmail-turn/turn.socket")
with client_socket.makefile("rb") as file: with client_socket.makefile("rb") as file:
return file.readline().decode("utf-8").strip() return file.readline().decode("utf-8").strip()

View File

@@ -113,15 +113,24 @@ def run_cmd(args, out):
return 1 return 1
try: try:
out.check_call(cmd, env=env) retcode = out.check_call(cmd, env=env)
if args.website_only: if args.website_only:
if retcode == 0:
out.green("Website deployment completed.") out.green("Website deployment completed.")
else: else:
out.red("Website deployment failed.")
elif retcode == 0:
out.green("Deploy completed, call `cmdeploy dns` next.") out.green("Deploy completed, call `cmdeploy dns` next.")
return 0 elif not remote_data["acme_account_url"]:
out.red("Deploy completed but letsencrypt not configured")
out.red("Run 'cmdeploy run' again")
retcode = 0
else:
out.red("Deploy failed")
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
out.red("Deploy failed") out.red("Deploy failed")
return 1 retcode = 1
return retcode
def dns_cmd_options(parser): def dns_cmd_options(parser):

View File

@@ -264,9 +264,6 @@ class WebsiteDeployer(Deployer):
# if www_folder is a hugo page, build it # if www_folder is a hugo page, build it
if build_dir: if build_dir:
www_path = build_webpages(src_dir, build_dir, self.config) www_path = build_webpages(src_dir, build_dir, self.config)
if www_path is None:
logger.warning("Web page build failed, skipping website deployment")
return
# if it is not a hugo page, upload it as is # if it is not a hugo page, upload it as is
files.rsync( files.rsync(
f"{www_path}/", "/var/www/html", flags=["-avz", "--chown=www-data"] f"{www_path}/", "/var/www/html", flags=["-avz", "--chown=www-data"]

View File

@@ -37,9 +37,7 @@ class DovecotDeployer(Deployer):
restart = False if self.disable_mail else self.need_restart restart = False if self.disable_mail else self.need_restart
systemd.service( systemd.service(
name="Disable dovecot for now" name="Disable dovecot for now" if self.disable_mail else "Start and enable Dovecot",
if self.disable_mail
else "Start and enable Dovecot",
service="dovecot.service", service="dovecot.service",
running=False if self.disable_mail else True, running=False if self.disable_mail else True,
enabled=False if self.disable_mail else True, enabled=False if self.disable_mail else True,

View File

@@ -51,7 +51,7 @@ http {
include /etc/nginx/mime.types; include /etc/nginx/mime.types;
default_type application/octet-stream; default_type application/octet-stream;
ssl_protocols TLSv1.2 TLSv1.3; ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3;
ssl_prefer_server_ciphers on; ssl_prefer_server_ciphers on;
ssl_certificate /var/lib/acme/live/{{ config.domain_name }}/fullchain; ssl_certificate /var/lib/acme/live/{{ config.domain_name }}/fullchain;
ssl_certificate_key /var/lib/acme/live/{{ config.domain_name }}/privkey; ssl_certificate_key /var/lib/acme/live/{{ config.domain_name }}/privkey;

View File

@@ -83,9 +83,7 @@ class PostfixDeployer(Deployer):
server.shell( server.shell(
name="Validate postfix configuration", name="Validate postfix configuration",
# Extract stderr and quit with error if non-zero # Extract stderr and quit with error if non-zero
commands=[ commands=["""bash -c 'w=$(postconf 2>&1 >/dev/null); [[ -z "$w" ]] || { echo "$w"; false; }'"""],
"""bash -c 'w=$(postconf 2>&1 >/dev/null); [[ -z "$w" ]] || { echo "$w"; false; }'"""
],
) )
self.need_restart = need_restart self.need_restart = need_restart

View File

@@ -53,7 +53,7 @@ def get_dkim_entry(mail_domain, pre_command, dkim_selector):
print=log_progress, print=log_progress,
) )
except CalledProcessError: except CalledProcessError:
return None, None return
dkim_value_raw = f"v=DKIM1;k=rsa;p={dkim_pubkey};s=email;t=s" dkim_value_raw = f"v=DKIM1;k=rsa;p={dkim_pubkey};s=email;t=s"
dkim_value = '" "'.join(re.findall(".{1,255}", dkim_value_raw)) dkim_value = '" "'.join(re.findall(".{1,255}", dkim_value_raw))
web_dkim_value = "".join(re.findall(".{1,255}", dkim_value_raw)) web_dkim_value = "".join(re.findall(".{1,255}", dkim_value_raw))

View File

@@ -40,5 +40,5 @@ def dovecot_recalc_quota(user):
# #
for line in output.split("\n"): for line in output.split("\n"):
parts = line.split() parts = line.split()
if len(parts) >= 6 and parts[2] == "STORAGE": if parts[2] == "STORAGE":
return dict(value=int(parts[3]), limit=int(parts[4]), percent=int(parts[5])) return dict(value=int(parts[3]), limit=int(parts[4]), percent=int(parts[5]))

View File

@@ -60,29 +60,6 @@ def mockdns(request, mockdns_base, mockdns_expected):
return mockdns_base return mockdns_base
class TestGetDkimEntry:
def test_dkim_entry_returns_tuple_on_success(self, mockdns):
entry, web_entry = remote.rdns.get_dkim_entry(
"some.domain", "", dkim_selector="opendkim"
)
# May return None,None if openssl not available, but should never crash
if entry is not None:
assert "opendkim._domainkey.some.domain" in entry
assert "opendkim._domainkey.some.domain" in web_entry
def test_dkim_entry_returns_none_tuple_on_error(self, monkeypatch):
"""CalledProcessError must return (None, None), not bare None."""
from subprocess import CalledProcessError
def failing_shell(command, fail_ok=False, print=print):
raise CalledProcessError(1, command)
monkeypatch.setattr(remote.rdns, "shell", failing_shell)
result = remote.rdns.get_dkim_entry("some.domain", "", dkim_selector="opendkim")
assert result == (None, None)
assert result[0] is None and result[1] is None
class TestPerformInitialChecks: class TestPerformInitialChecks:
def test_perform_initial_checks_ok1(self, mockdns, mockdns_expected): def test_perform_initial_checks_ok1(self, mockdns, mockdns_expected):
remote_data = remote.rdns.perform_initial_checks("some.domain") remote_data = remote.rdns.perform_initial_checks("some.domain")

View File

@@ -1,68 +0,0 @@
from unittest.mock import patch
from cmdeploy.remote.rshell import dovecot_recalc_quota
def test_dovecot_recalc_quota_normal_output():
"""Normal doveadm output returns parsed dict."""
normal_output = (
"Quota name Type Value Limit %\n"
"User quota STORAGE 5 102400 0\n"
"User quota MESSAGE 2 - 0\n"
)
with patch("cmdeploy.remote.rshell.shell", return_value=normal_output):
result = dovecot_recalc_quota("user@example.org")
# shell is called twice (recalc + get), patch returns same for both
assert result == {"value": 5, "limit": 102400, "percent": 0}
def test_dovecot_recalc_quota_empty_output():
"""Empty doveadm output (trailing newline) must not IndexError."""
call_count = [0]
def mock_shell(cmd):
call_count[0] += 1
if "recalc" in cmd:
return ""
# quota get returns only empty lines
return "\n\n"
with patch("cmdeploy.remote.rshell.shell", side_effect=mock_shell):
result = dovecot_recalc_quota("user@example.org")
assert result is None
def test_dovecot_recalc_quota_malformed_output():
"""Malformed output with too few columns must not crash."""
call_count = [0]
def mock_shell(cmd):
call_count[0] += 1
if "recalc" in cmd:
return ""
# partial line, fewer than 6 parts
return "Quota name\nUser quota STORAGE\n"
with patch("cmdeploy.remote.rshell.shell", side_effect=mock_shell):
result = dovecot_recalc_quota("user@example.org")
assert result is None
def test_dovecot_recalc_quota_header_only():
"""Only header line, no data rows."""
call_count = [0]
def mock_shell(cmd):
call_count[0] += 1
if "recalc" in cmd:
return ""
return "Quota name Type Value Limit %\n"
with patch("cmdeploy.remote.rshell.shell", side_effect=mock_shell):
result = dovecot_recalc_quota("user@example.org")
assert result is None