Add better support for AS transactions (Fix #97)
- Process transactions async with completion parking - Detect transactions deduplication
This commit is contained in:
@@ -23,8 +23,10 @@ package io.kamax.mxisd.storage;
|
||||
import io.kamax.matrix.ThreePid;
|
||||
import io.kamax.mxisd.invitation.IThreePidInviteReply;
|
||||
import io.kamax.mxisd.storage.dao.IThreePidSessionDao;
|
||||
import io.kamax.mxisd.storage.ormlite.ThreePidInviteIO;
|
||||
import io.kamax.mxisd.storage.ormlite.dao.ASTransactionDao;
|
||||
import io.kamax.mxisd.storage.ormlite.dao.ThreePidInviteIO;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
|
||||
@@ -44,4 +46,8 @@ public interface IStorage {
|
||||
|
||||
void updateThreePidSession(IThreePidSessionDao session);
|
||||
|
||||
void insertTransactionResult(String localpart, String txnId, Instant completion, String response);
|
||||
|
||||
Optional<ASTransactionDao> getTransactionResult(String localpart, String txnId);
|
||||
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ import io.kamax.mxisd.exception.InternalServerError;
|
||||
import io.kamax.mxisd.invitation.IThreePidInviteReply;
|
||||
import io.kamax.mxisd.storage.IStorage;
|
||||
import io.kamax.mxisd.storage.dao.IThreePidSessionDao;
|
||||
import io.kamax.mxisd.storage.ormlite.dao.ASTransactionDao;
|
||||
import io.kamax.mxisd.storage.ormlite.dao.ThreePidInviteIO;
|
||||
import io.kamax.mxisd.storage.ormlite.dao.ThreePidSessionDao;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@@ -63,17 +66,21 @@ public class OrmLiteSqliteStorage implements IStorage {
|
||||
|
||||
private Dao<ThreePidInviteIO, String> invDao;
|
||||
private Dao<ThreePidSessionDao, String> sessionDao;
|
||||
private Dao<ASTransactionDao, String> asTxnDao;
|
||||
|
||||
OrmLiteSqliteStorage(String path) {
|
||||
public OrmLiteSqliteStorage(String backend, String path) {
|
||||
withCatcher(() -> {
|
||||
File parent = new File(path).getParentFile();
|
||||
if (!parent.mkdirs() && !parent.isDirectory()) {
|
||||
throw new RuntimeException("Unable to create DB parent directory: " + parent);
|
||||
if (path.startsWith("/") && !path.startsWith("//")) {
|
||||
File parent = new File(path).getParentFile();
|
||||
if (!parent.mkdirs() && !parent.isDirectory()) {
|
||||
throw new RuntimeException("Unable to create DB parent directory: " + parent);
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionSource connPool = new JdbcConnectionSource("jdbc:sqlite:" + path);
|
||||
ConnectionSource connPool = new JdbcConnectionSource("jdbc:" + backend + ":" + path);
|
||||
invDao = createDaoAndTable(connPool, ThreePidInviteIO.class);
|
||||
sessionDao = createDaoAndTable(connPool, ThreePidSessionDao.class);
|
||||
asTxnDao = createDaoAndTable(connPool, ASTransactionDao.class);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -178,4 +185,35 @@ public class OrmLiteSqliteStorage implements IStorage {
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertTransactionResult(String localpart, String txnId, Instant completion, String result) {
|
||||
withCatcher(() -> {
|
||||
int created = asTxnDao.create(new ASTransactionDao(localpart, txnId, completion, result));
|
||||
if (created != 1) {
|
||||
throw new RuntimeException("Unexpected row count after DB action: " + created);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ASTransactionDao> getTransactionResult(String localpart, String txnId) {
|
||||
return withCatcher(() -> {
|
||||
ASTransactionDao dao = new ASTransactionDao();
|
||||
dao.setLocalpart(localpart);
|
||||
dao.setTransactionId(txnId);
|
||||
List<ASTransactionDao> daoList = asTxnDao.queryForMatchingArgs(dao);
|
||||
|
||||
if (daoList.size() > 1) {
|
||||
throw new InternalServerError("Lookup for Transaction " +
|
||||
txnId + " for localpart " + localpart + " returned more than one result");
|
||||
}
|
||||
|
||||
if (daoList.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(daoList.get(0));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -45,13 +45,15 @@ public class OrmLiteSqliteStorageBeanFactory implements FactoryBean<IStorage> {
|
||||
|
||||
@PostConstruct
|
||||
private void postConstruct() {
|
||||
if (StringUtils.equals("sqlite", storagecfg.getBackend())) {
|
||||
if (StringUtils.isBlank(cfg.getDatabase())) {
|
||||
throw new ConfigurationException("storage.provider.sqlite.database");
|
||||
}
|
||||
|
||||
storage = new OrmLiteSqliteStorage(cfg.getDatabase());
|
||||
if (StringUtils.isBlank(storagecfg.getBackend())) {
|
||||
throw new ConfigurationException("storage.backend");
|
||||
}
|
||||
|
||||
if (StringUtils.equals("sqlite", storagecfg.getBackend()) && StringUtils.isBlank(cfg.getDatabase())) {
|
||||
throw new ConfigurationException("storage.provider.sqlite.database");
|
||||
}
|
||||
|
||||
storage = new OrmLiteSqliteStorage(storagecfg.getBackend(), cfg.getDatabase());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
/*
|
||||
* mxisd - Matrix Identity Server Daemon
|
||||
* Copyright (C) 2018 Kamax Sarl
|
||||
*
|
||||
* https://www.kamax.io/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package io.kamax.mxisd.storage.ormlite.dao;
|
||||
|
||||
import com.j256.ormlite.field.DatabaseField;
|
||||
import com.j256.ormlite.table.DatabaseTable;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
@DatabaseTable(tableName = "as_txn")
|
||||
public class ASTransactionDao {
|
||||
|
||||
@DatabaseField(uniqueCombo = true)
|
||||
private String transactionId;
|
||||
|
||||
@DatabaseField(uniqueCombo = true)
|
||||
private String localpart;
|
||||
|
||||
@DatabaseField(canBeNull = false)
|
||||
private long timestamp;
|
||||
|
||||
@DatabaseField(canBeNull = false)
|
||||
private String result;
|
||||
|
||||
public ASTransactionDao() {
|
||||
// Needed for ORMLite
|
||||
}
|
||||
|
||||
public ASTransactionDao(String localpart, String txnId, Instant completion, String result) {
|
||||
setLocalpart(localpart);
|
||||
setTransactionId(txnId);
|
||||
setTimestamp(completion.toEpochMilli());
|
||||
setResult(result);
|
||||
}
|
||||
|
||||
public String getTransactionId() {
|
||||
return transactionId;
|
||||
}
|
||||
|
||||
public void setTransactionId(String transactionId) {
|
||||
this.transactionId = transactionId;
|
||||
}
|
||||
|
||||
public String getLocalpart() {
|
||||
return localpart;
|
||||
}
|
||||
|
||||
public void setLocalpart(String localpart) {
|
||||
this.localpart = localpart;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void setTimestamp(long timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public String getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public void setResult(String result) {
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -18,12 +18,12 @@
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package io.kamax.mxisd.storage.ormlite;
|
||||
package io.kamax.mxisd.storage.ormlite.dao;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import com.j256.ormlite.field.DatabaseField;
|
||||
import com.j256.ormlite.table.DatabaseTable;
|
||||
import io.kamax.matrix.json.GsonUtil;
|
||||
import io.kamax.mxisd.invitation.IThreePidInviteReply;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
@@ -33,8 +33,6 @@ import java.util.Map;
|
||||
@DatabaseTable(tableName = "invite_3pid")
|
||||
public class ThreePidInviteIO {
|
||||
|
||||
private static Gson gson = new Gson();
|
||||
|
||||
@DatabaseField(id = true)
|
||||
private String id;
|
||||
|
||||
@@ -57,7 +55,7 @@ public class ThreePidInviteIO {
|
||||
private String properties;
|
||||
|
||||
public ThreePidInviteIO() {
|
||||
// needed for ORMlite
|
||||
// Needed for ORMLite
|
||||
}
|
||||
|
||||
public ThreePidInviteIO(IThreePidInviteReply data) {
|
||||
@@ -67,7 +65,7 @@ public class ThreePidInviteIO {
|
||||
this.medium = data.getInvite().getMedium();
|
||||
this.address = data.getInvite().getAddress();
|
||||
this.roomId = data.getInvite().getRoomId();
|
||||
this.properties = gson.toJson(data.getInvite().getProperties());
|
||||
this.properties = GsonUtil.get().toJson(data.getInvite().getProperties());
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
@@ -99,7 +97,7 @@ public class ThreePidInviteIO {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
return gson.fromJson(properties, new TypeToken<Map<String, String>>() {
|
||||
return GsonUtil.get().fromJson(properties, new TypeToken<Map<String, String>>() {
|
||||
}.getType());
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ public class ThreePidSessionDao implements IThreePidSessionDao {
|
||||
private boolean isRemoteValidated;
|
||||
|
||||
public ThreePidSessionDao() {
|
||||
// stub for ORMLite
|
||||
// Needed for ORMLite
|
||||
}
|
||||
|
||||
public ThreePidSessionDao(IThreePidSessionDao session) {
|
||||
|
||||
Reference in New Issue
Block a user