From 03f92e3eb0a31a15a1caa22d84a5a02ba99c5202 Mon Sep 17 00:00:00 2001 From: Lucas Saldanha Date: Wed, 17 Oct 2018 11:51:45 +1300 Subject: [PATCH] NC-1721: Filter timeout if not queried for 10 minutes (#66) --- .../jsonrpc/JsonRpcTestMethodsFactory.java | 4 +- .../EthGetFilterChangesIntegrationTest.java | 5 +- .../jsonrpc/JsonRpcMethodsFactory.java | 6 +- .../jsonrpc/internal/filter/BlockFilter.java | 28 +++ .../jsonrpc/internal/filter/Filter.java | 41 ++++ .../internal/filter/FilterManager.java | 172 +++++----------- .../internal/filter/FilterRepository.java | 72 +++++++ .../internal/filter/FilterTimeoutMonitor.java | 21 ++ .../jsonrpc/internal/filter/LogFilter.java | 51 +++++ .../filter/PendingTransactionFilter.java | 28 +++ .../subscription/SubscriptionManager.java | 3 - .../AbstractEthJsonRpcHttpServiceTest.java | 6 +- .../filter/FilterManagerLogFilterTest.java | 34 +++- .../internal/filter/FilterManagerTest.java | 49 ++++- .../internal/filter/FilterRepositoryTest.java | 191 ++++++++++++++++++ .../jsonrpc/internal/filter/FilterTest.java | 36 ++++ .../filter/FilterTimeoutMonitorTest.java | 64 ++++++ .../tech/pegasys/pantheon/RunnerBuilder.java | 29 ++- 18 files changed, 690 insertions(+), 150 deletions(-) create mode 100644 ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/BlockFilter.java create mode 100644 ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/Filter.java create mode 100644 ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepository.java create mode 100644 ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitor.java create mode 100644 ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/LogFilter.java create mode 100644 ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/PendingTransactionFilter.java create mode 100644 ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepositoryTest.java create mode 100644 ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTest.java create mode 100644 ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitorTest.java diff --git a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcTestMethodsFactory.java b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcTestMethodsFactory.java index 9295976dc7..b21d29bf84 100644 --- a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcTestMethodsFactory.java +++ b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcTestMethodsFactory.java @@ -14,6 +14,7 @@ import tech.pegasys.pantheon.ethereum.db.DefaultMutableBlockchain; import tech.pegasys.pantheon.ethereum.db.WorldStateArchive; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager; +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; @@ -66,7 +67,8 @@ public class JsonRpcTestMethodsFactory { final P2PNetwork peerDiscovery = mock(P2PNetwork.class); final TransactionPool transactionPool = mock(TransactionPool.class); final FilterManager filterManager = - new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator()); + new FilterManager( + blockchainQueries, transactionPool, new FilterIdGenerator(), new FilterRepository()); final EthHashMiningCoordinator miningCoordinator = mock(EthHashMiningCoordinator.class); return new JsonRpcMethodsFactory() diff --git a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java index 357b8b8603..b8592b6cf2 100644 --- a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java @@ -25,6 +25,7 @@ import tech.pegasys.pantheon.ethereum.db.WorldStateArchive; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.JsonRpcRequest; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager; +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.EthGetFilterChanges; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.parameters.JsonRpcParameter; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries; @@ -81,7 +82,9 @@ public class EthGetFilterChangesIntegrationTest { transactions, genesisConfig.getProtocolSchedule(), protocolContext, batchAddedListener); final BlockchainQueries blockchainQueries = new BlockchainQueries(blockchain, worldStateArchive); - filterManager = new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator()); + filterManager = + new FilterManager( + blockchainQueries, transactionPool, new FilterIdGenerator(), new FilterRepository()); method = new EthGetFilterChanges(filterManager, parameters); } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcMethodsFactory.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcMethodsFactory.java index 3267101efa..cebffcaf61 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcMethodsFactory.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/JsonRpcMethodsFactory.java @@ -6,7 +6,6 @@ import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.core.TransactionPool; import tech.pegasys.pantheon.ethereum.db.WorldStateArchive; import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis; -import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.AdminPeers; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.DebugStorageRangeAt; @@ -85,11 +84,10 @@ public class JsonRpcMethodsFactory { final ProtocolSchedule protocolSchedule, final AbstractMiningCoordinator miningCoordinator, final Set supportedCapabilities, - final Collection rpcApis) { + final Collection rpcApis, + final FilterManager filterManager) { final BlockchainQueries blockchainQueries = new BlockchainQueries(blockchain, worldStateArchive); - final FilterManager filterManager = - new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator()); return methods( clientVersion, chainId, diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/BlockFilter.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/BlockFilter.java new file mode 100644 index 0000000000..6d174565ac --- /dev/null +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/BlockFilter.java @@ -0,0 +1,28 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import tech.pegasys.pantheon.ethereum.core.Hash; + +import java.util.ArrayList; +import java.util.List; + +/** Tracks new blocks being added to the blockchain. */ +class BlockFilter extends Filter { + + private final List blockHashes = new ArrayList<>(); + + BlockFilter(final String id) { + super(id); + } + + void addBlockHash(final Hash hash) { + blockHashes.add(hash); + } + + List blockHashes() { + return blockHashes; + } + + void clearBlockHashes() { + blockHashes.clear(); + } +} diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/Filter.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/Filter.java new file mode 100644 index 0000000000..2e62e65b8a --- /dev/null +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/Filter.java @@ -0,0 +1,41 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import java.time.Duration; +import java.time.Instant; + +import com.google.common.annotations.VisibleForTesting; + +abstract class Filter { + + private static final Duration DEFAULT_EXPIRE_DURATION = Duration.ofMinutes(10); + + private final String id; + private Instant expireTime; + + Filter(final String id) { + this.id = id; + resetExpireTime(); + } + + String getId() { + return id; + } + + void resetExpireTime() { + this.expireTime = Instant.now().plus(DEFAULT_EXPIRE_DURATION); + } + + boolean isExpired() { + return Instant.now().isAfter(expireTime); + } + + @VisibleForTesting + void setExpireTime(final Instant expireTime) { + this.expireTime = expireTime; + } + + @VisibleForTesting + Instant getExpireTime() { + return expireTime; + } +} diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManager.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManager.java index c3b627ea2d..2dbbc2db84 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManager.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManager.java @@ -12,118 +12,52 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.LogWithMetadata; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import com.google.common.annotations.VisibleForTesting; +import io.vertx.core.AbstractVerticle; /** Manages JSON-RPC filter events. */ -public class FilterManager { +public class FilterManager extends AbstractVerticle { - private final Map blockFilters = new ConcurrentHashMap<>(); - - private final Map pendingTransactionFilters = - new ConcurrentHashMap<>(); - - private final Map logFilters = new ConcurrentHashMap<>(); + private static final int FILTER_TIMEOUT_CHECK_TIMER = 10000; private final FilterIdGenerator filterIdGenerator; - - /** Tracks new blocks being added to the blockchain. */ - private static class BlockFilter { - - private final List blockHashes = new ArrayList<>(); - - BlockFilter() {} - - void addBlockHash(final Hash hash) { - blockHashes.add(hash); - } - - List blockHashes() { - return blockHashes; - } - - void clearBlockHashes() { - blockHashes.clear(); - } - } - - /** Tracks new pending transactions that have arrived in the transaction pool */ - private static class PendingTransactionFilter { - - private final List transactionHashes = new ArrayList<>(); - - PendingTransactionFilter() {} - - void addTransactionHash(final Hash hash) { - transactionHashes.add(hash); - } - - List transactionHashes() { - return transactionHashes; - } - - void clearTransactionHashes() { - transactionHashes.clear(); - } - } - - /** Tracks new log events. */ - private static class LogFilter { - - private final BlockParameter fromBlock; - private final BlockParameter toBlock; - private final LogsQuery logsQuery; - - private final List logs = new ArrayList<>(); - - LogFilter( - final BlockParameter fromBlock, final BlockParameter toBlock, final LogsQuery logsQuery) { - this.fromBlock = fromBlock; - this.toBlock = toBlock; - this.logsQuery = logsQuery; - } - - public BlockParameter getFromBlock() { - return fromBlock; - } - - public BlockParameter getToBlock() { - return toBlock; - } - - public LogsQuery getLogsQuery() { - return logsQuery; - } - - void addLog(final List logs) { - this.logs.addAll(logs); - } - - List logs() { - return logs; - } - - void clearLogs() { - logs.clear(); - } - } - + private final FilterRepository filterRepository; private final BlockchainQueries blockchainQueries; public FilterManager( final BlockchainQueries blockchainQueries, final TransactionPool transactionPool, - final FilterIdGenerator filterIdGenerator) { + final FilterIdGenerator filterIdGenerator, + final FilterRepository filterRepository) { this.filterIdGenerator = filterIdGenerator; + this.filterRepository = filterRepository; checkNotNull(blockchainQueries.getBlockchain()); blockchainQueries.getBlockchain().observeBlockAdded(this::recordBlockEvent); transactionPool.addTransactionListener(this::recordPendingTransactionEvent); this.blockchainQueries = blockchainQueries; } + @Override + public void start() { + startFilterTimeoutTimer(); + } + + @Override + public void stop() { + filterRepository.deleteAll(); + } + + private void startFilterTimeoutTimer() { + vertx.setPeriodic( + FILTER_TIMEOUT_CHECK_TIMER, + timerId -> + vertx.executeBlocking( + future -> new FilterTimeoutMonitor(filterRepository).checkFilters(), result -> {})); + } + /** * Installs a new block filter * @@ -131,7 +65,7 @@ public class FilterManager { */ public String installBlockFilter() { final String filterId = filterIdGenerator.nextId(); - blockFilters.put(filterId, new BlockFilter()); + filterRepository.save(new BlockFilter(filterId)); return filterId; } @@ -142,7 +76,7 @@ public class FilterManager { */ public String installPendingTransactionFilter() { final String filterId = filterIdGenerator.nextId(); - pendingTransactionFilters.put(filterId, new PendingTransactionFilter()); + filterRepository.save(new PendingTransactionFilter(filterId)); return filterId; } @@ -157,7 +91,7 @@ public class FilterManager { public String installLogFilter( final BlockParameter fromBlock, final BlockParameter toBlock, final LogsQuery logsQuery) { final String filterId = filterIdGenerator.nextId(); - logFilters.put(filterId, new LogFilter(fromBlock, toBlock, logsQuery)); + filterRepository.save(new LogFilter(filterId, fromBlock, toBlock, logsQuery)); return filterId; } @@ -168,15 +102,19 @@ public class FilterManager { * @return {@code true} if the filter was successfully removed; otherwise {@code false} */ public boolean uninstallFilter(final String filterId) { - return blockFilters.remove(filterId) != null - || pendingTransactionFilters.remove(filterId) != null - || logFilters.remove(filterId) != null; + if (filterRepository.exists(filterId)) { + filterRepository.delete(filterId); + return true; + } else { + return false; + } } public void recordBlockEvent(final BlockAddedEvent event, final Blockchain blockchain) { final Hash blockHash = event.getBlock().getHash(); + Collection blockFilters = filterRepository.getFiltersOfType(BlockFilter.class); blockFilters.forEach( - (filterId, filter) -> { + (filter) -> { synchronized (filter) { filter.addBlockHash(blockHash); } @@ -186,8 +124,9 @@ public class FilterManager { } private void checkBlockchainForMatchingLogsForFilters() { + Collection logFilters = filterRepository.getFiltersOfType(LogFilter.class); logFilters.forEach( - (filterId, filter) -> { + (filter) -> { final long headBlockNumber = blockchainQueries.headBlockNumber(); final long toBlockNumber = filter.getToBlock().getNumber().orElse(blockchainQueries.headBlockNumber()); @@ -199,12 +138,14 @@ public class FilterManager { @VisibleForTesting void recordPendingTransactionEvent(final Transaction transaction) { + Collection pendingTransactionFilters = + filterRepository.getFiltersOfType(PendingTransactionFilter.class); if (pendingTransactionFilters.isEmpty()) { return; } pendingTransactionFilters.forEach( - (filterId, filter) -> { + (filter) -> { synchronized (filter) { filter.addTransactionHash(transaction.hash()); } @@ -218,7 +159,7 @@ public class FilterManager { * @return the new block hashes that have occurred since the filter was last checked */ public List blockChanges(final String filterId) { - final BlockFilter filter = blockFilters.get(filterId); + final BlockFilter filter = filterRepository.getFilter(filterId, BlockFilter.class).orElse(null); if (filter == null) { return null; } @@ -227,6 +168,7 @@ public class FilterManager { synchronized (filter) { hashes = new ArrayList<>(filter.blockHashes()); filter.clearBlockHashes(); + filter.resetExpireTime(); } return hashes; } @@ -238,7 +180,8 @@ public class FilterManager { * @return the new pending transaction hashes that have occurred since the filter was last checked */ public List pendingTransactionChanges(final String filterId) { - final PendingTransactionFilter filter = pendingTransactionFilters.get(filterId); + final PendingTransactionFilter filter = + filterRepository.getFilter(filterId, PendingTransactionFilter.class).orElse(null); if (filter == null) { return null; } @@ -247,12 +190,13 @@ public class FilterManager { synchronized (filter) { hashes = new ArrayList<>(filter.transactionHashes()); filter.clearTransactionHashes(); + filter.resetExpireTime(); } return hashes; } public List logsChanges(final String filterId) { - final LogFilter filter = logFilters.get(filterId); + final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null); if (filter == null) { return null; } @@ -261,14 +205,17 @@ public class FilterManager { synchronized (filter) { logs = new ArrayList<>(filter.logs()); filter.clearLogs(); + filter.resetExpireTime(); } return logs; } public List logs(final String filterId) { - final LogFilter filter = logFilters.get(filterId); + final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null); if (filter == null) { return null; + } else { + filter.resetExpireTime(); } final long fromBlockNumber = @@ -278,19 +225,4 @@ public class FilterManager { return blockchainQueries.matchingLogs(fromBlockNumber, toBlockNumber, filter.getLogsQuery()); } - - @VisibleForTesting - int blockFilterCount() { - return blockFilters.size(); - } - - @VisibleForTesting - int pendingTransactionFilterCount() { - return pendingTransactionFilters.size(); - } - - @VisibleForTesting - int logFilterCount() { - return logFilters.size(); - } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepository.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepository.java new file mode 100644 index 0000000000..3497408817 --- /dev/null +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepository.java @@ -0,0 +1,72 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class FilterRepository { + + private final Map filters = new ConcurrentHashMap<>(); + + public FilterRepository() {} + + Collection getFilters() { + return new ArrayList<>(filters.values()); + } + + Collection getFiltersOfType(final Class filterClass) { + return filters + .values() + .stream() + .flatMap(f -> getIfTypeMatches(f, filterClass).map(Stream::of).orElseGet(Stream::empty)) + .collect(Collectors.toList()); + } + + Optional getFilter(final String filterId, final Class filterClass) { + final Filter filter = filters.get(filterId); + return getIfTypeMatches(filter, filterClass); + } + + @SuppressWarnings("unchecked") + private Optional getIfTypeMatches( + final Filter filter, final Class filterClass) { + if (filter == null) { + return Optional.empty(); + } + + if (!filterClass.isAssignableFrom(filter.getClass())) { + return Optional.empty(); + } + + return Optional.of((T) filter); + } + + boolean exists(final String id) { + return filters.containsKey(id); + } + + void save(final Filter filter) { + if (filter == null) { + throw new IllegalArgumentException("Can't save null filter"); + } + + if (exists(filter.getId())) { + throw new IllegalArgumentException( + String.format("Filter with id %s already exists", filter.getId())); + } + + filters.put(filter.getId(), filter); + } + + void delete(final String id) { + filters.remove(id); + } + + void deleteAll() { + filters.clear(); + } +} diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitor.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitor.java new file mode 100644 index 0000000000..310ac52e41 --- /dev/null +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitor.java @@ -0,0 +1,21 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +class FilterTimeoutMonitor { + + private final FilterRepository filterRepository; + + FilterTimeoutMonitor(final FilterRepository filterRepository) { + this.filterRepository = filterRepository; + } + + void checkFilters() { + filterRepository + .getFilters() + .forEach( + filter -> { + if (filter.isExpired()) { + filterRepository.delete(filter.getId()); + } + }); + } +} diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/LogFilter.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/LogFilter.java new file mode 100644 index 0000000000..81f4f11d83 --- /dev/null +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/LogFilter.java @@ -0,0 +1,51 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.parameters.BlockParameter; +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.LogWithMetadata; + +import java.util.ArrayList; +import java.util.List; + +class LogFilter extends Filter { + + private final BlockParameter fromBlock; + private final BlockParameter toBlock; + private final LogsQuery logsQuery; + + private final List logs = new ArrayList<>(); + + LogFilter( + final String id, + final BlockParameter fromBlock, + final BlockParameter toBlock, + final LogsQuery logsQuery) { + super(id); + this.fromBlock = fromBlock; + this.toBlock = toBlock; + this.logsQuery = logsQuery; + } + + public BlockParameter getFromBlock() { + return fromBlock; + } + + public BlockParameter getToBlock() { + return toBlock; + } + + public LogsQuery getLogsQuery() { + return logsQuery; + } + + void addLog(final List logs) { + this.logs.addAll(logs); + } + + List logs() { + return logs; + } + + void clearLogs() { + logs.clear(); + } +} diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/PendingTransactionFilter.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/PendingTransactionFilter.java new file mode 100644 index 0000000000..a01c6e35e9 --- /dev/null +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/PendingTransactionFilter.java @@ -0,0 +1,28 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import tech.pegasys.pantheon.ethereum.core.Hash; + +import java.util.ArrayList; +import java.util.List; + +/** Tracks new pending transactions that have arrived in the transaction pool */ +class PendingTransactionFilter extends Filter { + + private final List transactionHashes = new ArrayList<>(); + + PendingTransactionFilter(final String id) { + super(id); + } + + void addTransactionHash(final Hash hash) { + transactionHashes.add(hash); + } + + List transactionHashes() { + return transactionHashes; + } + + void clearTransactionHashes() { + transactionHashes.clear(); + } +} diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java index 12d4fa277a..6f49f931c3 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java @@ -25,9 +25,6 @@ import org.apache.logging.log4j.Logger; /** * The SubscriptionManager is responsible for managing subscriptions and sending messages to the * clients that have an active subscription subscription. - * - *

TODO: The logic to send a notification to a client that has an active subscription TODO: - * handle connection close (remove subscriptions) */ public class SubscriptionManager extends AbstractVerticle { diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AbstractEthJsonRpcHttpServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AbstractEthJsonRpcHttpServiceTest.java index 22a83b27e5..7dfbd8a81e 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AbstractEthJsonRpcHttpServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AbstractEthJsonRpcHttpServiceTest.java @@ -24,6 +24,7 @@ import tech.pegasys.pantheon.ethereum.eth.EthProtocol; import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager; +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; @@ -146,8 +147,11 @@ public abstract class AbstractEthJsonRpcHttpServiceTest { final BlockchainQueries blockchainQueries = new BlockchainQueries(blockchain, stateArchive); final FilterIdGenerator filterIdGenerator = mock(FilterIdGenerator.class); + final FilterRepository filterRepository = new FilterRepository(); when(filterIdGenerator.nextId()).thenReturn("0x1"); - filterManager = new FilterManager(blockchainQueries, transactionPoolMock, filterIdGenerator); + filterManager = + new FilterManager( + blockchainQueries, transactionPoolMock, filterIdGenerator, filterRepository); final Set supportedCapabilities = new HashSet<>(); supportedCapabilities.add(EthProtocol.ETH62); diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerLogFilterTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerLogFilterTest.java index 4c66a472c4..45a64aafac 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerLogFilterTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerLogFilterTest.java @@ -5,6 +5,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.refEq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -22,12 +24,14 @@ import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.List; +import java.util.Optional; import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -38,23 +42,23 @@ public class FilterManagerLogFilterTest { @Mock private Blockchain blockchain; @Mock private BlockchainQueries blockchainQueries; @Mock private TransactionPool transactionPool; + @Spy private final FilterRepository filterRepository = new FilterRepository(); @Before public void setupTest() { when(blockchainQueries.getBlockchain()).thenReturn(blockchain); this.filterManager = - new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator()); + new FilterManager( + blockchainQueries, transactionPool, new FilterIdGenerator(), filterRepository); } @Test public void installUninstallNewLogFilter() { - assertThat(filterManager.logFilterCount()).isEqualTo(0); - final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery()); - assertThat(filterManager.logFilterCount()).isEqualTo(1); + assertThat(filterRepository.exists(filterId)).isTrue(); assertThat(filterManager.uninstallFilter(filterId)).isTrue(); - assertThat(filterManager.logFilterCount()).isEqualTo(0); + assertThat(filterRepository.exists(filterId)).isFalse(); assertThat(filterManager.blockChanges(filterId)).isNull(); } @@ -149,6 +153,26 @@ public class FilterManagerLogFilterTest { assertThat(retrievedLogs).isEqualToComparingFieldByFieldRecursively(Lists.newArrayList(log)); } + @Test + public void getLogsChangesShouldResetFilterExpireDate() { + LogFilter filter = spy(new LogFilter("foo", latest(), latest(), logsQuery())); + doReturn(Optional.of(filter)).when(filterRepository).getFilter(eq("foo"), eq(LogFilter.class)); + + filterManager.logsChanges("foo"); + + verify(filter).resetExpireTime(); + } + + @Test + public void getLogsShouldResetFilterExpireDate() { + LogFilter filter = spy(new LogFilter("foo", latest(), latest(), logsQuery())); + doReturn(Optional.of(filter)).when(filterRepository).getFilter(eq("foo"), eq(LogFilter.class)); + + filterManager.logs("foo"); + + verify(filter).resetExpireTime(); + } + private LogWithMetadata logWithMetadata() { return LogWithMetadata.create( 0, diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerTest.java index 0a374788ef..7c30f70e7e 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterManagerTest.java @@ -1,6 +1,11 @@ package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent; @@ -13,12 +18,14 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; import java.util.List; +import java.util.Optional; import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -31,6 +38,7 @@ public class FilterManagerTest { @Mock private Blockchain blockchain; @Mock private BlockchainQueries blockchainQueries; @Mock private TransactionPool transactionPool; + @Spy final FilterRepository filterRepository = new FilterRepository(); @Before public void setupTest() { @@ -38,7 +46,8 @@ public class FilterManagerTest { this.blockGenerator = new BlockDataGenerator(); this.currentBlock = blockGenerator.genesisBlock(); this.filterManager = - new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator()); + new FilterManager( + blockchainQueries, transactionPool, new FilterIdGenerator(), filterRepository); } @Test @@ -48,13 +57,11 @@ public class FilterManagerTest { @Test public void installUninstallNewBlockFilter() { - assertThat(filterManager.blockFilterCount()).isEqualTo(0); - final String filterId = filterManager.installBlockFilter(); - assertThat(filterManager.blockFilterCount()).isEqualTo(1); + assertThat(filterRepository.exists(filterId)).isTrue(); assertThat(filterManager.uninstallFilter(filterId)).isTrue(); - assertThat(filterManager.blockFilterCount()).isEqualTo(0); + assertThat(filterRepository.exists(filterId)).isFalse(); assertThat(filterManager.blockChanges(filterId)).isNull(); } @@ -117,15 +124,11 @@ public class FilterManagerTest { @Test public void installUninstallPendingTransactionFilter() { - assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(0); - final String filterId = filterManager.installPendingTransactionFilter(); - assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(1); + verify(filterRepository).save(any(Filter.class)); assertThat(filterManager.uninstallFilter(filterId)).isTrue(); - assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(0); - - assertThat(filterManager.pendingTransactionChanges(filterId)).isNull(); + verify(filterRepository).delete(eq(filterId)); } @Test @@ -184,6 +187,30 @@ public class FilterManagerTest { assertThat(filterManager.pendingTransactionChanges(filterId2)).isEqualTo(expectedHashes2); } + @Test + public void getBlockChangesShouldResetFilterExpireDate() { + BlockFilter filter = spy(new BlockFilter("foo")); + doReturn(Optional.of(filter)) + .when(filterRepository) + .getFilter(eq("foo"), eq(BlockFilter.class)); + + filterManager.blockChanges("foo"); + + verify(filter).resetExpireTime(); + } + + @Test + public void getPendingTransactionsChangesShouldResetFilterExpireDate() { + PendingTransactionFilter filter = spy(new PendingTransactionFilter("foo")); + doReturn(Optional.of(filter)) + .when(filterRepository) + .getFilter(eq("foo"), eq(PendingTransactionFilter.class)); + + filterManager.pendingTransactionChanges("foo"); + + verify(filter).resetExpireTime(); + } + private Hash appendBlockToBlockchain() { final long blockNumber = currentBlock.getHeader().getNumber() + 1; final Hash parentHash = currentBlock.getHash(); diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepositoryTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepositoryTest.java new file mode 100644 index 0000000000..ba6c92711f --- /dev/null +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterRepositoryTest.java @@ -0,0 +1,191 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import java.util.Collection; +import java.util.Optional; + +import org.assertj.core.util.Lists; +import org.junit.Before; +import org.junit.Test; + +public class FilterRepositoryTest { + + private FilterRepository repository; + + @Before + public void before() { + repository = new FilterRepository(); + } + + @Test + public void getFiltersShouldReturnAllFilters() { + BlockFilter filter1 = new BlockFilter("foo"); + BlockFilter filter2 = new BlockFilter("bar"); + repository.save(filter1); + repository.save(filter2); + + Collection filters = repository.getFilters(); + + assertThat(filters).containsExactlyInAnyOrderElementsOf(Lists.newArrayList(filter1, filter2)); + } + + @Test + public void getFiltersShouldReturnEmptyListWhenRepositoryIsEmpty() { + assertThat(repository.getFilters()).isEmpty(); + } + + @Test + public void saveShouldAddFilterToRepository() { + BlockFilter filter = new BlockFilter("id"); + repository.save(filter); + + BlockFilter retrievedFilter = repository.getFilter("id", BlockFilter.class).get(); + + assertThat(retrievedFilter).isEqualToComparingFieldByField(filter); + } + + @Test + public void saveNullFilterShouldFail() { + Throwable throwable = catchThrowable(() -> repository.save(null)); + + assertThat(throwable) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Can't save null filter"); + } + + @Test + public void saveFilterWithSameIdShouldFail() { + BlockFilter filter = new BlockFilter("x"); + repository.save(filter); + + Throwable throwable = catchThrowable(() -> repository.save(filter)); + + assertThat(throwable) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Filter with id x already exists"); + } + + @Test + public void getSingleFilterShouldReturnExistingFilterOfCorrectType() { + BlockFilter filter = new BlockFilter("id"); + repository.save(filter); + + Optional optional = repository.getFilter(filter.getId(), BlockFilter.class); + + assertThat(optional.isPresent()).isTrue(); + assertThat(optional.get()).isEqualToComparingFieldByField(filter); + } + + @Test + public void getSingleFilterShouldReturnEmptyForFilterOfIncorrectType() { + BlockFilter filter = new BlockFilter("id"); + repository.save(filter); + + Optional optional = + repository.getFilter(filter.getId(), PendingTransactionFilter.class); + + assertThat(optional.isPresent()).isFalse(); + } + + @Test + public void getSingleFilterShouldReturnEmptyForAbsentId() { + BlockFilter filter = new BlockFilter("foo"); + repository.save(filter); + + Optional optional = repository.getFilter("bar", BlockFilter.class); + + assertThat(optional.isPresent()).isFalse(); + } + + @Test + public void getSingleFilterShouldReturnEmptyForEmptyRepository() { + Optional optional = repository.getFilter("id", BlockFilter.class); + + assertThat(optional.isPresent()).isFalse(); + } + + @Test + public void getFilterCollectionShouldReturnAllFiltersOfSpecificType() { + BlockFilter blockFilter1 = new BlockFilter("foo"); + BlockFilter blockFilter2 = new BlockFilter("biz"); + PendingTransactionFilter pendingTxFilter1 = new PendingTransactionFilter("bar"); + + Collection expectedFilters = Lists.newArrayList(blockFilter1, blockFilter2); + + repository.save(blockFilter1); + repository.save(blockFilter2); + repository.save(pendingTxFilter1); + + Collection blockFilters = repository.getFiltersOfType(BlockFilter.class); + + assertThat(blockFilters).containsExactlyInAnyOrderElementsOf(expectedFilters); + } + + @Test + public void getFilterCollectionShouldReturnEmptyForNoneMatchingTypes() { + PendingTransactionFilter filter = new PendingTransactionFilter("foo"); + repository.save(filter); + + Collection filters = repository.getFiltersOfType(BlockFilter.class); + + assertThat(filters).isEmpty(); + } + + @Test + public void getFilterCollectionShouldReturnEmptyListForEmptyRepository() { + Collection filters = repository.getFiltersOfType(BlockFilter.class); + + assertThat(filters).isEmpty(); + } + + @Test + public void existsShouldReturnTrueForExistingId() { + BlockFilter filter = new BlockFilter("id"); + repository.save(filter); + + assertThat(repository.exists("id")).isTrue(); + } + + @Test + public void existsShouldReturnFalseForAbsentId() { + BlockFilter filter = new BlockFilter("foo"); + repository.save(filter); + + assertThat(repository.exists("bar")).isFalse(); + } + + @Test + public void existsShouldReturnFalseForEmptyRepository() { + assertThat(repository.exists("id")).isFalse(); + } + + @Test + public void deleteExistingFilterShouldDeleteSuccessfully() { + BlockFilter filter = new BlockFilter("foo"); + repository.save(filter); + repository.delete(filter.getId()); + + assertThat(repository.exists(filter.getId())).isFalse(); + } + + @Test + public void deleteAbsentFilterDoesNothing() { + assertThat(repository.exists("foo")).isFalse(); + repository.delete("foo"); + } + + @Test + public void deleteAllShouldClearFilters() { + BlockFilter filter1 = new BlockFilter("foo"); + BlockFilter filter2 = new BlockFilter("biz"); + repository.save(filter1); + repository.save(filter2); + + repository.deleteAll(); + + assertThat(repository.exists(filter1.getId())).isFalse(); + assertThat(repository.exists(filter2.getId())).isFalse(); + } +} diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTest.java new file mode 100644 index 0000000000..9bce735669 --- /dev/null +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTest.java @@ -0,0 +1,36 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.Instant; + +import org.junit.Test; + +public class FilterTest { + + @Test + public void filterJustCreatedShouldNotBeExpired() { + BlockFilter filter = new BlockFilter("foo"); + + assertThat(filter.isExpired()).isFalse(); + } + + @Test + public void isExpiredShouldReturnTrueForExpiredFilter() { + BlockFilter filter = new BlockFilter("foo"); + filter.setExpireTime(Instant.now().minusSeconds(1)); + + assertThat(filter.isExpired()).isTrue(); + } + + @Test + public void resetExpireDateShouldIncrementExpireDate() { + BlockFilter filter = new BlockFilter("foo"); + filter.setExpireTime(Instant.now().minus(Duration.ofDays(1))); + filter.resetExpireTime(); + + assertThat(filter.getExpireTime()) + .isBeforeOrEqualTo(Instant.now().plus(Duration.ofMinutes(10))); + } +} diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitorTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitorTest.java new file mode 100644 index 0000000000..098e47242f --- /dev/null +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/filter/FilterTimeoutMonitorTest.java @@ -0,0 +1,64 @@ +package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import com.google.common.collect.Lists; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class FilterTimeoutMonitorTest { + + @Mock private FilterRepository filterRepository; + + private FilterTimeoutMonitor timeoutMonitor; + + @Before + public void before() { + timeoutMonitor = new FilterTimeoutMonitor(filterRepository); + } + + @Test + public void expiredFilterShouldBeDeleted() { + Filter filter = spy(new BlockFilter("foo")); + when(filter.isExpired()).thenReturn(true); + when(filterRepository.getFilters()).thenReturn(Lists.newArrayList(filter)); + + timeoutMonitor.checkFilters(); + + verify(filterRepository).getFilters(); + verify(filterRepository).delete("foo"); + verifyNoMoreInteractions(filterRepository); + } + + @Test + public void nonExpiredFilterShouldNotBeDeleted() { + Filter filter = mock(Filter.class); + when(filter.isExpired()).thenReturn(false); + when(filterRepository.getFilters()).thenReturn(Lists.newArrayList(filter)); + + timeoutMonitor.checkFilters(); + + verify(filter).isExpired(); + verifyNoMoreInteractions(filter); + } + + @Test + public void checkEmptyFilterRepositoryDoesNothing() { + when(filterRepository.getFilters()).thenReturn(Collections.emptyList()); + + timeoutMonitor.checkFilters(); + + verify(filterRepository).getFilters(); + verifyNoMoreInteractions(filterRepository); + } +} diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java index ebc684b3b2..9425cf6e1b 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java @@ -16,6 +16,9 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration; import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis; import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcHttpService; import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcMethodsFactory; +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator; +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager; +import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration; @@ -127,6 +130,8 @@ public class RunnerBuilder { final AbstractMiningCoordinator miningCoordinator = pantheonController.getMiningCoordinator(); + final FilterManager filterManager = createFilterManager(vertx, context, transactionPool); + Optional jsonRpcHttpService = Optional.empty(); if (jsonRpcConfiguration.isEnabled()) { final Map jsonRpcMethods = @@ -139,7 +144,8 @@ public class RunnerBuilder { transactionPool, miningCoordinator, supportedCapabilities, - jsonRpcConfiguration.getRpcApis()); + jsonRpcConfiguration.getRpcApis(), + filterManager); jsonRpcHttpService = Optional.of(new JsonRpcHttpService(vertx, jsonRpcConfiguration, jsonRpcMethods)); } @@ -156,7 +162,8 @@ public class RunnerBuilder { transactionPool, miningCoordinator, supportedCapabilities, - webSocketConfiguration.getRpcApis()); + webSocketConfiguration.getRpcApis(), + filterManager); final SubscriptionManager subscriptionManager = createSubscriptionManager(vertx, context.getBlockchain(), transactionPool); @@ -179,6 +186,18 @@ public class RunnerBuilder { vertx, networkRunner, jsonRpcHttpService, webSocketService, pantheonController, dataDir); } + private FilterManager createFilterManager( + final Vertx vertx, final ProtocolContext context, final TransactionPool transactionPool) { + FilterManager filterManager = + new FilterManager( + new BlockchainQueries(context.getBlockchain(), context.getWorldStateArchive()), + transactionPool, + new FilterIdGenerator(), + new FilterRepository()); + vertx.deployVerticle(filterManager); + return filterManager; + } + private Map jsonRpcMethods( final ProtocolContext context, final ProtocolSchedule protocolSchedule, @@ -188,7 +207,8 @@ public class RunnerBuilder { final TransactionPool transactionPool, final AbstractMiningCoordinator miningCoordinator, final Set supportedCapabilities, - final Collection jsonRpcApis) { + final Collection jsonRpcApis, + final FilterManager filterManager) { final Map methods = new JsonRpcMethodsFactory() .methods( @@ -202,7 +222,8 @@ public class RunnerBuilder { protocolSchedule, miningCoordinator, supportedCapabilities, - jsonRpcApis); + jsonRpcApis, + filterManager); if (context.getConsensusState() instanceof CliqueContext) { // This is checked before entering this if branch