Cache processing of bulk lookups and de-dup concurrent requests

This commit is contained in:
Max Dor
2019-02-04 06:04:39 +01:00
parent 559f6a7401
commit fbbafeb769
3 changed files with 35 additions and 10 deletions

View File

@@ -46,7 +46,7 @@ public class BulkLookupHandler extends LookupHandler {
}
@Override
public void handleRequest(HttpServerExchange exchange) {
public void handleRequest(HttpServerExchange exchange) throws Exception {
ClientBulkLookupRequest input = parseJsonTo(exchange, ClientBulkLookupRequest.class);
BulkLookupRequest lookupRequest = new BulkLookupRequest();
setRequesterInfo(lookupRequest, exchange);
@@ -63,7 +63,9 @@ public class BulkLookupHandler extends LookupHandler {
lookupRequest.setMappings(mappings);
ClientBulkLookupAnswer answer = new ClientBulkLookupAnswer();
answer.addAll(strategy.find(lookupRequest));
answer.addAll(strategy.find(lookupRequest).get());
log.info("Finished bulk lookup request from {}", lookupRequest.getRequester());
respondJson(exchange, answer);
}

View File

@@ -28,6 +28,7 @@ import io.kamax.mxisd.lookup.provider.IThreePidProvider;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public interface LookupStrategy {
@@ -43,6 +44,6 @@ public interface LookupStrategy {
Optional<SingleLookupReply> findRecursive(SingleLookupRequest request);
List<ThreePidMapping> find(BulkLookupRequest requests);
CompletableFuture<List<ThreePidMapping>> find(BulkLookupRequest requests);
}

View File

@@ -21,19 +21,21 @@
package io.kamax.mxisd.lookup.strategy;
import edazdarevic.commons.net.CIDRUtils;
import io.kamax.matrix.json.GsonUtil;
import io.kamax.matrix.json.MatrixJson;
import io.kamax.mxisd.config.MxisdConfig;
import io.kamax.mxisd.exception.ConfigurationException;
import io.kamax.mxisd.lookup.*;
import io.kamax.mxisd.lookup.fetcher.IBridgeFetcher;
import io.kamax.mxisd.lookup.provider.IThreePidProvider;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class RecursivePriorityLookupStrategy implements LookupStrategy {
@@ -44,6 +46,8 @@ public class RecursivePriorityLookupStrategy implements LookupStrategy {
private List<IThreePidProvider> providers;
private IBridgeFetcher bridge;
private Map<String, CompletableFuture<List<ThreePidMapping>>> bulkLookupInProgress = new ConcurrentHashMap<>();
private List<CIDRUtils> allowedCidr = new ArrayList<>();
public RecursivePriorityLookupStrategy(MxisdConfig.Lookup cfg, List<? extends IThreePidProvider> providers, IBridgeFetcher bridge) {
@@ -182,11 +186,27 @@ public class RecursivePriorityLookupStrategy implements LookupStrategy {
}
@Override
public List<ThreePidMapping> find(BulkLookupRequest request) {
public CompletableFuture<List<ThreePidMapping>> find(BulkLookupRequest request) {
if (!cfg.getBulk().getEnabled()) {
return Collections.emptyList();
return CompletableFuture.completedFuture(new ArrayList<>());
}
String payloadId = DigestUtils.md5Hex(MatrixJson.encodeCanonical(GsonUtil.makeObj(request)));
log.info("Computed Payload ID: {}", payloadId);
synchronized (this) {
CompletableFuture<List<ThreePidMapping>> f = bulkLookupInProgress.get(payloadId);
if (Objects.nonNull(f)) {
log.info("Returning existing future for Payload ID {}", payloadId);
return f;
}
bulkLookupInProgress.put(payloadId, new CompletableFuture<>());
}
log.info("Processing Payload ID {}", payloadId);
CompletableFuture<List<ThreePidMapping>> result = bulkLookupInProgress.get(payloadId);
List<ThreePidMapping> mapToDo = new ArrayList<>(request.getMappings());
List<ThreePidMapping> mapFoundAll = new ArrayList<>();
@@ -205,7 +225,9 @@ public class RecursivePriorityLookupStrategy implements LookupStrategy {
mapToDo.removeAll(mapFound);
}
return mapFoundAll;
log.info("Processed Payload ID {}", payloadId);
result.complete(mapFoundAll);
return bulkLookupInProgress.remove(payloadId);
}
}