NC-1721: Filter timeout if not queried for 10 minutes (#66)

Lucas Saldanha 6 years ago committed by GitHub
parent 793f149f96
commit 03f92e3eb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcTestMethodsFactory.java
  2. 5
      ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java
  3. 6
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcMethodsFactory.java
  4. 28
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/BlockFilter.java
  5. 41
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/Filter.java
  6. 172
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManager.java
  7. 72
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepository.java
  8. 21
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitor.java
  9. 51
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/LogFilter.java
  10. 28
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/PendingTransactionFilter.java
  11. 3
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java
  12. 6
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AbstractEthJsonRpcHttpServiceTest.java
  13. 34
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerLogFilterTest.java
  14. 49
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerTest.java
  15. 191
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepositoryTest.java
  16. 36
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTest.java
  17. 64
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitorTest.java
  18. 29
      pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java

@ -14,6 +14,7 @@ import tech.pegasys.pantheon.ethereum.db.DefaultMutableBlockchain;
import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
@ -66,7 +67,8 @@ public class JsonRpcTestMethodsFactory {
final P2PNetwork peerDiscovery = mock(P2PNetwork.class);
final TransactionPool transactionPool = mock(TransactionPool.class);
final FilterManager filterManager =
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
new FilterManager(
blockchainQueries, transactionPool, new FilterIdGenerator(), new FilterRepository());
final EthHashMiningCoordinator miningCoordinator = mock(EthHashMiningCoordinator.class);
return new JsonRpcMethodsFactory()

@ -25,6 +25,7 @@ import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.JsonRpcRequest;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.EthGetFilterChanges;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.parameters.JsonRpcParameter;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
@ -81,7 +82,9 @@ public class EthGetFilterChangesIntegrationTest {
transactions, genesisConfig.getProtocolSchedule(), protocolContext, batchAddedListener);
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, worldStateArchive);
filterManager = new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
filterManager =
new FilterManager(
blockchainQueries, transactionPool, new FilterIdGenerator(), new FilterRepository());
method = new EthGetFilterChanges(filterManager, parameters);
}

