mirror of
https://github.com/chatmail/relay.git
synced 2026-05-19 20:38:05 +00:00
test: test concurrent user creation
This commit is contained in:
@@ -1,21 +1,15 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
import pytest
|
||||
import threading
|
||||
import queue
|
||||
import traceback
|
||||
|
||||
import chatmaild.dictproxy
|
||||
from chatmaild.dictproxy import get_user_data, lookup_passdb, handle_dovecot_request
|
||||
from chatmaild.database import Database, DBError
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def db(tmpdir):
|
||||
db_path = tmpdir / "passdb.sqlite"
|
||||
print("database path:", db_path)
|
||||
return Database(db_path)
|
||||
|
||||
|
||||
|
||||
def test_basic(db):
|
||||
lookup_passdb(db, "link2xt@c1.testrun.org", "Pieg9aeToe3eghuthe5u")
|
||||
data = get_user_data(db, "link2xt@c1.testrun.org")
|
||||
@@ -53,8 +47,10 @@ def test_too_high_db_version(db):
|
||||
|
||||
|
||||
def test_handle_dovecot_request(db):
|
||||
msg = ('Lshared/passdb/laksjdlaksjdlaksjdlk12j3l1k2j3123/'
|
||||
'some42@c3.testrun.org\tsome42@c3.testrun.org')
|
||||
msg = (
|
||||
"Lshared/passdb/laksjdlaksjdlaksjdlk12j3l1k2j3123/"
|
||||
"some42@c3.testrun.org\tsome42@c3.testrun.org"
|
||||
)
|
||||
res = handle_dovecot_request(msg, db, "c3.testrun.org")
|
||||
assert res
|
||||
assert res[0] == "O" and res.endswith("\n")
|
||||
@@ -62,3 +58,29 @@ def test_handle_dovecot_request(db):
|
||||
assert userdata["home"] == "/home/vmail/some42@c3.testrun.org"
|
||||
assert userdata["uid"] == userdata["gid"] == "vmail"
|
||||
assert userdata["password"].startswith("{SHA512-CRYPT}")
|
||||
|
||||
|
||||
def test_100_concurrent_lookups(db):
|
||||
num = 100
|
||||
dbs = [Database(db.path) for i in range(num)]
|
||||
print(f"created {num} databases")
|
||||
results = queue.Queue()
|
||||
|
||||
def lookup(db):
|
||||
try:
|
||||
lookup_passdb(db, "something@c1.testrun.org", "Pieg9aeToe3eghuthe5u")
|
||||
except Exception:
|
||||
results.put(traceback.format_exc())
|
||||
else:
|
||||
results.put(None)
|
||||
|
||||
threads = [threading.Thread(target=lookup, args=(db,), daemon=True) for db in dbs]
|
||||
|
||||
print(f"created {num} threads, starting them and waiting for results")
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
||||
for _ in dbs:
|
||||
res = results.get()
|
||||
if res is not None:
|
||||
pytest.fail(f"concurrent lookup failed\n{res}")
|
||||
|
||||
Reference in New Issue
Block a user