Add pooling database connection for postgresql.
This commit is contained in:
@@ -51,10 +51,39 @@ key:
|
|||||||
# - /var/lib/ma1sd/store.db
|
# - /var/lib/ma1sd/store.db
|
||||||
#
|
#
|
||||||
storage:
|
storage:
|
||||||
|
# backend: sqlite # or postgresql
|
||||||
provider:
|
provider:
|
||||||
sqlite:
|
sqlite:
|
||||||
database: '/path/to/ma1sd.db'
|
database: '/path/to/ma1sd.db'
|
||||||
|
# postgresql:
|
||||||
|
# # Wrap all string values with quotes to avoid yaml parsing mistakes
|
||||||
|
# database: '//localhost/ma1sd' # or full variant //192.168.1.100:5432/ma1sd_database
|
||||||
|
# username: 'ma1sd_user'
|
||||||
|
# password: 'ma1sd_password'
|
||||||
|
#
|
||||||
|
# # Pool configuration for postgresql backend.
|
||||||
|
# #######
|
||||||
|
# # Enable or disable pooling
|
||||||
|
# pool: false
|
||||||
|
#
|
||||||
|
# #######
|
||||||
|
# # Check database connection before get from pool
|
||||||
|
# testBeforeGetFromPool: false # or true
|
||||||
|
#
|
||||||
|
# #######
|
||||||
|
# # There is an internal thread which checks each of the database connections as a keep-alive mechanism. This set the
|
||||||
|
# # number of milliseconds it sleeps between checks -- default is 30000. To disable the checking thread, set this to
|
||||||
|
# # 0 before you start using the connection source.
|
||||||
|
# checkConnectionsEveryMillis: 30000
|
||||||
|
#
|
||||||
|
# #######
|
||||||
|
# # Set the number of connections that can be unused in the available list.
|
||||||
|
# maxConnectionsFree: 5
|
||||||
|
#
|
||||||
|
# #######
|
||||||
|
# # Set the number of milliseconds that a connection can stay open before being closed. Set to 9223372036854775807 to have
|
||||||
|
# # the connections never expire.
|
||||||
|
# maxConnectionAgeMillis: 3600000
|
||||||
|
|
||||||
###################
|
###################
|
||||||
# Identity Stores #
|
# Identity Stores #
|
||||||
|
@@ -27,7 +27,6 @@ import io.kamax.mxisd.auth.AuthProviders;
|
|||||||
import io.kamax.mxisd.backend.IdentityStoreSupplier;
|
import io.kamax.mxisd.backend.IdentityStoreSupplier;
|
||||||
import io.kamax.mxisd.backend.sql.synapse.Synapse;
|
import io.kamax.mxisd.backend.sql.synapse.Synapse;
|
||||||
import io.kamax.mxisd.config.MxisdConfig;
|
import io.kamax.mxisd.config.MxisdConfig;
|
||||||
import io.kamax.mxisd.config.PostgresqlStorageConfig;
|
|
||||||
import io.kamax.mxisd.config.StorageConfig;
|
import io.kamax.mxisd.config.StorageConfig;
|
||||||
import io.kamax.mxisd.crypto.CryptoFactory;
|
import io.kamax.mxisd.crypto.CryptoFactory;
|
||||||
import io.kamax.mxisd.crypto.KeyManager;
|
import io.kamax.mxisd.crypto.KeyManager;
|
||||||
@@ -68,7 +67,7 @@ public class Mxisd {
|
|||||||
public static final String Version = StringUtils.defaultIfBlank(Mxisd.class.getPackage().getImplementationVersion(), "UNKNOWN");
|
public static final String Version = StringUtils.defaultIfBlank(Mxisd.class.getPackage().getImplementationVersion(), "UNKNOWN");
|
||||||
public static final String Agent = Name + "/" + Version;
|
public static final String Agent = Name + "/" + Version;
|
||||||
|
|
||||||
private MxisdConfig cfg;
|
private final MxisdConfig cfg;
|
||||||
|
|
||||||
private CloseableHttpClient httpClient;
|
private CloseableHttpClient httpClient;
|
||||||
private IRemoteIdentityServerFetcher srvFetcher;
|
private IRemoteIdentityServerFetcher srvFetcher;
|
||||||
@@ -113,17 +112,7 @@ public class Mxisd {
|
|||||||
|
|
||||||
StorageConfig.BackendEnum storageBackend = cfg.getStorage().getBackend();
|
StorageConfig.BackendEnum storageBackend = cfg.getStorage().getBackend();
|
||||||
StorageConfig.Provider storageProvider = cfg.getStorage().getProvider();
|
StorageConfig.Provider storageProvider = cfg.getStorage().getProvider();
|
||||||
switch (storageBackend) {
|
store = new OrmLiteSqlStorage(storageBackend, storageProvider);
|
||||||
case sqlite:
|
|
||||||
store = new OrmLiteSqlStorage(storageBackend, storageProvider.getSqlite().getDatabase());
|
|
||||||
break;
|
|
||||||
case postgresql:
|
|
||||||
PostgresqlStorageConfig postgresql = storageProvider.getPostgresql();
|
|
||||||
store = new OrmLiteSqlStorage(storageBackend, postgresql.getDatabase(), postgresql.getUsername(), postgresql.getPassword());
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new IllegalStateException("Storage provider hasn't been configured");
|
|
||||||
}
|
|
||||||
|
|
||||||
keyMgr = CryptoFactory.getKeyManager(cfg.getKey());
|
keyMgr = CryptoFactory.getKeyManager(cfg.getKey());
|
||||||
signMgr = CryptoFactory.getSignatureManager(cfg, keyMgr);
|
signMgr = CryptoFactory.getSignatureManager(cfg, keyMgr);
|
||||||
|
@@ -0,0 +1,5 @@
|
|||||||
|
package io.kamax.mxisd.config;
|
||||||
|
|
||||||
|
public interface DatabaseStorageConfig {
|
||||||
|
String getDatabase();
|
||||||
|
}
|
@@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
package io.kamax.mxisd.config;
|
package io.kamax.mxisd.config;
|
||||||
|
|
||||||
public class PostgresqlStorageConfig {
|
public class PostgresqlStorageConfig implements DatabaseStorageConfig {
|
||||||
|
|
||||||
private String database;
|
private String database;
|
||||||
|
|
||||||
@@ -28,6 +28,17 @@ public class PostgresqlStorageConfig {
|
|||||||
|
|
||||||
private String password;
|
private String password;
|
||||||
|
|
||||||
|
private boolean pool;
|
||||||
|
|
||||||
|
private int maxConnectionsFree = 1;
|
||||||
|
|
||||||
|
private long maxConnectionAgeMillis = 60 * 60 * 1000;
|
||||||
|
|
||||||
|
private long checkConnectionsEveryMillis = 30 * 1000;
|
||||||
|
|
||||||
|
private boolean testBeforeGetFromPool = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getDatabase() {
|
public String getDatabase() {
|
||||||
return database;
|
return database;
|
||||||
}
|
}
|
||||||
@@ -51,4 +62,44 @@ public class PostgresqlStorageConfig {
|
|||||||
public void setPassword(String password) {
|
public void setPassword(String password) {
|
||||||
this.password = password;
|
this.password = password;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPool() {
|
||||||
|
return pool;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPool(boolean pool) {
|
||||||
|
this.pool = pool;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxConnectionsFree() {
|
||||||
|
return maxConnectionsFree;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxConnectionsFree(int maxConnectionsFree) {
|
||||||
|
this.maxConnectionsFree = maxConnectionsFree;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxConnectionAgeMillis() {
|
||||||
|
return maxConnectionAgeMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxConnectionAgeMillis(long maxConnectionAgeMillis) {
|
||||||
|
this.maxConnectionAgeMillis = maxConnectionAgeMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCheckConnectionsEveryMillis() {
|
||||||
|
return checkConnectionsEveryMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCheckConnectionsEveryMillis(long checkConnectionsEveryMillis) {
|
||||||
|
this.checkConnectionsEveryMillis = checkConnectionsEveryMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTestBeforeGetFromPool() {
|
||||||
|
return testBeforeGetFromPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTestBeforeGetFromPool(boolean testBeforeGetFromPool) {
|
||||||
|
this.testBeforeGetFromPool = testBeforeGetFromPool;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -20,10 +20,11 @@
|
|||||||
|
|
||||||
package io.kamax.mxisd.config;
|
package io.kamax.mxisd.config;
|
||||||
|
|
||||||
public class SQLiteStorageConfig {
|
public class SQLiteStorageConfig implements DatabaseStorageConfig {
|
||||||
|
|
||||||
private String database;
|
private String database;
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getDatabase() {
|
public String getDatabase() {
|
||||||
return database;
|
return database;
|
||||||
}
|
}
|
||||||
|
@@ -23,15 +23,17 @@ package io.kamax.mxisd.storage.ormlite;
|
|||||||
import com.j256.ormlite.dao.CloseableWrappedIterable;
|
import com.j256.ormlite.dao.CloseableWrappedIterable;
|
||||||
import com.j256.ormlite.dao.Dao;
|
import com.j256.ormlite.dao.Dao;
|
||||||
import com.j256.ormlite.dao.DaoManager;
|
import com.j256.ormlite.dao.DaoManager;
|
||||||
import com.j256.ormlite.db.DatabaseType;
|
|
||||||
import com.j256.ormlite.db.PostgresDatabaseType;
|
import com.j256.ormlite.db.PostgresDatabaseType;
|
||||||
import com.j256.ormlite.db.SqliteDatabaseType;
|
import com.j256.ormlite.db.SqliteDatabaseType;
|
||||||
import com.j256.ormlite.jdbc.JdbcConnectionSource;
|
import com.j256.ormlite.jdbc.JdbcConnectionSource;
|
||||||
|
import com.j256.ormlite.jdbc.JdbcPooledConnectionSource;
|
||||||
import com.j256.ormlite.stmt.QueryBuilder;
|
import com.j256.ormlite.stmt.QueryBuilder;
|
||||||
import com.j256.ormlite.support.ConnectionSource;
|
import com.j256.ormlite.support.ConnectionSource;
|
||||||
import com.j256.ormlite.table.TableUtils;
|
import com.j256.ormlite.table.TableUtils;
|
||||||
import io.kamax.matrix.ThreePid;
|
import io.kamax.matrix.ThreePid;
|
||||||
import io.kamax.mxisd.config.PolicyConfig;
|
import io.kamax.mxisd.config.PolicyConfig;
|
||||||
|
import io.kamax.mxisd.config.PostgresqlStorageConfig;
|
||||||
|
import io.kamax.mxisd.config.SQLiteStorageConfig;
|
||||||
import io.kamax.mxisd.config.StorageConfig;
|
import io.kamax.mxisd.config.StorageConfig;
|
||||||
import io.kamax.mxisd.exception.ConfigurationException;
|
import io.kamax.mxisd.exception.ConfigurationException;
|
||||||
import io.kamax.mxisd.exception.InternalServerError;
|
import io.kamax.mxisd.exception.InternalServerError;
|
||||||
@@ -98,34 +100,25 @@ public class OrmLiteSqlStorage implements IStorage {
|
|||||||
private Dao<ChangelogDao, String> changelogDao;
|
private Dao<ChangelogDao, String> changelogDao;
|
||||||
private StorageConfig.BackendEnum backend;
|
private StorageConfig.BackendEnum backend;
|
||||||
|
|
||||||
public OrmLiteSqlStorage(StorageConfig.BackendEnum backend, String path) {
|
public OrmLiteSqlStorage(StorageConfig.BackendEnum backend, StorageConfig.Provider provider) {
|
||||||
this(backend, path, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public OrmLiteSqlStorage(StorageConfig.BackendEnum backend, String database, String username, String password) {
|
|
||||||
if (backend == null) {
|
if (backend == null) {
|
||||||
throw new ConfigurationException("storage.backend");
|
throw new ConfigurationException("storage.backend");
|
||||||
}
|
}
|
||||||
this.backend = backend;
|
this.backend = backend;
|
||||||
|
|
||||||
if (StringUtils.isBlank(database)) {
|
|
||||||
throw new ConfigurationException("Storage destination cannot be empty");
|
|
||||||
}
|
|
||||||
|
|
||||||
withCatcher(() -> {
|
withCatcher(() -> {
|
||||||
DatabaseType databaseType;
|
ConnectionSource connPool;
|
||||||
switch (backend) {
|
switch (backend) {
|
||||||
case postgresql:
|
case postgresql:
|
||||||
databaseType = new PostgresDatabaseType();
|
connPool = createPostgresqlConnection(provider.getPostgresql());
|
||||||
break;
|
break;
|
||||||
case sqlite:
|
case sqlite:
|
||||||
databaseType = new SqliteDatabaseType();
|
connPool = createSqliteConnection(provider.getSqlite());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new ConfigurationException("storage.backend");
|
throw new ConfigurationException("storage.backend");
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionSource connPool = new JdbcConnectionSource("jdbc:" + backend + ":" + database, username, password, databaseType);
|
|
||||||
changelogDao = createDaoAndTable(connPool, ChangelogDao.class);
|
changelogDao = createDaoAndTable(connPool, ChangelogDao.class);
|
||||||
invDao = createDaoAndTable(connPool, ThreePidInviteIO.class);
|
invDao = createDaoAndTable(connPool, ThreePidInviteIO.class);
|
||||||
expInvDao = createDaoAndTable(connPool, HistoricalThreePidInviteIO.class);
|
expInvDao = createDaoAndTable(connPool, HistoricalThreePidInviteIO.class);
|
||||||
@@ -133,11 +126,40 @@ public class OrmLiteSqlStorage implements IStorage {
|
|||||||
asTxnDao = createDaoAndTable(connPool, ASTransactionDao.class);
|
asTxnDao = createDaoAndTable(connPool, ASTransactionDao.class);
|
||||||
accountDao = createDaoAndTable(connPool, AccountDao.class);
|
accountDao = createDaoAndTable(connPool, AccountDao.class);
|
||||||
acceptedDao = createDaoAndTable(connPool, AcceptedDao.class, true);
|
acceptedDao = createDaoAndTable(connPool, AcceptedDao.class, true);
|
||||||
hashDao = createDaoAndTable(connPool, HashDao.class);
|
hashDao = createDaoAndTable(connPool, HashDao.class, true);
|
||||||
runMigration(connPool);
|
runMigration(connPool);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ConnectionSource createSqliteConnection(SQLiteStorageConfig config) throws SQLException {
|
||||||
|
if (StringUtils.isBlank(config.getDatabase())) {
|
||||||
|
throw new ConfigurationException("Storage destination cannot be empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new JdbcConnectionSource("jdbc:" + backend + ":" + config.getDatabase(), null, null, new SqliteDatabaseType());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ConnectionSource createPostgresqlConnection(PostgresqlStorageConfig config) throws SQLException {
|
||||||
|
if (StringUtils.isBlank(config.getDatabase())) {
|
||||||
|
throw new ConfigurationException("Storage destination cannot be empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (config.isPool()) {
|
||||||
|
LOGGER.info("Enable pooling");
|
||||||
|
JdbcPooledConnectionSource source = new JdbcPooledConnectionSource(
|
||||||
|
"jdbc:" + backend + ":" + config.getDatabase(), config.getUsername(), config.getPassword(),
|
||||||
|
new PostgresDatabaseType());
|
||||||
|
source.setMaxConnectionsFree(config.getMaxConnectionsFree());
|
||||||
|
source.setMaxConnectionAgeMillis(config.getMaxConnectionAgeMillis());
|
||||||
|
source.setCheckConnectionsEveryMillis(config.getCheckConnectionsEveryMillis());
|
||||||
|
source.setTestBeforeGet(config.isTestBeforeGetFromPool());
|
||||||
|
return source;
|
||||||
|
} else {
|
||||||
|
return new JdbcConnectionSource("jdbc:" + backend + ":" + config.getDatabase(), config.getUsername(), config.getPassword(),
|
||||||
|
new PostgresDatabaseType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void runMigration(ConnectionSource connPol) throws SQLException {
|
private void runMigration(ConnectionSource connPol) throws SQLException {
|
||||||
ChangelogDao fixAcceptedDao = changelogDao.queryForId(Migrations.FIX_ACCEPTED_DAO);
|
ChangelogDao fixAcceptedDao = changelogDao.queryForId(Migrations.FIX_ACCEPTED_DAO);
|
||||||
if (fixAcceptedDao == null) {
|
if (fixAcceptedDao == null) {
|
||||||
|
@@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
package io.kamax.mxisd.test.storage;
|
package io.kamax.mxisd.test.storage;
|
||||||
|
|
||||||
|
import io.kamax.mxisd.config.SQLiteStorageConfig;
|
||||||
import io.kamax.mxisd.config.StorageConfig;
|
import io.kamax.mxisd.config.StorageConfig;
|
||||||
import io.kamax.mxisd.storage.ormlite.OrmLiteSqlStorage;
|
import io.kamax.mxisd.storage.ormlite.OrmLiteSqlStorage;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@@ -30,14 +31,22 @@ public class OrmLiteSqlStorageTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void insertAsTxnDuplicate() {
|
public void insertAsTxnDuplicate() {
|
||||||
OrmLiteSqlStorage store = new OrmLiteSqlStorage(StorageConfig.BackendEnum.sqlite, ":memory:");
|
StorageConfig.Provider provider = new StorageConfig.Provider();
|
||||||
|
SQLiteStorageConfig config = new SQLiteStorageConfig();
|
||||||
|
config.setDatabase(":memory:");
|
||||||
|
provider.setSqlite(config);
|
||||||
|
OrmLiteSqlStorage store = new OrmLiteSqlStorage(StorageConfig.BackendEnum.sqlite, provider);
|
||||||
store.insertTransactionResult("mxisd", "1", Instant.now(), "{}");
|
store.insertTransactionResult("mxisd", "1", Instant.now(), "{}");
|
||||||
store.insertTransactionResult("mxisd", "2", Instant.now(), "{}");
|
store.insertTransactionResult("mxisd", "2", Instant.now(), "{}");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = RuntimeException.class)
|
@Test(expected = RuntimeException.class)
|
||||||
public void insertAsTxnSame() {
|
public void insertAsTxnSame() {
|
||||||
OrmLiteSqlStorage store = new OrmLiteSqlStorage(StorageConfig.BackendEnum.sqlite, ":memory:");
|
StorageConfig.Provider provider = new StorageConfig.Provider();
|
||||||
|
SQLiteStorageConfig config = new SQLiteStorageConfig();
|
||||||
|
config.setDatabase(":memory:");
|
||||||
|
provider.setSqlite(config);
|
||||||
|
OrmLiteSqlStorage store = new OrmLiteSqlStorage(StorageConfig.BackendEnum.sqlite, provider);
|
||||||
store.insertTransactionResult("mxisd", "1", Instant.now(), "{}");
|
store.insertTransactionResult("mxisd", "1", Instant.now(), "{}");
|
||||||
store.insertTransactionResult("mxisd", "1", Instant.now(), "{}");
|
store.insertTransactionResult("mxisd", "1", Instant.now(), "{}");
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user