@ -6,7 +6,6 @@ import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.TransactionPool;
import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.AdminPeers;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.DebugStorageRangeAt;
@ -85,11 +84,10 @@ public class JsonRpcMethodsFactory {
final ProtocolSchedule<?> protocolSchedule,
final AbstractMiningCoordinator<?, ?> miningCoordinator,
final Set<Capability> supportedCapabilities,
final Collection<RpcApis> rpcApis) {
final Collection<RpcApis> rpcApis,
final FilterManager filterManager) {
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, worldStateArchive);
final FilterManager filterManager =
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
return methods(
clientVersion,
chainId,

@ -0,0 +1,28 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import tech.pegasys.pantheon.ethereum.core.Hash;
import java.util.ArrayList;
import java.util.List;
/** Tracks new blocks being added to the blockchain. */
class BlockFilter extends Filter {
private final List<Hash> blockHashes = new ArrayList<>();
BlockFilter(final String id) {
super(id);
}
void addBlockHash(final Hash hash) {
blockHashes.add(hash);
}
List<Hash> blockHashes() {
return blockHashes;
}
void clearBlockHashes() {
blockHashes.clear();
}
}

@ -0,0 +1,41 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import java.time.Duration;
import java.time.Instant;
import com.google.common.annotations.VisibleForTesting;
abstract class Filter {
private static final Duration DEFAULT_EXPIRE_DURATION = Duration.ofMinutes(10);
private final String id;
private Instant expireTime;
Filter(final String id) {
this.id = id;
resetExpireTime();
}
String getId() {
return id;
}
void resetExpireTime() {
this.expireTime = Instant.now().plus(DEFAULT_EXPIRE_DURATION);
}
boolean isExpired() {
return Instant.now().isAfter(expireTime);
}
@VisibleForTesting
void setExpireTime(final Instant expireTime) {
this.expireTime = expireTime;
}
@VisibleForTesting
Instant getExpireTime() {
return expireTime;
}
}

@ -12,118 +12,52 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.LogWithMetadata;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.annotations.VisibleForTesting;
import io.vertx.core.AbstractVerticle;
/** Manages JSON-RPC filter events. */
public class FilterManager {
public class FilterManager extends AbstractVerticle {
private final Map<String, BlockFilter> blockFilters = new ConcurrentHashMap<>();
private final Map<String, PendingTransactionFilter> pendingTransactionFilters =
new ConcurrentHashMap<>();
private final Map<String, LogFilter> logFilters = new ConcurrentHashMap<>();
private static final int FILTER_TIMEOUT_CHECK_TIMER = 10000;
private final FilterIdGenerator filterIdGenerator;
/** Tracks new blocks being added to the blockchain. */
private static class BlockFilter {
private final List<Hash> blockHashes = new ArrayList<>();
BlockFilter() {}
void addBlockHash(final Hash hash) {
blockHashes.add(hash);
}
List<Hash> blockHashes() {
return blockHashes;
}
void clearBlockHashes() {
blockHashes.clear();
}
}
/** Tracks new pending transactions that have arrived in the transaction pool */
private static class PendingTransactionFilter {
private final List<Hash> transactionHashes = new ArrayList<>();
PendingTransactionFilter() {}
void addTransactionHash(final Hash hash) {
transactionHashes.add(hash);
}
List<Hash> transactionHashes() {
return transactionHashes;
}
void clearTransactionHashes() {
transactionHashes.clear();
}
}
/** Tracks new log events. */
private static class LogFilter {
private final BlockParameter fromBlock;
private final BlockParameter toBlock;
private final LogsQuery logsQuery;
private final List<LogWithMetadata> logs = new ArrayList<>();
LogFilter(
final BlockParameter fromBlock, final BlockParameter toBlock, final LogsQuery logsQuery) {
this.fromBlock = fromBlock;
this.toBlock = toBlock;
this.logsQuery = logsQuery;
}
public BlockParameter getFromBlock() {
return fromBlock;
}
public BlockParameter getToBlock() {
return toBlock;
}
public LogsQuery getLogsQuery() {
return logsQuery;
}
void addLog(final List<LogWithMetadata> logs) {
this.logs.addAll(logs);
}
List<LogWithMetadata> logs() {
return logs;
}
void clearLogs() {
logs.clear();
}
}
private final FilterRepository filterRepository;
private final BlockchainQueries blockchainQueries;
public FilterManager(
final BlockchainQueries blockchainQueries,
final TransactionPool transactionPool,
final FilterIdGenerator filterIdGenerator) {
final FilterIdGenerator filterIdGenerator,
final FilterRepository filterRepository) {
this.filterIdGenerator = filterIdGenerator;
this.filterRepository = filterRepository;
checkNotNull(blockchainQueries.getBlockchain());
blockchainQueries.getBlockchain().observeBlockAdded(this::recordBlockEvent);
transactionPool.addTransactionListener(this::recordPendingTransactionEvent);
this.blockchainQueries = blockchainQueries;
}
@Override
public void start() {
startFilterTimeoutTimer();
}
@Override
public void stop() {
filterRepository.deleteAll();
}
private void startFilterTimeoutTimer() {
vertx.setPeriodic(
FILTER_TIMEOUT_CHECK_TIMER,
timerId ->
vertx.executeBlocking(
future -> new FilterTimeoutMonitor(filterRepository).checkFilters(), result -> {}));
}
/**
* Installs a new block filter
*
@ -131,7 +65,7 @@ public class FilterManager {
*/
public String installBlockFilter() {
final String filterId = filterIdGenerator.nextId();
blockFilters.put(filterId, new BlockFilter());
filterRepository.save(new BlockFilter(filterId));
return filterId;
}
@ -142,7 +76,7 @@ public class FilterManager {
*/
public String installPendingTransactionFilter() {
final String filterId = filterIdGenerator.nextId();
pendingTransactionFilters.put(filterId, new PendingTransactionFilter());
filterRepository.save(new PendingTransactionFilter(filterId));
return filterId;
}
@ -157,7 +91,7 @@ public class FilterManager {
public String installLogFilter(
final BlockParameter fromBlock, final BlockParameter toBlock, final LogsQuery logsQuery) {
final String filterId = filterIdGenerator.nextId();
logFilters.put(filterId, new LogFilter(fromBlock, toBlock, logsQuery));
filterRepository.save(new LogFilter(filterId, fromBlock, toBlock, logsQuery));
return filterId;
}
@ -168,15 +102,19 @@ public class FilterManager {
* @return {@code true} if the filter was successfully removed; otherwise {@code false}
*/
public boolean uninstallFilter(final String filterId) {
return blockFilters.remove(filterId) != null
|| pendingTransactionFilters.remove(filterId) != null
|| logFilters.remove(filterId) != null;
if (filterRepository.exists(filterId)) {
filterRepository.delete(filterId);
return true;
} else {
return false;
}
}
public void recordBlockEvent(final BlockAddedEvent event, final Blockchain blockchain) {
final Hash blockHash = event.getBlock().getHash();
Collection<BlockFilter> blockFilters = filterRepository.getFiltersOfType(BlockFilter.class);
blockFilters.forEach(
(filterId, filter) -> {
(filter) -> {
synchronized (filter) {
filter.addBlockHash(blockHash);
}
@ -186,8 +124,9 @@ public class FilterManager {
}
private void checkBlockchainForMatchingLogsForFilters() {
Collection<LogFilter> logFilters = filterRepository.getFiltersOfType(LogFilter.class);
logFilters.forEach(
(filterId, filter) -> {
(filter) -> {
final long headBlockNumber = blockchainQueries.headBlockNumber();
final long toBlockNumber =
filter.getToBlock().getNumber().orElse(blockchainQueries.headBlockNumber());
@ -199,12 +138,14 @@ public class FilterManager {
@VisibleForTesting
void recordPendingTransactionEvent(final Transaction transaction) {
Collection<PendingTransactionFilter> pendingTransactionFilters =
filterRepository.getFiltersOfType(PendingTransactionFilter.class);
if (pendingTransactionFilters.isEmpty()) {
return;
}
pendingTransactionFilters.forEach(
(filterId, filter) -> {
(filter) -> {
synchronized (filter) {
filter.addTransactionHash(transaction.hash());
}
@ -218,7 +159,7 @@ public class FilterManager {
* @return the new block hashes that have occurred since the filter was last checked
*/
public List<Hash> blockChanges(final String filterId) {
final BlockFilter filter = blockFilters.get(filterId);
final BlockFilter filter = filterRepository.getFilter(filterId, BlockFilter.class).orElse(null);
if (filter == null) {
return null;
}
@ -227,6 +168,7 @@ public class FilterManager {
synchronized (filter) {
hashes = new ArrayList<>(filter.blockHashes());
filter.clearBlockHashes();
filter.resetExpireTime();
}
return hashes;
}
@ -238,7 +180,8 @@ public class FilterManager {
* @return the new pending transaction hashes that have occurred since the filter was last checked
*/
public List<Hash> pendingTransactionChanges(final String filterId) {
final PendingTransactionFilter filter = pendingTransactionFilters.get(filterId);
final PendingTransactionFilter filter =
filterRepository.getFilter(filterId, PendingTransactionFilter.class).orElse(null);
if (filter == null) {
return null;
}
@ -247,12 +190,13 @@ public class FilterManager {
synchronized (filter) {
hashes = new ArrayList<>(filter.transactionHashes());
filter.clearTransactionHashes();
filter.resetExpireTime();
}
return hashes;
}
public List<LogWithMetadata> logsChanges(final String filterId) {
final LogFilter filter = logFilters.get(filterId);
final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null);
if (filter == null) {
return null;
}
@ -261,14 +205,17 @@ public class FilterManager {
synchronized (filter) {
logs = new ArrayList<>(filter.logs());
filter.clearLogs();
filter.resetExpireTime();
}
return logs;
}
public List<LogWithMetadata> logs(final String filterId) {
final LogFilter filter = logFilters.get(filterId);
final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null);
if (filter == null) {
return null;
} else {
filter.resetExpireTime();
}
final long fromBlockNumber =
@ -278,19 +225,4 @@ public class FilterManager {
return blockchainQueries.matchingLogs(fromBlockNumber, toBlockNumber, filter.getLogsQuery());
}
@VisibleForTesting
int blockFilterCount() {
return blockFilters.size();
}
@VisibleForTesting
int pendingTransactionFilterCount() {
return pendingTransactionFilters.size();
}
@VisibleForTesting
int logFilterCount() {
return logFilters.size();
}
}

@ -0,0 +1,72 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class FilterRepository {
private final Map<String, Filter> filters = new ConcurrentHashMap<>();
public FilterRepository() {}
Collection<Filter> getFilters() {
return new ArrayList<>(filters.values());
}
<T extends Filter> Collection<T> getFiltersOfType(final Class<T> filterClass) {
return filters
.values()
.stream()
.flatMap(f -> getIfTypeMatches(f, filterClass).map(Stream::of).orElseGet(Stream::empty))
.collect(Collectors.toList());
}
<T extends Filter> Optional<T> getFilter(final String filterId, final Class<T> filterClass) {
final Filter filter = filters.get(filterId);
return getIfTypeMatches(filter, filterClass);
}
@SuppressWarnings("unchecked")
private <T extends Filter> Optional<T> getIfTypeMatches(
final Filter filter, final Class<T> filterClass) {
if (filter == null) {
return Optional.empty();
}
if (!filterClass.isAssignableFrom(filter.getClass())) {
return Optional.empty();
}
return Optional.of((T) filter);
}
boolean exists(final String id) {
return filters.containsKey(id);
}
void save(final Filter filter) {
if (filter == null) {
throw new IllegalArgumentException("Can't save null filter");
}
if (exists(filter.getId())) {
throw new IllegalArgumentException(
String.format("Filter with id %s already exists", filter.getId()));
}
filters.put(filter.getId(), filter);
}
void delete(final String id) {
filters.remove(id);
}
void deleteAll() {
filters.clear();
}
}

@ -0,0 +1,21 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
class FilterTimeoutMonitor {
private final FilterRepository filterRepository;
FilterTimeoutMonitor(final FilterRepository filterRepository) {
this.filterRepository = filterRepository;
}
void checkFilters() {
filterRepository
.getFilters()
.forEach(
filter -> {
if (filter.isExpired()) {
filterRepository.delete(filter.getId());
}
});
}
}

@ -0,0 +1,51 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.parameters.BlockParameter;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.LogWithMetadata;
import java.util.ArrayList;
import java.util.List;
class LogFilter extends Filter {
private final BlockParameter fromBlock;
private final BlockParameter toBlock;
private final LogsQuery logsQuery;
private final List<LogWithMetadata> logs = new ArrayList<>();
LogFilter(
final String id,
final BlockParameter fromBlock,
final BlockParameter toBlock,
final LogsQuery logsQuery) {
super(id);
this.fromBlock = fromBlock;
this.toBlock = toBlock;
this.logsQuery = logsQuery;
}
public BlockParameter getFromBlock() {
return fromBlock;
}
public BlockParameter getToBlock() {
return toBlock;
}
public LogsQuery getLogsQuery() {
return logsQuery;
}
void addLog(final List<LogWithMetadata> logs) {
this.logs.addAll(logs);
}
List<LogWithMetadata> logs() {
return logs;
}
void clearLogs() {
logs.clear();
}
}

@ -0,0 +1,28 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import tech.pegasys.pantheon.ethereum.core.Hash;
import java.util.ArrayList;
import java.util.List;
/** Tracks new pending transactions that have arrived in the transaction pool */
class PendingTransactionFilter extends Filter {
private final List<Hash> transactionHashes = new ArrayList<>();
PendingTransactionFilter(final String id) {
super(id);
}
void addTransactionHash(final Hash hash) {
transactionHashes.add(hash);
}
List<Hash> transactionHashes() {
return transactionHashes;
}
void clearTransactionHashes() {
transactionHashes.clear();
}
}

@ -25,9 +25,6 @@ import org.apache.logging.log4j.Logger;
/**
* The SubscriptionManager is responsible for managing subscriptions and sending messages to the
* clients that have an active subscription subscription.
*
* <p>TODO: The logic to send a notification to a client that has an active subscription TODO:
* handle connection close (remove subscriptions)
*/
public class SubscriptionManager extends AbstractVerticle {

@ -24,6 +24,7 @@ import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
@ -146,8 +147,11 @@ public abstract class AbstractEthJsonRpcHttpServiceTest {
final BlockchainQueries blockchainQueries = new BlockchainQueries(blockchain, stateArchive);
final FilterIdGenerator filterIdGenerator = mock(FilterIdGenerator.class);
final FilterRepository filterRepository = new FilterRepository();
when(filterIdGenerator.nextId()).thenReturn("0x1");
filterManager = new FilterManager(blockchainQueries, transactionPoolMock, filterIdGenerator);
filterManager =
new FilterManager(
blockchainQueries, transactionPoolMock, filterIdGenerator, filterRepository);
final Set<Capability> supportedCapabilities = new HashSet<>();
supportedCapabilities.add(EthProtocol.ETH62);

@ -5,6 +5,8 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -22,12 +24,14 @@ import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
import java.util.Optional;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@ -38,23 +42,23 @@ public class FilterManagerLogFilterTest {
@Mock private Blockchain blockchain;
@Mock private BlockchainQueries blockchainQueries;
@Mock private TransactionPool transactionPool;
@Spy private final FilterRepository filterRepository = new FilterRepository();
@Before
public void setupTest() {
when(blockchainQueries.getBlockchain()).thenReturn(blockchain);
this.filterManager =
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
new FilterManager(
blockchainQueries, transactionPool, new FilterIdGenerator(), filterRepository);
}
@Test
public void installUninstallNewLogFilter() {
assertThat(filterManager.logFilterCount()).isEqualTo(0);
final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery());
assertThat(filterManager.logFilterCount()).isEqualTo(1);
assertThat(filterRepository.exists(filterId)).isTrue();
assertThat(filterManager.uninstallFilter(filterId)).isTrue();
assertThat(filterManager.logFilterCount()).isEqualTo(0);
assertThat(filterRepository.exists(filterId)).isFalse();
assertThat(filterManager.blockChanges(filterId)).isNull();
}
@ -149,6 +153,26 @@ public class FilterManagerLogFilterTest {
assertThat(retrievedLogs).isEqualToComparingFieldByFieldRecursively(Lists.newArrayList(log));
}
@Test
public void getLogsChangesShouldResetFilterExpireDate() {
LogFilter filter = spy(new LogFilter("foo", latest(), latest(), logsQuery()));
doReturn(Optional.of(filter)).when(filterRepository).getFilter(eq("foo"), eq(LogFilter.class));
filterManager.logsChanges("foo");
verify(filter).resetExpireTime();
}
@Test
public void getLogsShouldResetFilterExpireDate() {
LogFilter filter = spy(new LogFilter("foo", latest(), latest(), logsQuery()));
doReturn(Optional.of(filter)).when(filterRepository).getFilter(eq("foo"), eq(LogFilter.class));
filterManager.logs("foo");
verify(filter).resetExpireTime();
}
private LogWithMetadata logWithMetadata() {
return LogWithMetadata.create(
0,

@ -1,6 +1,11 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
@ -13,12 +18,14 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import java.util.List;
import java.util.Optional;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@ -31,6 +38,7 @@ public class FilterManagerTest {
@Mock private Blockchain blockchain;
@Mock private BlockchainQueries blockchainQueries;
@Mock private TransactionPool transactionPool;
@Spy final FilterRepository filterRepository = new FilterRepository();
@Before
public void setupTest() {
@ -38,7 +46,8 @@ public class FilterManagerTest {
this.blockGenerator = new BlockDataGenerator();
this.currentBlock = blockGenerator.genesisBlock();
this.filterManager =
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
new FilterManager(
blockchainQueries, transactionPool, new FilterIdGenerator(), filterRepository);
}
@Test
@ -48,13 +57,11 @@ public class FilterManagerTest {
@Test
public void installUninstallNewBlockFilter() {
assertThat(filterManager.blockFilterCount()).isEqualTo(0);
final String filterId = filterManager.installBlockFilter();
assertThat(filterManager.blockFilterCount()).isEqualTo(1);
assertThat(filterRepository.exists(filterId)).isTrue();
assertThat(filterManager.uninstallFilter(filterId)).isTrue();
assertThat(filterManager.blockFilterCount()).isEqualTo(0);
assertThat(filterRepository.exists(filterId)).isFalse();
assertThat(filterManager.blockChanges(filterId)).isNull();
}
@ -117,15 +124,11 @@ public class FilterManagerTest {
@Test
public void installUninstallPendingTransactionFilter() {
assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(0);
final String filterId = filterManager.installPendingTransactionFilter();
assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(1);
verify(filterRepository).save(any(Filter.class));
assertThat(filterManager.uninstallFilter(filterId)).isTrue();
assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(0);
assertThat(filterManager.pendingTransactionChanges(filterId)).isNull();
verify(filterRepository).delete(eq(filterId));
}
@Test
@ -184,6 +187,30 @@ public class FilterManagerTest {
assertThat(filterManager.pendingTransactionChanges(filterId2)).isEqualTo(expectedHashes2);
}
@Test
public void getBlockChangesShouldResetFilterExpireDate() {
BlockFilter filter = spy(new BlockFilter("foo"));
doReturn(Optional.of(filter))
.when(filterRepository)
.getFilter(eq("foo"), eq(BlockFilter.class));
filterManager.blockChanges("foo");
verify(filter).resetExpireTime();
}
@Test
public void getPendingTransactionsChangesShouldResetFilterExpireDate() {
PendingTransactionFilter filter = spy(new PendingTransactionFilter("foo"));
doReturn(Optional.of(filter))
.when(filterRepository)
.getFilter(eq("foo"), eq(PendingTransactionFilter.class));
filterManager.pendingTransactionChanges("foo");
verify(filter).resetExpireTime();
}
private Hash appendBlockToBlockchain() {
final long blockNumber = currentBlock.getHeader().getNumber() + 1;
final Hash parentHash = currentBlock.getHash();

@ -0,0 +1,191 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import java.util.Collection;
import java.util.Optional;
import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
public class FilterRepositoryTest {
private FilterRepository repository;
@Before
public void before() {
repository = new FilterRepository();
}
@Test
public void getFiltersShouldReturnAllFilters() {
BlockFilter filter1 = new BlockFilter("foo");
BlockFilter filter2 = new BlockFilter("bar");
repository.save(filter1);
repository.save(filter2);
Collection<Filter> filters = repository.getFilters();
assertThat(filters).containsExactlyInAnyOrderElementsOf(Lists.newArrayList(filter1, filter2));
}
@Test
public void getFiltersShouldReturnEmptyListWhenRepositoryIsEmpty() {
assertThat(repository.getFilters()).isEmpty();
}
@Test
public void saveShouldAddFilterToRepository() {
BlockFilter filter = new BlockFilter("id");
repository.save(filter);
BlockFilter retrievedFilter = repository.getFilter("id", BlockFilter.class).get();
assertThat(retrievedFilter).isEqualToComparingFieldByField(filter);
}
@Test
public void saveNullFilterShouldFail() {
Throwable throwable = catchThrowable(() -> repository.save(null));
assertThat(throwable)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Can't save null filter");
}
@Test
public void saveFilterWithSameIdShouldFail() {
BlockFilter filter = new BlockFilter("x");
repository.save(filter);
Throwable throwable = catchThrowable(() -> repository.save(filter));
assertThat(throwable)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Filter with id x already exists");
}
@Test
public void getSingleFilterShouldReturnExistingFilterOfCorrectType() {
BlockFilter filter = new BlockFilter("id");
repository.save(filter);
Optional<BlockFilter> optional = repository.getFilter(filter.getId(), BlockFilter.class);
assertThat(optional.isPresent()).isTrue();
assertThat(optional.get()).isEqualToComparingFieldByField(filter);
}
@Test
public void getSingleFilterShouldReturnEmptyForFilterOfIncorrectType() {
BlockFilter filter = new BlockFilter("id");
repository.save(filter);
Optional<PendingTransactionFilter> optional =
repository.getFilter(filter.getId(), PendingTransactionFilter.class);
assertThat(optional.isPresent()).isFalse();
}
@Test
public void getSingleFilterShouldReturnEmptyForAbsentId() {
BlockFilter filter = new BlockFilter("foo");
repository.save(filter);
Optional<BlockFilter> optional = repository.getFilter("bar", BlockFilter.class);
assertThat(optional.isPresent()).isFalse();
}
@Test
public void getSingleFilterShouldReturnEmptyForEmptyRepository() {
Optional<BlockFilter> optional = repository.getFilter("id", BlockFilter.class);
assertThat(optional.isPresent()).isFalse();
}
@Test
public void getFilterCollectionShouldReturnAllFiltersOfSpecificType() {
BlockFilter blockFilter1 = new BlockFilter("foo");
BlockFilter blockFilter2 = new BlockFilter("biz");
PendingTransactionFilter pendingTxFilter1 = new PendingTransactionFilter("bar");
Collection<BlockFilter> expectedFilters = Lists.newArrayList(blockFilter1, blockFilter2);
repository.save(blockFilter1);
repository.save(blockFilter2);
repository.save(pendingTxFilter1);
Collection<BlockFilter> blockFilters = repository.getFiltersOfType(BlockFilter.class);
assertThat(blockFilters).containsExactlyInAnyOrderElementsOf(expectedFilters);
}
@Test
public void getFilterCollectionShouldReturnEmptyForNoneMatchingTypes() {
PendingTransactionFilter filter = new PendingTransactionFilter("foo");
repository.save(filter);
Collection<BlockFilter> filters = repository.getFiltersOfType(BlockFilter.class);
assertThat(filters).isEmpty();
}
@Test
public void getFilterCollectionShouldReturnEmptyListForEmptyRepository() {
Collection<BlockFilter> filters = repository.getFiltersOfType(BlockFilter.class);
assertThat(filters).isEmpty();
}
@Test
public void existsShouldReturnTrueForExistingId() {
BlockFilter filter = new BlockFilter("id");
repository.save(filter);
assertThat(repository.exists("id")).isTrue();
}
@Test
public void existsShouldReturnFalseForAbsentId() {
BlockFilter filter = new BlockFilter("foo");
repository.save(filter);
assertThat(repository.exists("bar")).isFalse();
}
@Test
public void existsShouldReturnFalseForEmptyRepository() {
assertThat(repository.exists("id")).isFalse();
}
@Test
public void deleteExistingFilterShouldDeleteSuccessfully() {
BlockFilter filter = new BlockFilter("foo");
repository.save(filter);
repository.delete(filter.getId());
assertThat(repository.exists(filter.getId())).isFalse();
}
@Test
public void deleteAbsentFilterDoesNothing() {
assertThat(repository.exists("foo")).isFalse();
repository.delete("foo");
}
@Test
public void deleteAllShouldClearFilters() {
BlockFilter filter1 = new BlockFilter("foo");
BlockFilter filter2 = new BlockFilter("biz");
repository.save(filter1);
repository.save(filter2);
repository.deleteAll();
assertThat(repository.exists(filter1.getId())).isFalse();
assertThat(repository.exists(filter2.getId())).isFalse();
}
}

@ -0,0 +1,36 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.time.Instant;
import org.junit.Test;
public class FilterTest {
@Test
public void filterJustCreatedShouldNotBeExpired() {
BlockFilter filter = new BlockFilter("foo");
assertThat(filter.isExpired()).isFalse();
}
@Test
public void isExpiredShouldReturnTrueForExpiredFilter() {
BlockFilter filter = new BlockFilter("foo");
filter.setExpireTime(Instant.now().minusSeconds(1));
assertThat(filter.isExpired()).isTrue();
}
@Test
public void resetExpireDateShouldIncrementExpireDate() {
BlockFilter filter = new BlockFilter("foo");
filter.setExpireTime(Instant.now().minus(Duration.ofDays(1)));
filter.resetExpireTime();
assertThat(filter.getExpireTime())
.isBeforeOrEqualTo(Instant.now().plus(Duration.ofMinutes(10)));
}
}

@ -0,0 +1,64 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.util.Collections;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class FilterTimeoutMonitorTest {
@Mock private FilterRepository filterRepository;
private FilterTimeoutMonitor timeoutMonitor;
@Before
public void before() {
timeoutMonitor = new FilterTimeoutMonitor(filterRepository);
}
@Test
public void expiredFilterShouldBeDeleted() {
Filter filter = spy(new BlockFilter("foo"));
when(filter.isExpired()).thenReturn(true);
when(filterRepository.getFilters()).thenReturn(Lists.newArrayList(filter));
timeoutMonitor.checkFilters();
verify(filterRepository).getFilters();
verify(filterRepository).delete("foo");
verifyNoMoreInteractions(filterRepository);
}
@Test
public void nonExpiredFilterShouldNotBeDeleted() {
Filter filter = mock(Filter.class);
when(filter.isExpired()).thenReturn(false);
when(filterRepository.getFilters()).thenReturn(Lists.newArrayList(filter));
timeoutMonitor.checkFilters();
verify(filter).isExpired();
verifyNoMoreInteractions(filter);
}
@Test
public void checkEmptyFilterRepositoryDoesNothing() {
when(filterRepository.getFilters()).thenReturn(Collections.emptyList());
timeoutMonitor.checkFilters();
verify(filterRepository).getFilters();
verifyNoMoreInteractions(filterRepository);
}
}

@ -16,6 +16,9 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration;
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis;
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcHttpService;
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcMethodsFactory;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration;
@ -127,6 +130,8 @@ public class RunnerBuilder {
final AbstractMiningCoordinator<?, ?> miningCoordinator =
pantheonController.getMiningCoordinator();
final FilterManager filterManager = createFilterManager(vertx, context, transactionPool);
Optional<JsonRpcHttpService> jsonRpcHttpService = Optional.empty();
if (jsonRpcConfiguration.isEnabled()) {
final Map<String, JsonRpcMethod> jsonRpcMethods =
@ -139,7 +144,8 @@ public class RunnerBuilder {
transactionPool,
miningCoordinator,
supportedCapabilities,
jsonRpcConfiguration.getRpcApis());
jsonRpcConfiguration.getRpcApis(),
filterManager);
jsonRpcHttpService =
Optional.of(new JsonRpcHttpService(vertx, jsonRpcConfiguration, jsonRpcMethods));
}
@ -156,7 +162,8 @@ public class RunnerBuilder {
transactionPool,
miningCoordinator,
supportedCapabilities,
webSocketConfiguration.getRpcApis());
webSocketConfiguration.getRpcApis(),
filterManager);
final SubscriptionManager subscriptionManager =
createSubscriptionManager(vertx, context.getBlockchain(), transactionPool);
@ -179,6 +186,18 @@ public class RunnerBuilder {
vertx, networkRunner, jsonRpcHttpService, webSocketService, pantheonController, dataDir);
}
private FilterManager createFilterManager(
final Vertx vertx, final ProtocolContext<?> context, final TransactionPool transactionPool) {
FilterManager filterManager =
new FilterManager(
new BlockchainQueries(context.getBlockchain(), context.getWorldStateArchive()),
transactionPool,
new FilterIdGenerator(),
new FilterRepository());
vertx.deployVerticle(filterManager);
return filterManager;
}
private Map<String, JsonRpcMethod> jsonRpcMethods(
final ProtocolContext<?> context,
final ProtocolSchedule<?> protocolSchedule,
@ -188,7 +207,8 @@ public class RunnerBuilder {
final TransactionPool transactionPool,
final AbstractMiningCoordinator<?, ?> miningCoordinator,
final Set<Capability> supportedCapabilities,
final Collection<RpcApis> jsonRpcApis) {
final Collection<RpcApis> jsonRpcApis,
final FilterManager filterManager) {
final Map<String, JsonRpcMethod> methods =
new JsonRpcMethodsFactory()
.methods(
@ -202,7 +222,8 @@ public class RunnerBuilder {
protocolSchedule,
miningCoordinator,
supportedCapabilities,
jsonRpcApis);
jsonRpcApis,
filterManager);
if (context.getConsensusState() instanceof CliqueContext) {
// This is checked before entering this if branch

Loading…
Cancel
Save