MSC2140 Add SQL storage for hashes and the time-based rotation policy.
This commit is contained in:
@@ -4,14 +4,16 @@ import io.kamax.mxisd.config.HashingConfig;
|
||||
import io.kamax.mxisd.hash.rotation.HashRotationStrategy;
|
||||
import io.kamax.mxisd.hash.rotation.NoOpRotationStrategy;
|
||||
import io.kamax.mxisd.hash.rotation.RotationPerRequests;
|
||||
import io.kamax.mxisd.hash.rotation.TimeBasedRotation;
|
||||
import io.kamax.mxisd.hash.storage.EmptyStorage;
|
||||
import io.kamax.mxisd.hash.storage.HashStorage;
|
||||
import io.kamax.mxisd.hash.storage.InMemoryHashStorage;
|
||||
import io.kamax.mxisd.hash.storage.SqlHashStorage;
|
||||
import io.kamax.mxisd.lookup.provider.IThreePidProvider;
|
||||
import io.kamax.mxisd.storage.IStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@@ -23,10 +25,12 @@ public class HashManager {
|
||||
private HashRotationStrategy rotationStrategy;
|
||||
private HashStorage hashStorage;
|
||||
private HashingConfig config;
|
||||
private IStorage storage;
|
||||
private AtomicBoolean configured = new AtomicBoolean(false);
|
||||
|
||||
public void init(HashingConfig config, List<? extends IThreePidProvider> providers) {
|
||||
public void init(HashingConfig config, List<? extends IThreePidProvider> providers, IStorage storage) {
|
||||
this.config = config;
|
||||
this.storage = storage;
|
||||
initStorage();
|
||||
hashEngine = new HashEngine(providers, getHashStorage(), config);
|
||||
initRotationStrategy();
|
||||
@@ -39,6 +43,9 @@ public class HashManager {
|
||||
case IN_MEMORY:
|
||||
this.hashStorage = new InMemoryHashStorage();
|
||||
break;
|
||||
case SQL:
|
||||
this.hashStorage = new SqlHashStorage(storage);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown storage type: " + config.getHashStorageType());
|
||||
}
|
||||
@@ -53,6 +60,9 @@ public class HashManager {
|
||||
case PER_REQUESTS:
|
||||
this.rotationStrategy = new RotationPerRequests();
|
||||
break;
|
||||
case PER_SECONDS:
|
||||
this.rotationStrategy = new TimeBasedRotation(config.getDelay());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown rotation type: " + config.getHashStorageType());
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package io.kamax.mxisd.hash.rotation;
|
||||
|
||||
import io.kamax.mxisd.hash.HashEngine;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class TimeBasedRotation implements HashRotationStrategy {
|
||||
|
||||
private final long delay;
|
||||
private HashEngine hashEngine;
|
||||
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
public TimeBasedRotation(long delay) {
|
||||
this.delay = delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(HashEngine hashEngine) {
|
||||
this.hashEngine = hashEngine;
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(executorService::shutdown));
|
||||
executorService.scheduleWithFixedDelay(this::trigger, 0, delay, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HashEngine getHashEngine() {
|
||||
return hashEngine;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newRequest() {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kamax.mxisd.hash.storage;
|
||||
|
||||
import io.kamax.mxisd.lookup.ThreePidMapping;
|
||||
import io.kamax.mxisd.storage.IStorage;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public class SqlHashStorage implements HashStorage {
|
||||
|
||||
private final IStorage storage;
|
||||
|
||||
public SqlHashStorage(IStorage storage) {
|
||||
this.storage = storage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Pair<String, ThreePidMapping>> find(Iterable<String> hashes) {
|
||||
return storage.findHashes(hashes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(ThreePidMapping pidMapping, String hash) {
|
||||
storage.addHash(pidMapping.getMxid(), pidMapping.getMedium(), pidMapping.getValue(), hash);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
storage.clearHashes();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user