Add mechanism for 3PID invites expiration (#120)
This commit is contained in:
@@ -31,6 +31,7 @@ import io.kamax.mxisd.config.MxisdConfig;
|
||||
import io.kamax.mxisd.config.ServerConfig;
|
||||
import io.kamax.mxisd.dns.FederationDnsOverwrite;
|
||||
import io.kamax.mxisd.exception.BadRequestException;
|
||||
import io.kamax.mxisd.exception.ConfigurationException;
|
||||
import io.kamax.mxisd.exception.MappingAlreadyExistsException;
|
||||
import io.kamax.mxisd.exception.ObjectNotFoundException;
|
||||
import io.kamax.mxisd.lookup.SingleLookupReply;
|
||||
@@ -41,6 +42,7 @@ import io.kamax.mxisd.profile.ProfileManager;
|
||||
import io.kamax.mxisd.storage.IStorage;
|
||||
import io.kamax.mxisd.storage.crypto.*;
|
||||
import io.kamax.mxisd.storage.ormlite.dao.ThreePidInviteIO;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -63,6 +65,8 @@ import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.DateTimeException;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
@@ -70,7 +74,10 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class InvitationManager {
|
||||
|
||||
private transient final Logger log = LoggerFactory.getLogger(InvitationManager.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(InvitationManager.class);
|
||||
private static final String CreatedAtPropertyKey = "created_at";
|
||||
|
||||
private final String defaultCreateTs = Long.toString(Instant.now().toEpochMilli());
|
||||
|
||||
private InvitationConfig cfg;
|
||||
private ServerConfig srvCfg;
|
||||
@@ -97,7 +104,7 @@ public class InvitationManager {
|
||||
NotificationManager notifMgr,
|
||||
ProfileManager profileMgr
|
||||
) {
|
||||
this.cfg = mxisdCfg.getInvite();
|
||||
this.cfg = requireValid(mxisdCfg);
|
||||
this.srvCfg = mxisdCfg.getServer();
|
||||
this.storage = storage;
|
||||
this.lookupMgr = lookupMgr;
|
||||
@@ -110,6 +117,7 @@ public class InvitationManager {
|
||||
log.info("Loading saved invites");
|
||||
Collection<ThreePidInviteIO> ioList = storage.getInvites();
|
||||
ioList.forEach(io -> {
|
||||
io.getProperties().putIfAbsent(CreatedAtPropertyKey, defaultCreateTs);
|
||||
log.info("Processing invite {}", GsonUtil.get().toJson(io));
|
||||
ThreePidInvite invite = new ThreePidInvite(
|
||||
MatrixID.asAcceptable(io.getSender()),
|
||||
@@ -119,7 +127,7 @@ public class InvitationManager {
|
||||
io.getProperties()
|
||||
);
|
||||
|
||||
ThreePidInviteReply reply = new ThreePidInviteReply(getId(invite), invite, io.getToken(), "", Collections.emptyList());
|
||||
ThreePidInviteReply reply = new ThreePidInviteReply(io.getId(), invite, io.getToken(), "", Collections.emptyList());
|
||||
invitations.put(reply.getId(), reply);
|
||||
});
|
||||
|
||||
@@ -136,25 +144,64 @@ public class InvitationManager {
|
||||
|
||||
log.info("Setting up invitation mapping refresh timer");
|
||||
refreshTimer = new Timer();
|
||||
refreshTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
lookupMappingsForInvites();
|
||||
} catch (Throwable t) {
|
||||
log.error("Error when running background mapping refresh", t);
|
||||
}
|
||||
}
|
||||
}, 5000L, TimeUnit.MILLISECONDS.convert(cfg.getResolution().getTimer(), TimeUnit.MINUTES));
|
||||
|
||||
// We add a shutdown hook to cancel the hook and wait for pending resolutions
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
refreshTimer.cancel();
|
||||
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);
|
||||
}));
|
||||
|
||||
// We set the refresh timer for background tasks
|
||||
refreshTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
doMaintenance();
|
||||
} catch (Throwable t) {
|
||||
log.error("Error when running background maintenance", t);
|
||||
}
|
||||
}
|
||||
}, 5000L, TimeUnit.MILLISECONDS.convert(cfg.getResolution().getTimer(), TimeUnit.MINUTES));
|
||||
}
|
||||
|
||||
private String getId(IThreePidInvite invite) {
|
||||
return invite.getSender().getDomain().toLowerCase() + invite.getMedium().toLowerCase() + invite.getAddress().toLowerCase();
|
||||
private InvitationConfig requireValid(MxisdConfig cfg) {
|
||||
// This is not configured, we'll apply a default configuration
|
||||
if (Objects.isNull(cfg.getInvite().getExpiration().isEnabled())) {
|
||||
// We compute our own user, so it can be used if we bridge as well
|
||||
String mxId = MatrixID.asAcceptable("_mxisd-expired_invite", cfg.getMatrix().getDomain()).getId();
|
||||
|
||||
// Enabled by default
|
||||
cfg.getInvite().getExpiration().setEnabled(true);
|
||||
|
||||
// We'll resolve to our computed User ID
|
||||
cfg.getInvite().getExpiration().setResolveTo(mxId);
|
||||
|
||||
// One calendar week (60min/1h * 24 = 1d * 7 = 1w)
|
||||
cfg.getInvite().getExpiration().setAfter(60 * 24 * 7);
|
||||
}
|
||||
|
||||
if (cfg.getInvite().getExpiration().isEnabled()) {
|
||||
if (cfg.getInvite().getExpiration().getAfter() < 1) {
|
||||
throw new ConfigurationException("Invitation expiration delay must be greater or equal to 1");
|
||||
}
|
||||
|
||||
if (StringUtils.isBlank(cfg.getInvite().getExpiration().getResolveTo())) {
|
||||
throw new ConfigurationException("Invitation expiration resolution target cannot be empty/blank");
|
||||
}
|
||||
|
||||
try {
|
||||
MatrixID.asAcceptable(cfg.getInvite().getExpiration().getResolveTo());
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new ConfigurationException("Invitation expiration resolution target is not a valid Matrix ID: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
return cfg.getInvite();
|
||||
}
|
||||
|
||||
private String computeId(IThreePidInvite invite) {
|
||||
String rawId = invite.getSender().getDomain().toLowerCase() + invite.getMedium().toLowerCase() + invite.getAddress().toLowerCase();
|
||||
return Base64.encodeBase64URLSafeString(rawId.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
private String getIdForLog(IThreePidInviteReply reply) {
|
||||
@@ -248,7 +295,7 @@ public class InvitationManager {
|
||||
throw new BadRequestException("Medium type " + invitation.getMedium() + " is not supported");
|
||||
}
|
||||
|
||||
String invId = getId(invitation);
|
||||
String invId = computeId(invitation);
|
||||
log.info("Handling invite for {}:{} from {} in room {}", invitation.getMedium(), invitation.getAddress(), invitation.getSender(), invitation.getRoomId());
|
||||
IThreePidInviteReply reply = invitations.get(invId);
|
||||
if (reply != null) {
|
||||
@@ -276,6 +323,7 @@ public class InvitationManager {
|
||||
String pPubKey = keyMgr.getPublicKeyBase64(pKeyId);
|
||||
String ePubKey = keyMgr.getPublicKeyBase64(eKeyId);
|
||||
|
||||
invitation.getProperties().put(CreatedAtPropertyKey, Long.toString(Instant.now().toEpochMilli()));
|
||||
invitation.getProperties().put("p_key_algo", pKeyId.getAlgorithm());
|
||||
invitation.getProperties().put("p_key_serial", pKeyId.getSerial());
|
||||
invitation.getProperties().put("p_key_public", pPubKey);
|
||||
@@ -312,6 +360,58 @@ public class InvitationManager {
|
||||
return false;
|
||||
}
|
||||
|
||||
private void removeInvite(IThreePidInviteReply reply) {
|
||||
invitations.remove(reply.getId());
|
||||
storage.deleteInvite(reply.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger the periodic maintenance tasks
|
||||
*/
|
||||
public void doMaintenance() {
|
||||
lookupMappingsForInvites();
|
||||
expireInvites();
|
||||
}
|
||||
|
||||
public void expireInvites() {
|
||||
log.debug("Invite expiration: started");
|
||||
|
||||
if (!cfg.getExpiration().isEnabled()) {
|
||||
log.debug("Invite expiration is disabled, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
if (invitations.isEmpty()) {
|
||||
log.debug("No invite to expired, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
String targetMxid = cfg.getExpiration().getResolveTo();
|
||||
for (IThreePidInviteReply reply : invitations.values()) {
|
||||
log.debug("Processing invite {}", reply.getId());
|
||||
|
||||
String tsRaw = reply.getInvite().getProperties().computeIfAbsent(CreatedAtPropertyKey, k -> defaultCreateTs);
|
||||
try {
|
||||
Instant ts = Instant.ofEpochMilli(Long.parseLong(tsRaw));
|
||||
Instant targetTs = ts.plusSeconds(cfg.getExpiration().getAfter() * 60);
|
||||
Instant now = Instant.now();
|
||||
log.debug("Invite {} - Created at {} - Expire at {} - Current time is {}", reply.getId(), ts, targetTs, now);
|
||||
if (targetTs.isBefore(Instant.now())) {
|
||||
log.debug("Invite {} has not expired yet, skipping", reply.getId());
|
||||
continue;
|
||||
}
|
||||
|
||||
log.info("Invite {} has expired at TS {} - Expiring and resolving to {}", targetTs, targetMxid);
|
||||
publishMapping(reply, targetMxid);
|
||||
} catch (NumberFormatException | DateTimeException e) {
|
||||
log.warn("Invite {} has an invalid creation TS, setting to default value of {}", reply.getId(), defaultCreateTs);
|
||||
reply.getInvite().getProperties().put(CreatedAtPropertyKey, defaultCreateTs);
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("Invite expiration: finished");
|
||||
}
|
||||
|
||||
public void lookupMappingsForInvites() {
|
||||
if (!invitations.isEmpty()) {
|
||||
log.info("Checking for existing mapping for pending invites");
|
||||
@@ -391,25 +491,32 @@ public class InvitationManager {
|
||||
StringEntity entity = new StringEntity(content.toString(), StandardCharsets.UTF_8);
|
||||
entity.setContentType("application/json");
|
||||
req.setEntity(entity);
|
||||
|
||||
Instant resolvedAt = Instant.now();
|
||||
boolean couldPublish = false;
|
||||
try {
|
||||
log.info("Posting onBind event to {}", req.getURI());
|
||||
CloseableHttpResponse response = client.execute(req);
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
log.info("Answer code: {}", statusCode);
|
||||
if (statusCode >= 300 && statusCode != 403) {
|
||||
log.warn("Answer body: {}", IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
|
||||
log.info("Answer body: {}", IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
|
||||
log.warn("HS returned an error. Invite can be found in historical storage for manual re-processing");
|
||||
} else {
|
||||
couldPublish = true;
|
||||
if (statusCode == 403) {
|
||||
log.info("Invite was obsolete");
|
||||
log.info("Invite is obsolete or no longer under our control");
|
||||
}
|
||||
|
||||
invitations.remove(getId(reply.getInvite()));
|
||||
storage.deleteInvite(reply.getId());
|
||||
log.info("Removed invite from internal store");
|
||||
}
|
||||
response.close();
|
||||
} catch (IOException e) {
|
||||
log.warn("Unable to tell HS {} about invite being mapped", domain, e);
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
storage.insertHistoricalInvite(reply, mxid, resolvedAt, couldPublish);
|
||||
removeInvite(reply);
|
||||
log.info("Moved invite {} to historical table", reply.getId());
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
@@ -425,7 +532,7 @@ public class InvitationManager {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
log.info("Searching for mapping created since invite {} was created", getIdForLog(reply));
|
||||
log.info("Searching for mapping created after invite {} was created", getIdForLog(reply));
|
||||
Optional<SingleLookupReply> result = lookup3pid(reply.getInvite().getMedium(), reply.getInvite().getAddress());
|
||||
if (result.isPresent()) {
|
||||
SingleLookupReply lookup = result.get();
|
||||
|
Reference in New Issue
Block a user