diff --git a/src/main/java/io/kamax/mxisd/as/AppServiceHandler.java b/src/main/java/io/kamax/mxisd/as/AppServiceHandler.java index 2bcacf0..9444564 100644 --- a/src/main/java/io/kamax/mxisd/as/AppServiceHandler.java +++ b/src/main/java/io/kamax/mxisd/as/AppServiceHandler.java @@ -28,18 +28,24 @@ import io.kamax.matrix._ThreePid; import io.kamax.matrix.event.EventKey; import io.kamax.matrix.json.GsonUtil; import io.kamax.mxisd.backend.sql.synapse.Synapse; +import io.kamax.mxisd.config.ListenerConfig; import io.kamax.mxisd.config.MatrixConfig; import io.kamax.mxisd.notification.NotificationManager; import io.kamax.mxisd.profile.ProfileManager; +import io.kamax.mxisd.storage.IStorage; +import io.kamax.mxisd.storage.ormlite.dao.ASTransactionDao; +import io.kamax.mxisd.util.GsonParser; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.InputStream; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @Component @@ -47,20 +53,83 @@ public class AppServiceHandler { private final Logger log = LoggerFactory.getLogger(AppServiceHandler.class); + private final GsonParser parser; + + private String localpart; private MatrixConfig cfg; + private IStorage store; private ProfileManager profiler; private NotificationManager notif; private Synapse synapse; + private Map> transactionsInProgress; + @Autowired - public AppServiceHandler(MatrixConfig cfg, ProfileManager profiler, NotificationManager notif, Synapse synapse) { + public AppServiceHandler(ListenerConfig lCfg, MatrixConfig cfg, IStorage store, ProfileManager profiler, NotificationManager notif, Synapse synapse) { this.cfg = cfg; + this.store = store; this.profiler = profiler; this.notif = notif; this.synapse = synapse; + + localpart = lCfg.getLocalpart(); + parser = new GsonParser(); + transactionsInProgress = new ConcurrentHashMap<>(); + } + + public CompletableFuture processTransaction(String txnId, InputStream is) { + synchronized (this) { + Optional dao = store.getTransactionResult(localpart, txnId); + if (dao.isPresent()) { + log.info("AS Transaction {} already processed - returning computed result", txnId); + return CompletableFuture.completedFuture(dao.get().getResult()); + } + + CompletableFuture f = transactionsInProgress.get(txnId); + if (Objects.nonNull(f)) { + log.info("Returning future for transaction {}", txnId); + return f; + } + + transactionsInProgress.put(txnId, new CompletableFuture<>()); + } + + CompletableFuture future = transactionsInProgress.get(txnId); + + Instant start = Instant.now(); + log.info("Processing AS Transaction {}: start", txnId); + try { + List events = GsonUtil.asList(GsonUtil.getArray(parser.parse(is), "events"), JsonObject.class); + is.close(); + log.debug("{} event(s) parsed", events.size()); + + processTransaction(events); + Instant end = Instant.now(); + log.info("Processed AS transaction {} in {} ms", txnId, (Instant.now().toEpochMilli() - start.toEpochMilli())); + + String result = "{}"; + + try { + log.info("Saving transaction details to store"); + store.insertTransactionResult(localpart, txnId, end, result); + } finally { + log.debug("Removing CompletedFuture from transaction map"); + transactionsInProgress.remove(txnId); + } + + future.complete(result); + } catch (Exception e) { + log.error("Unable to properly process transaction {}", txnId, e); + future.completeExceptionally(e); + } + + log.info("Processing AS Transaction {}: end", txnId); + return future; } public void processTransaction(List eventsJson) { + log.info("Processing transaction events: start"); + eventsJson.forEach(ev -> { String evId = EventKey.Id.getStringOrNull(ev); if (StringUtils.isBlank(evId)) { @@ -78,10 +147,11 @@ public class AppServiceHandler { String senderId = EventKey.Sender.getStringOrNull(ev); if (StringUtils.isBlank(senderId)) { - log.debug("Event has no room ID, skipping"); + log.debug("Event has no sender ID, skipping"); return; } _MatrixID sender = MatrixID.asAcceptable(senderId); + log.debug("Sender: {}", senderId); if (!StringUtils.equals("m.room.member", GsonUtil.getStringOrNull(ev, "type"))) { log.debug("This is not a room membership event, skipping"); @@ -105,7 +175,7 @@ public class AppServiceHandler { return; } - log.info("Got invite for {}", inviteeId); + log.info("Got invite from {} to {}", senderId, inviteeId); boolean wasSent = false; List<_ThreePid> tpids = profiler.getThreepids(invitee).stream() @@ -121,7 +191,7 @@ public class AppServiceHandler { synapse.getRoomName(roomId).ifPresent(name -> properties.put("room_name", name)); } catch (RuntimeException e) { log.warn("Could not fetch room name", e); - log.warn("Unable to fetch room name: Did you integrate your Homeserver as documented?"); + log.info("Unable to fetch room name: Did you integrate your Homeserver as documented?"); } IMatrixIdInvite inv = new MatrixIdInvite(roomId, sender, invitee, tpid.getMedium(), tpid.getAddress(), properties); @@ -134,6 +204,8 @@ public class AppServiceHandler { log.debug("Event {}: processing end", evId); }); + + log.info("Processing transaction events: end"); } } diff --git a/src/main/java/io/kamax/mxisd/config/AsyncConfig.java b/src/main/java/io/kamax/mxisd/config/AsyncConfig.java new file mode 100644 index 0000000..1736c1e --- /dev/null +++ b/src/main/java/io/kamax/mxisd/config/AsyncConfig.java @@ -0,0 +1,36 @@ +/* + * mxisd - Matrix Identity Server Daemon + * Copyright (C) 2018 Kamax Sarl + * + * https://www.kamax.io/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.kamax.mxisd.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; + +@Configuration +public class AsyncConfig extends WebMvcConfigurerAdapter { + + @Override + public void configureAsyncSupport(AsyncSupportConfigurer configurer) { + configurer.setDefaultTimeout(60 * 60 * 1000); // 1h in milliseconds + super.configureAsyncSupport(configurer); + } + +} diff --git a/src/main/java/io/kamax/mxisd/controller/app/v1/AppServiceController.java b/src/main/java/io/kamax/mxisd/controller/app/v1/AppServiceController.java index 183151c..3fbbc2a 100644 --- a/src/main/java/io/kamax/mxisd/controller/app/v1/AppServiceController.java +++ b/src/main/java/io/kamax/mxisd/controller/app/v1/AppServiceController.java @@ -20,7 +20,6 @@ package io.kamax.mxisd.controller.app.v1; -import com.google.gson.JsonObject; import io.kamax.matrix.json.GsonUtil; import io.kamax.mxisd.as.AppServiceHandler; import io.kamax.mxisd.config.ListenerConfig; @@ -36,7 +35,8 @@ import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import java.util.List; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; import static org.springframework.web.bind.annotation.RequestMethod.GET; import static org.springframework.web.bind.annotation.RequestMethod.PUT; @@ -89,23 +89,19 @@ public class AppServiceController { } @RequestMapping(value = "/transactions/{txnId:.+}", method = PUT) - public String getTransaction( + public CompletableFuture getTransaction( HttpServletRequest request, @RequestParam(name = "access_token", required = false) String token, - @PathVariable String txnId) { + @PathVariable String txnId + ) { + validateToken(token); + try { - validateToken(token); - - log.info("Transaction {}: Processing start", txnId); - List events = GsonUtil.asList(GsonUtil.getArray(parser.parse(request.getInputStream()), "events"), JsonObject.class); - log.debug("Transaction {}: {} events to process", txnId, events.size()); - handler.processTransaction(events); - log.info("Transaction {}: Processing end", txnId); - } catch (Throwable e) { - log.error("Unable to properly process transaction {}", txnId, e); + log.info("Received AS transaction {}", txnId); + return handler.processTransaction(txnId, request.getInputStream()); + } catch (IOException e) { + throw new RuntimeException("AS Transaction " + txnId + ": I/O error when getting input", e); } - - return "{}"; } } diff --git a/src/main/java/io/kamax/mxisd/invitation/InvitationManager.java b/src/main/java/io/kamax/mxisd/invitation/InvitationManager.java index 198d122..e1abe9f 100644 --- a/src/main/java/io/kamax/mxisd/invitation/InvitationManager.java +++ b/src/main/java/io/kamax/mxisd/invitation/InvitationManager.java @@ -34,7 +34,7 @@ import io.kamax.mxisd.lookup.ThreePidMapping; import io.kamax.mxisd.lookup.strategy.LookupStrategy; import io.kamax.mxisd.notification.NotificationManager; import io.kamax.mxisd.storage.IStorage; -import io.kamax.mxisd.storage.ormlite.ThreePidInviteIO; +import io.kamax.mxisd.storage.ormlite.dao.ThreePidInviteIO; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringUtils; diff --git a/src/main/java/io/kamax/mxisd/storage/IStorage.java b/src/main/java/io/kamax/mxisd/storage/IStorage.java index ea83a58..80d35c4 100644 --- a/src/main/java/io/kamax/mxisd/storage/IStorage.java +++ b/src/main/java/io/kamax/mxisd/storage/IStorage.java @@ -23,8 +23,10 @@ package io.kamax.mxisd.storage; import io.kamax.matrix.ThreePid; import io.kamax.mxisd.invitation.IThreePidInviteReply; import io.kamax.mxisd.storage.dao.IThreePidSessionDao; -import io.kamax.mxisd.storage.ormlite.ThreePidInviteIO; +import io.kamax.mxisd.storage.ormlite.dao.ASTransactionDao; +import io.kamax.mxisd.storage.ormlite.dao.ThreePidInviteIO; +import java.time.Instant; import java.util.Collection; import java.util.Optional; @@ -44,4 +46,8 @@ public interface IStorage { void updateThreePidSession(IThreePidSessionDao session); + void insertTransactionResult(String localpart, String txnId, Instant completion, String response); + + Optional getTransactionResult(String localpart, String txnId); + } diff --git a/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorage.java b/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorage.java index 0195f28..25d6cab 100644 --- a/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorage.java +++ b/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorage.java @@ -31,6 +31,8 @@ import io.kamax.mxisd.exception.InternalServerError; import io.kamax.mxisd.invitation.IThreePidInviteReply; import io.kamax.mxisd.storage.IStorage; import io.kamax.mxisd.storage.dao.IThreePidSessionDao; +import io.kamax.mxisd.storage.ormlite.dao.ASTransactionDao; +import io.kamax.mxisd.storage.ormlite.dao.ThreePidInviteIO; import io.kamax.mxisd.storage.ormlite.dao.ThreePidSessionDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.sql.SQLException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -63,17 +66,21 @@ public class OrmLiteSqliteStorage implements IStorage { private Dao invDao; private Dao sessionDao; + private Dao asTxnDao; - OrmLiteSqliteStorage(String path) { + public OrmLiteSqliteStorage(String backend, String path) { withCatcher(() -> { - File parent = new File(path).getParentFile(); - if (!parent.mkdirs() && !parent.isDirectory()) { - throw new RuntimeException("Unable to create DB parent directory: " + parent); + if (path.startsWith("/") && !path.startsWith("//")) { + File parent = new File(path).getParentFile(); + if (!parent.mkdirs() && !parent.isDirectory()) { + throw new RuntimeException("Unable to create DB parent directory: " + parent); + } } - ConnectionSource connPool = new JdbcConnectionSource("jdbc:sqlite:" + path); + ConnectionSource connPool = new JdbcConnectionSource("jdbc:" + backend + ":" + path); invDao = createDaoAndTable(connPool, ThreePidInviteIO.class); sessionDao = createDaoAndTable(connPool, ThreePidSessionDao.class); + asTxnDao = createDaoAndTable(connPool, ASTransactionDao.class); }); } @@ -178,4 +185,35 @@ public class OrmLiteSqliteStorage implements IStorage { }); } + @Override + public void insertTransactionResult(String localpart, String txnId, Instant completion, String result) { + withCatcher(() -> { + int created = asTxnDao.create(new ASTransactionDao(localpart, txnId, completion, result)); + if (created != 1) { + throw new RuntimeException("Unexpected row count after DB action: " + created); + } + }); + } + + @Override + public Optional getTransactionResult(String localpart, String txnId) { + return withCatcher(() -> { + ASTransactionDao dao = new ASTransactionDao(); + dao.setLocalpart(localpart); + dao.setTransactionId(txnId); + List daoList = asTxnDao.queryForMatchingArgs(dao); + + if (daoList.size() > 1) { + throw new InternalServerError("Lookup for Transaction " + + txnId + " for localpart " + localpart + " returned more than one result"); + } + + if (daoList.isEmpty()) { + return Optional.empty(); + } + + return Optional.of(daoList.get(0)); + }); + } + } diff --git a/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorageBeanFactory.java b/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorageBeanFactory.java index 4c231c1..33ba7f3 100644 --- a/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorageBeanFactory.java +++ b/src/main/java/io/kamax/mxisd/storage/ormlite/OrmLiteSqliteStorageBeanFactory.java @@ -45,13 +45,15 @@ public class OrmLiteSqliteStorageBeanFactory implements FactoryBean { @PostConstruct private void postConstruct() { - if (StringUtils.equals("sqlite", storagecfg.getBackend())) { - if (StringUtils.isBlank(cfg.getDatabase())) { - throw new ConfigurationException("storage.provider.sqlite.database"); - } - - storage = new OrmLiteSqliteStorage(cfg.getDatabase()); + if (StringUtils.isBlank(storagecfg.getBackend())) { + throw new ConfigurationException("storage.backend"); } + + if (StringUtils.equals("sqlite", storagecfg.getBackend()) && StringUtils.isBlank(cfg.getDatabase())) { + throw new ConfigurationException("storage.provider.sqlite.database"); + } + + storage = new OrmLiteSqliteStorage(storagecfg.getBackend(), cfg.getDatabase()); } @Override diff --git a/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ASTransactionDao.java b/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ASTransactionDao.java new file mode 100644 index 0000000..58e1af1 --- /dev/null +++ b/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ASTransactionDao.java @@ -0,0 +1,86 @@ +/* + * mxisd - Matrix Identity Server Daemon + * Copyright (C) 2018 Kamax Sarl + * + * https://www.kamax.io/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.kamax.mxisd.storage.ormlite.dao; + +import com.j256.ormlite.field.DatabaseField; +import com.j256.ormlite.table.DatabaseTable; + +import java.time.Instant; + +@DatabaseTable(tableName = "as_txn") +public class ASTransactionDao { + + @DatabaseField(uniqueCombo = true) + private String transactionId; + + @DatabaseField(uniqueCombo = true) + private String localpart; + + @DatabaseField(canBeNull = false) + private long timestamp; + + @DatabaseField(canBeNull = false) + private String result; + + public ASTransactionDao() { + // Needed for ORMLite + } + + public ASTransactionDao(String localpart, String txnId, Instant completion, String result) { + setLocalpart(localpart); + setTransactionId(txnId); + setTimestamp(completion.toEpochMilli()); + setResult(result); + } + + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public String getLocalpart() { + return localpart; + } + + public void setLocalpart(String localpart) { + this.localpart = localpart; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + +} diff --git a/src/main/java/io/kamax/mxisd/storage/ormlite/ThreePidInviteIO.java b/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ThreePidInviteIO.java similarity index 89% rename from src/main/java/io/kamax/mxisd/storage/ormlite/ThreePidInviteIO.java rename to src/main/java/io/kamax/mxisd/storage/ormlite/dao/ThreePidInviteIO.java index 3ff290f..7ea9f78 100644 --- a/src/main/java/io/kamax/mxisd/storage/ormlite/ThreePidInviteIO.java +++ b/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ThreePidInviteIO.java @@ -18,12 +18,12 @@ * along with this program. If not, see . */ -package io.kamax.mxisd.storage.ormlite; +package io.kamax.mxisd.storage.ormlite.dao; -import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.j256.ormlite.field.DatabaseField; import com.j256.ormlite.table.DatabaseTable; +import io.kamax.matrix.json.GsonUtil; import io.kamax.mxisd.invitation.IThreePidInviteReply; import org.apache.commons.lang.StringUtils; @@ -33,8 +33,6 @@ import java.util.Map; @DatabaseTable(tableName = "invite_3pid") public class ThreePidInviteIO { - private static Gson gson = new Gson(); - @DatabaseField(id = true) private String id; @@ -57,7 +55,7 @@ public class ThreePidInviteIO { private String properties; public ThreePidInviteIO() { - // needed for ORMlite + // Needed for ORMLite } public ThreePidInviteIO(IThreePidInviteReply data) { @@ -67,7 +65,7 @@ public class ThreePidInviteIO { this.medium = data.getInvite().getMedium(); this.address = data.getInvite().getAddress(); this.roomId = data.getInvite().getRoomId(); - this.properties = gson.toJson(data.getInvite().getProperties()); + this.properties = GsonUtil.get().toJson(data.getInvite().getProperties()); } public String getId() { @@ -99,7 +97,7 @@ public class ThreePidInviteIO { return new HashMap<>(); } - return gson.fromJson(properties, new TypeToken>() { + return GsonUtil.get().fromJson(properties, new TypeToken>() { }.getType()); } diff --git a/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ThreePidSessionDao.java b/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ThreePidSessionDao.java index 8a6274a..62c85f4 100644 --- a/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ThreePidSessionDao.java +++ b/src/main/java/io/kamax/mxisd/storage/ormlite/dao/ThreePidSessionDao.java @@ -80,7 +80,7 @@ public class ThreePidSessionDao implements IThreePidSessionDao { private boolean isRemoteValidated; public ThreePidSessionDao() { - // stub for ORMLite + // Needed for ORMLite } public ThreePidSessionDao(IThreePidSessionDao session) { diff --git a/src/test/java/io/kamax/mxisd/test/storage/OrmLiteSqliteStorageTest.java b/src/test/java/io/kamax/mxisd/test/storage/OrmLiteSqliteStorageTest.java new file mode 100644 index 0000000..a83e5ae --- /dev/null +++ b/src/test/java/io/kamax/mxisd/test/storage/OrmLiteSqliteStorageTest.java @@ -0,0 +1,44 @@ +/* + * mxisd - Matrix Identity Server Daemon + * Copyright (C) 2018 Kamax Sarl + * + * https://www.kamax.io/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.kamax.mxisd.test.storage; + +import io.kamax.mxisd.storage.ormlite.OrmLiteSqliteStorage; +import org.junit.Test; + +import java.time.Instant; + +public class OrmLiteSqliteStorageTest { + + @Test + public void insertAsTxnDuplicate() { + OrmLiteSqliteStorage store = new OrmLiteSqliteStorage("sqlite", ":memory:"); + store.insertTransactionResult("mxisd", "1", Instant.now(), "{}"); + store.insertTransactionResult("mxisd", "2", Instant.now(), "{}"); + } + + @Test(expected = RuntimeException.class) + public void insertAsTxnSame() { + OrmLiteSqliteStorage store = new OrmLiteSqliteStorage("sqlite", ":memory:"); + store.insertTransactionResult("mxisd", "1", Instant.now(), "{}"); + store.insertTransactionResult("mxisd", "1", Instant.now(), "{}"); + } + +}