From d69c92dc1789932843894833f692ceeba1923135 Mon Sep 17 00:00:00 2001 From: Ratan Rai Sur Date: Thu, 24 Oct 2019 11:23:37 -0400 Subject: [PATCH] Support log reordring from reorgs in `LogSubscriptionService` (#86) Signed-off-by: Ratan Rai Sur --- .../org/hyperledger/besu/RunnerBuilder.java | 10 +- .../IbftMiningCoordinatorTest.java | 4 +- .../api/graphql/GraphQLDataFetchers.java | 2 +- .../pojoadapter/BlockAdapterBase.java | 2 +- .../internal/pojoadapter/LogAdapter.java | 4 +- .../pojoadapter/TransactionAdapter.java | 10 +- .../internal/filter/FilterManager.java | 2 +- .../jsonrpc/internal/filter/LogFilter.java | 2 +- .../internal/methods/EthGetFilterChanges.java | 2 +- .../internal/methods/EthGetFilterLogs.java | 2 +- .../jsonrpc/internal/results/LogResult.java | 4 +- .../jsonrpc/internal/results/LogsResult.java | 2 +- .../logs/LogsSubscriptionService.java | 90 +--- .../ethereum/api/query/BlockchainQueries.java | 123 ++--- .../filter/FilterManagerLogFilterTest.java | 8 +- .../internal/filter/FilterManagerTest.java | 4 +- .../methods/EthGetFilterChangesTest.java | 2 +- .../methods/EthGetFilterLogsTest.java | 2 +- ...ewBlockHeadersSubscriptionServiceTest.java | 3 +- .../logs/LogsSubscriptionServiceTest.java | 430 +++++++++++------- .../api/query/BlockchainQueriesTest.java | 1 + .../eth/eth_getLogs_toBlockOutOfRange.json | 37 +- .../AbstractMiningCoordinatorTest.java | 9 +- .../besu/ethereum/chain/BlockAddedEvent.java | 30 +- .../ethereum/chain/DefaultBlockchain.java | 202 ++++---- .../ethereum/core}/BlockWithReceipts.java | 11 +- .../besu/ethereum/core}/LogWithMetadata.java | 56 ++- .../ethereum/core/BlockDataGenerator.java | 60 ++- .../ethereum/chain/DefaultBlockchainTest.java | 105 ++++- .../sync/fastsync/DownloadReceiptsStep.java | 1 + .../sync/fastsync/FastImportBlocksStep.java | 1 + .../eth/sync/TrailingPeerLimiterTest.java | 7 +- .../fastsync/DownloadReceiptsStepTest.java | 1 + .../fastsync/FastImportBlocksStepTest.java | 1 + 34 files changed, 742 insertions(+), 488 deletions(-) rename ethereum/{eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync => core/src/main/java/org/hyperledger/besu/ethereum/core}/BlockWithReceipts.java (82%) rename ethereum/{api/src/main/java/org/hyperledger/besu/ethereum/api/query => core/src/main/java/org/hyperledger/besu/ethereum/core}/LogWithMetadata.java (63%) diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index 5c4455a1f0..94fac0a991 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -453,8 +453,7 @@ public class RunnerBuilder { final SubscriptionManager subscriptionManager = createSubscriptionManager(vertx, transactionPool); - createLogsSubscriptionService( - context.getBlockchain(), context.getWorldStateArchive(), subscriptionManager); + createLogsSubscriptionService(context.getBlockchain(), subscriptionManager); createNewBlockHeadersSubscriptionService( context.getBlockchain(), context.getWorldStateArchive(), subscriptionManager); @@ -621,12 +620,9 @@ public class RunnerBuilder { } private void createLogsSubscriptionService( - final Blockchain blockchain, - final WorldStateArchive worldStateArchive, - final SubscriptionManager subscriptionManager) { + final Blockchain blockchain, final SubscriptionManager subscriptionManager) { final LogsSubscriptionService logsSubscriptionService = - new LogsSubscriptionService( - subscriptionManager, new BlockchainQueries(blockchain, worldStateArchive)); + new LogsSubscriptionService(subscriptionManager); blockchain.observeBlockAdded(logsSubscriptionService); } diff --git a/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java b/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java index 32f04c03c7..06f8503718 100644 --- a/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java +++ b/consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java @@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Wei; import org.hyperledger.besu.util.bytes.BytesValue; +import java.util.Collections; import java.util.concurrent.TimeUnit; import org.assertj.core.util.Lists; @@ -85,7 +86,8 @@ public class IbftMiningCoordinatorTest { @Test public void addsNewChainHeadEventWhenNewCanonicalHeadBlockEventReceived() throws Exception { - BlockAddedEvent headAdvancement = BlockAddedEvent.createForHeadAdvancement(block); + BlockAddedEvent headAdvancement = + BlockAddedEvent.createForHeadAdvancement(block, Collections.emptyList()); ibftMiningCoordinator.onBlockAdded(headAdvancement, blockChain); assertThat(eventQueue.size()).isEqualTo(1); diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetchers.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetchers.java index e6e282cfd0..df1105274a 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetchers.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetchers.java @@ -25,7 +25,6 @@ import org.hyperledger.besu.ethereum.api.graphql.internal.pojoadapter.Transactio import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLError; import org.hyperledger.besu.ethereum.api.query.BlockWithMetadata; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.api.query.LogsQuery; import org.hyperledger.besu.ethereum.api.query.TransactionWithMetadata; import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator; @@ -33,6 +32,7 @@ import org.hyperledger.besu.ethereum.core.Account; import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.Hash; import org.hyperledger.besu.ethereum.core.LogTopic; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.MutableWorldState; import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.core.Transaction; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/BlockAdapterBase.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/BlockAdapterBase.java index ef04200281..00d4c21b77 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/BlockAdapterBase.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/BlockAdapterBase.java @@ -17,13 +17,13 @@ package org.hyperledger.besu.ethereum.api.graphql.internal.pojoadapter; import org.hyperledger.besu.ethereum.api.graphql.GraphQLDataFetcherContext; import org.hyperledger.besu.ethereum.api.query.BlockWithMetadata; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.api.query.LogsQuery; import org.hyperledger.besu.ethereum.api.query.TransactionWithMetadata; import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; import org.hyperledger.besu.ethereum.core.LogTopic; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Wei; import org.hyperledger.besu.ethereum.core.WorldState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/LogAdapter.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/LogAdapter.java index ac459a80cd..f160dc2f12 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/LogAdapter.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/LogAdapter.java @@ -15,10 +15,10 @@ package org.hyperledger.besu.ethereum.api.graphql.internal.pojoadapter; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.api.query.TransactionWithMetadata; import org.hyperledger.besu.ethereum.core.Hash; import org.hyperledger.besu.ethereum.core.LogTopic; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.util.bytes.Bytes32; import org.hyperledger.besu.util.bytes.BytesValue; @@ -70,6 +70,6 @@ public class LogAdapter extends AdapterBase { return query .getWorldState(blockNumber) - .map(ws -> new AccountAdapter(ws.get(logWithMetadata.getAddress()))); + .map(ws -> new AccountAdapter(ws.get(logWithMetadata.getLogger()))); } } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/TransactionAdapter.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/TransactionAdapter.java index 0454904a4c..0f36359fd7 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/TransactionAdapter.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/TransactionAdapter.java @@ -15,11 +15,11 @@ package org.hyperledger.besu.ethereum.api.graphql.internal.pojoadapter; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.api.query.TransactionReceiptWithMetadata; import org.hyperledger.besu.ethereum.api.query.TransactionWithMetadata; import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.MutableWorldState; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; @@ -167,13 +167,13 @@ public class TransactionAdapter extends AdapterBase { public List getLogs(final DataFetchingEnvironment environment) { final BlockchainQueries query = getBlockchainQueries(environment); final Hash hash = transactionWithMetadata.getTransaction().getHash(); - final Optional tranRpt = + final Optional maybeTransactionReceiptWithMetadata = query.transactionReceiptByTransactionHash(hash); final List results = new ArrayList<>(); - if (tranRpt.isPresent()) { + if (maybeTransactionReceiptWithMetadata.isPresent()) { final List logs = - BlockchainQueries.generateLogWithMetadataForTransaction( - tranRpt.get().getReceipt(), + LogWithMetadata.generate( + maybeTransactionReceiptWithMetadata.get().getReceipt(), transactionWithMetadata.getBlockNumber().get(), transactionWithMetadata.getBlockHash().get(), hash, diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManager.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManager.java index f832cb6b96..f51ac4b539 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManager.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManager.java @@ -18,11 +18,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.BlockParameter; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.api.query.LogsQuery; import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogFilter.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogFilter.java index 2125f85173..54b5c40dae 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogFilter.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogFilter.java @@ -15,8 +15,8 @@ package org.hyperledger.besu.ethereum.api.jsonrpc.internal.filter; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.BlockParameter; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.api.query.LogsQuery; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import java.util.ArrayList; import java.util.List; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChanges.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChanges.java index 8b54704ba4..620335f0e7 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChanges.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChanges.java @@ -23,8 +23,8 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorR import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogsResult; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import java.util.List; import java.util.stream.Collectors; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogs.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogs.java index 3c654844be..f251e7f963 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogs.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogs.java @@ -23,7 +23,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorR import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogsResult; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import java.util.List; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogResult.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogResult.java index 1d4885d69e..e252211bd5 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogResult.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogResult.java @@ -14,8 +14,8 @@ */ package org.hyperledger.besu.ethereum.api.jsonrpc.internal.results; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.core.LogTopic; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import java.util.ArrayList; import java.util.List; @@ -53,7 +53,7 @@ public class LogResult implements JsonRpcResult { this.blockHash = logWithMetadata.getBlockHash().toString(); this.transactionHash = logWithMetadata.getTransactionHash().toString(); this.transactionIndex = Quantity.create(logWithMetadata.getTransactionIndex()); - this.address = logWithMetadata.getAddress().toString(); + this.address = logWithMetadata.getLogger().toString(); this.data = logWithMetadata.getData().toString(); this.topics = new ArrayList<>(logWithMetadata.getTopics().size()); this.removed = logWithMetadata.isRemoved(); diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogsResult.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogsResult.java index aa7ca1e3ac..c27722627a 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogsResult.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/LogsResult.java @@ -14,7 +14,7 @@ */ package org.hyperledger.besu.ethereum.api.jsonrpc.internal.results; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import java.util.ArrayList; import java.util.List; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java index 38adb86e3d..b4f49d323f 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java @@ -17,97 +17,37 @@ package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.logs; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogResult; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType; -import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; -import org.hyperledger.besu.ethereum.api.query.TransactionReceiptWithMetadata; import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.Blockchain; -import org.hyperledger.besu.ethereum.core.Log; import java.util.List; -import java.util.Optional; public class LogsSubscriptionService implements BlockAddedObserver { private final SubscriptionManager subscriptionManager; - private final BlockchainQueries blockchainQueries; - public LogsSubscriptionService( - final SubscriptionManager subscriptionManager, final BlockchainQueries blockchainQueries) { + public LogsSubscriptionService(final SubscriptionManager subscriptionManager) { this.subscriptionManager = subscriptionManager; - this.blockchainQueries = blockchainQueries; } @Override - public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) { + public void onBlockAdded(final BlockAddedEvent event, final Blockchain __) { final List logsSubscriptions = subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class); - if (logsSubscriptions.isEmpty()) { - return; - } - - event.getAddedTransactions().stream() - .map(tx -> blockchainQueries.transactionReceiptByTransactionHash(tx.getHash())) - .filter(Optional::isPresent) - .map(Optional::get) - .forEachOrdered( - receiptWithMetadata -> { - final List logs = receiptWithMetadata.getReceipt().getLogs(); - sendLogsToMatchingSubscriptions(logs, logsSubscriptions, receiptWithMetadata, false); - }); - - event.getRemovedTransactions().stream() - .map(tx -> blockchainQueries.transactionReceiptByTransactionHash(tx.getHash())) - .filter(Optional::isPresent) - .map(Optional::get) - .forEachOrdered( - receiptWithMetadata -> { - final List logs = receiptWithMetadata.getReceipt().getLogs(); - sendLogsToMatchingSubscriptions(logs, logsSubscriptions, receiptWithMetadata, true); - }); - } - - private void sendLogsToMatchingSubscriptions( - final List logs, - final List logsSubscriptions, - final TransactionReceiptWithMetadata receiptWithMetadata, - final boolean removed) { - for (int logIndex = 0; logIndex < logs.size(); logIndex++) { - for (final LogsSubscription subscription : logsSubscriptions) { - if (subscription.getLogsQuery().matches(logs.get(logIndex))) { - sendLogToSubscription(receiptWithMetadata, removed, logIndex, subscription); - } - } - } - } - - private void sendLogToSubscription( - final TransactionReceiptWithMetadata receiptWithMetadata, - final boolean removed, - final int logIndex, - final LogsSubscription subscription) { - final LogWithMetadata logWithMetaData = logWithMetadata(logIndex, receiptWithMetadata, removed); - subscriptionManager.sendMessage( - subscription.getSubscriptionId(), new LogResult(logWithMetaData)); - } - - // @formatter:off - private LogWithMetadata logWithMetadata( - final int logIndex, - final TransactionReceiptWithMetadata transactionReceiptWithMetadata, - final boolean removed) { - return new LogWithMetadata( - logIndex, - transactionReceiptWithMetadata.getBlockNumber(), - transactionReceiptWithMetadata.getBlockHash(), - transactionReceiptWithMetadata.getTransactionHash(), - transactionReceiptWithMetadata.getTransactionIndex(), - transactionReceiptWithMetadata.getReceipt().getLogs().get(logIndex).getLogger(), - transactionReceiptWithMetadata.getReceipt().getLogs().get(logIndex).getData(), - transactionReceiptWithMetadata.getReceipt().getLogs().get(logIndex).getTopics(), - removed); + event + .getLogsWithMetadata() + .forEach( + logWithMetadata -> + logsSubscriptions.stream() + .filter( + logsSubscription -> + logsSubscription.getLogsQuery().matches(logWithMetadata)) + .forEach( + logsSubscription -> + subscriptionManager.sendMessage( + logsSubscription.getSubscriptionId(), + new LogResult(logWithMetadata)))); } - // @formatter:on } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java index 7b86891fbc..331eeb76ed 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java @@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.MutableWorldState; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; @@ -34,12 +35,14 @@ import org.hyperledger.besu.util.bytes.BytesValue; import org.hyperledger.besu.util.uint.UInt256; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; - -import com.google.common.collect.Lists; +import java.util.stream.IntStream; +import java.util.stream.LongStream; public class BlockchainQueries { @@ -483,100 +486,32 @@ public class BlockchainQueries { */ public List matchingLogs( final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) { - if (fromBlockNumber > toBlockNumber || toBlockNumber > headBlockNumber()) { - return Lists.newArrayList(); - } - List matchingLogs = Lists.newArrayList(); - for (long blockNumber = fromBlockNumber; blockNumber <= toBlockNumber; blockNumber++) { - final Hash blockhash = blockchain.getBlockHashByNumber(blockNumber).get(); - final boolean logHasBeenRemoved = !blockchain.blockIsOnCanonicalChain(blockhash); - final List receipts = blockchain.getTxReceipts(blockhash).get(); - final List transaction = - blockchain.getBlockBody(blockhash).get().getTransactions(); - matchingLogs = - generateLogWithMetadata( - receipts, - blockNumber, - query, - blockhash, - matchingLogs, - transaction, - logHasBeenRemoved); - } - return matchingLogs; - } - - public List matchingLogs(final Hash blockhash, final LogsQuery query) { - final List matchingLogs = Lists.newArrayList(); - Optional blockHeader = blockchain.getBlockHeader(blockhash); - if (!blockHeader.isPresent()) { - return matchingLogs; + return LongStream.rangeClosed(fromBlockNumber, toBlockNumber) + .mapToObj(blockchain::getBlockHashByNumber) + .takeWhile(Optional::isPresent) + .flatMap(Optional::stream) + .flatMap(hash -> matchingLogs(hash, query).stream()) + .collect(Collectors.toList()); + } + + public List matchingLogs(final Hash blockHash, final LogsQuery query) { + final Optional blockHeader = blockchain.getBlockHeader(blockHash); + if (blockHeader.isEmpty()) { + return Collections.emptyList(); } - final List receipts = blockchain.getTxReceipts(blockhash).get(); - final List transaction = - blockchain.getBlockBody(blockhash).get().getTransactions(); + final List receipts = blockchain.getTxReceipts(blockHash).get(); + final List transactions = + blockchain.getBlockBody(blockHash).get().getTransactions(); final long number = blockHeader.get().getNumber(); - final boolean logHasBeenRemoved = !blockchain.blockIsOnCanonicalChain(blockhash); - return generateLogWithMetadata( - receipts, number, query, blockhash, matchingLogs, transaction, logHasBeenRemoved); - } - - public static List generateLogWithMetadataForTransaction( - final TransactionReceipt receipt, - final long number, - final Hash blockhash, - final Hash transactionHash, - final int transactionIndex, - final boolean removed) { - - final List logs = new ArrayList<>(); - for (int logIndex = 0; logIndex < receipt.getLogs().size(); ++logIndex) { - - final LogWithMetadata logWithMetaData = - new LogWithMetadata( - logIndex, - number, - blockhash, - transactionHash, - transactionIndex, - receipt.getLogs().get(logIndex).getLogger(), - receipt.getLogs().get(logIndex).getData(), - receipt.getLogs().get(logIndex).getTopics(), - removed); - logs.add(logWithMetaData); - } - - return logs; - } - - private List generateLogWithMetadata( - final List receipts, - final long number, - final LogsQuery query, - final Hash blockhash, - final List matchingLogs, - final List transaction, - final boolean removed) { - for (int transactionIndex = 0; transactionIndex < receipts.size(); ++transactionIndex) { - final TransactionReceipt receipt = receipts.get(transactionIndex); - for (int logIndex = 0; logIndex < receipt.getLogs().size(); ++logIndex) { - if (query.matches(receipt.getLogs().get(logIndex))) { - final LogWithMetadata logWithMetaData = - new LogWithMetadata( - logIndex, - number, - blockhash, - transaction.get(transactionIndex).getHash(), - transactionIndex, - receipts.get(transactionIndex).getLogs().get(logIndex).getLogger(), - receipts.get(transactionIndex).getLogs().get(logIndex).getData(), - receipts.get(transactionIndex).getLogs().get(logIndex).getTopics(), - removed); - matchingLogs.add(logWithMetaData); - } - } - } - return matchingLogs; + final boolean removed = !blockchain.blockIsOnCanonicalChain(blockHash); + return IntStream.range(0, receipts.size()) + .mapToObj( + i -> + LogWithMetadata.generate( + receipts.get(i), number, blockHash, transactions.get(i).getHash(), i, removed)) + .flatMap(Collection::stream) + .filter(query::matches) + .collect(Collectors.toList()); } /** diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java index 25904f3c17..e62079b7f0 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java @@ -28,13 +28,14 @@ import static org.mockito.Mockito.when; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.BlockParameter; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.Quantity; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.api.query.LogsQuery; import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.Address; +import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.util.bytes.BytesValue; @@ -145,8 +146,11 @@ public class FilterManagerLogFilterTest { } private void recordNewBlockEvent() { + final BlockDataGenerator gen = new BlockDataGenerator(); + final Block block = gen.block(); filterManager.recordBlockEvent( - BlockAddedEvent.createForHeadAdvancement(new BlockDataGenerator().block()), + BlockAddedEvent.createForHeadAdvancement( + block, LogWithMetadata.generate(block, gen.receipts(block), false)), blockchainQueries.getBlockchain()); } diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerTest.java index 500abbe188..31e12ee6fd 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerTest.java @@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.core.Hash; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; +import java.util.Collections; import java.util.List; import java.util.Optional; @@ -232,7 +233,8 @@ public class FilterManagerTest { new BlockDataGenerator.BlockOptions().setBlockNumber(blockNumber).setParentHash(parentHash); currentBlock = blockGenerator.block(options); filterManager.recordBlockEvent( - BlockAddedEvent.createForHeadAdvancement(currentBlock), blockchainQueries.getBlockchain()); + BlockAddedEvent.createForHeadAdvancement(currentBlock, Collections.emptyList()), + blockchainQueries.getBlockchain()); return currentBlock.getHash(); } diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChangesTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChangesTest.java index 1c60d3d920..7b640376d1 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChangesTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterChangesTest.java @@ -31,9 +31,9 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorR import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogsResult; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.util.bytes.BytesValue; import java.util.List; diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogsTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogsTest.java index e3a673a330..c543108cba 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogsTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetFilterLogsTest.java @@ -29,9 +29,9 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorR import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogsResult; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.util.bytes.BytesValue; import java.util.ArrayList; diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java index dde8e560b5..681b8b50cf 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java @@ -174,7 +174,8 @@ public class NewBlockHeadersSubscriptionServiceTest { final BlockBody blockBody = new BlockBody(Collections.emptyList(), Collections.emptyList()); final Block testBlock = new Block(blockHeader, blockBody); newBlockHeadersSubscriptionService.onBlockAdded( - BlockAddedEvent.createForHeadAdvancement(testBlock), blockchainQueries.getBlockchain()); + BlockAddedEvent.createForHeadAdvancement(testBlock, Collections.emptyList()), + blockchainQueries.getBlockchain()); verify(blockchainQueries, times(1)).getBlockchain(); } diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java index 4f0caf009f..3bb25154e7 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java @@ -14,249 +14,365 @@ */ package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.logs; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.refEq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.hyperledger.besu.crypto.SECP256K1.KeyPair; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.FilterParameter; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogResult; +import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.Quantity; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager; -import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.LogWithMetadata; -import org.hyperledger.besu.ethereum.api.query.TransactionReceiptWithMetadata; -import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; -import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.api.query.TopicsParameter; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions; import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; -import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.BlockWithReceipts; +import org.hyperledger.besu.ethereum.core.InMemoryStorageProvider; import org.hyperledger.besu.ethereum.core.Log; +import org.hyperledger.besu.ethereum.core.LogTopic; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; -import org.hyperledger.besu.ethereum.core.TransactionTestFixture; -import org.hyperledger.besu.util.bytes.BytesValue; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class LogsSubscriptionServiceTest { - private final KeyPair keyPair = KeyPair.generate(); - private final BlockHeaderTestFixture blockHeaderTestFixture = new BlockHeaderTestFixture(); - private final TransactionTestFixture txTestFixture = new TransactionTestFixture(); + private final BlockDataGenerator gen = new BlockDataGenerator(1); + private final MutableBlockchain blockchain = + InMemoryStorageProvider.createInMemoryBlockchain(gen.genesisBlock()); private LogsSubscriptionService logsSubscriptionService; + private final AtomicLong nextSubscriptionId = new AtomicLong(); @Mock private SubscriptionManager subscriptionManager; - @Mock private BlockchainQueries blockchainQueries; - @Mock private Blockchain blockchain; @Before public void before() { - logsSubscriptionService = new LogsSubscriptionService(subscriptionManager, blockchainQueries); + logsSubscriptionService = new LogsSubscriptionService(subscriptionManager); + blockchain.observeBlockAdded(logsSubscriptionService); } @Test - public void shouldSendLogMessageWhenBlockAddedEventHasAddedTransactionsMatchingSubscription() { - final Address address = Address.fromHexString("0x0"); - final LogsSubscription subscription = createSubscription(address); - final Transaction transaction = createTransaction(); - final Log log = createLog(address); - final LogResult expectedLogResult = createLogResult(transaction, log, false); + public void singleMatchingLogEvent() { + final BlockWithReceipts blockWithReceipts = generateBlock(2, 2, 2); + final Block block = blockWithReceipts.getBlock(); + final List receipts = blockWithReceipts.getReceipts(); - logsSubscriptionService.onBlockAdded(createBlockAddedEvent(transaction, null), blockchain); + final int txIndex = 1; + final int logIndex = 1; + final Log targetLog = receipts.get(txIndex).getLogs().get(logIndex); - verify(subscriptionManager) - .sendMessage( - ArgumentMatchers.eq(subscription.getSubscriptionId()), refEq(expectedLogResult)); - } + final LogsSubscription subscription = createSubscription(targetLog.getLogger()); + registerSubscriptions(subscription); + blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts()); - @Test - public void shouldSendLogMessageWhenBlockAddedEventHasRemovedTransactionsMatchingSubscription() { - final Address address = Address.fromHexString("0x0"); - final LogsSubscription subscription = createSubscription(address); - final Transaction transaction = createTransaction(); - final Log log = createLog(address); - final LogResult expectedLogResult = createLogResult(transaction, log, true); + final ArgumentCaptor captor = ArgumentCaptor.forClass(LogResult.class); + verify(subscriptionManager).sendMessage(eq(subscription.getSubscriptionId()), captor.capture()); - logsSubscriptionService.onBlockAdded(createBlockAddedEvent(null, transaction), blockchain); + final List logResults = captor.getAllValues(); - verify(subscriptionManager) - .sendMessage( - ArgumentMatchers.eq(subscription.getSubscriptionId()), refEq(expectedLogResult)); + assertThat(logResults).hasSize(1); + final LogResult result = logResults.get(0); + assertLogResultMatches(result, block, receipts, txIndex, logIndex, false); } @Test - public void shouldSendMessageForAllLogsMatchingSubscription() { - final Address address = Address.fromHexString("0x0"); - final Log log = createLog(address); - final LogsSubscription subscription = createSubscription(address); - final List addedTransactions = createTransactionsWithLog(log); - final List removedTransactions = createTransactionsWithLog(log); + public void singleMatchingLogEmittedThenRemovedInReorg() { + // Create block that emits an event + final BlockWithReceipts blockWithReceipts = generateBlock(2, 2, 2); + final Block block = blockWithReceipts.getBlock(); + final List receipts = blockWithReceipts.getReceipts(); + + final int txIndex = 1; + final int logIndex = 1; + final Log targetLog = receipts.get(txIndex).getLogs().get(logIndex); + + final LogsSubscription subscription = createSubscription(targetLog.getLogger()); + registerSubscriptions(subscription); + blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts()); + + // Cause a reorg that removes the block which emitted an event + BlockHeader parentHeader = blockchain.getGenesisBlock().getHeader(); + while (!blockchain.getChainHeadHash().equals(parentHeader.getHash())) { + final BlockWithReceipts newBlock = generateBlock(parentHeader, 2, 0, 0); + parentHeader = newBlock.getBlock().getHeader(); + blockchain.appendBlock(newBlock.getBlock(), newBlock.getReceipts()); + } - logsSubscriptionService.onBlockAdded( - createBlockAddedEvent(addedTransactions, removedTransactions), blockchain); + final ArgumentCaptor captor = ArgumentCaptor.forClass(LogResult.class); + verify(subscriptionManager, times(2)) + .sendMessage(eq(subscription.getSubscriptionId()), captor.capture()); - final int totalOfLogs = addedTransactions.size() + removedTransactions.size(); + final List logResults = captor.getAllValues(); - verify(subscriptionManager, times(totalOfLogs)) - .sendMessage(ArgumentMatchers.eq(subscription.getSubscriptionId()), any()); + assertThat(logResults).hasSize(2); + final LogResult firstLog = logResults.get(0); + assertLogResultMatches(firstLog, block, receipts, txIndex, logIndex, false); + final LogResult secondLog = logResults.get(1); + assertLogResultMatches(secondLog, block, receipts, txIndex, logIndex, true); } @Test - public void shouldSendLogMessageToAllMatchingSubscriptions() { - final Address address = Address.fromHexString("0x0"); - final List subscriptions = createSubscriptions(address); - final Transaction transaction = createTransaction(); - final Log log = createLog(address); - final LogResult expectedLogResult = createLogResult(transaction, log, false); - - logsSubscriptionService.onBlockAdded(createBlockAddedEvent(transaction, null), blockchain); + public void singleMatchingLogEmittedThenMovedInReorg() { + // Create block that emits an event + final BlockWithReceipts blockWithReceipts = generateBlock(2, 2, 2); + final Block block = blockWithReceipts.getBlock(); + final List receipts = blockWithReceipts.getReceipts(); + + final int txIndex = 1; + final int logIndex = 1; + final Log targetLog = receipts.get(txIndex).getLogs().get(logIndex); + + final LogsSubscription subscription = createSubscription(targetLog.getLogger()); + registerSubscriptions(subscription); + blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts()); + + // Cause a reorg that removes the block which emitted an event + BlockHeader parentHeader = blockchain.getGenesisBlock().getHeader(); + while (!blockchain.getChainHeadHash().equals(parentHeader.getHash())) { + final BlockWithReceipts newBlock = generateBlock(parentHeader, 2, 0, 0); + parentHeader = newBlock.getBlock().getHeader(); + blockchain.appendBlock(newBlock.getBlock(), newBlock.getReceipts()); + } - verify(subscriptionManager, times(subscriptions.size())) - .sendMessage(any(), refEq(expectedLogResult)); + // Now add another block that re-emits the target log + final BlockWithReceipts newBlockWithLog = + generateBlock(1, () -> Collections.singletonList(targetLog)); + blockchain.appendBlock(newBlockWithLog.getBlock(), newBlockWithLog.getReceipts()); + // Sanity check + assertThat(blockchain.getChainHeadHash()).isEqualTo(newBlockWithLog.getBlock().getHash()); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(LogResult.class); + verify(subscriptionManager, times(3)) + .sendMessage(eq(subscription.getSubscriptionId()), captor.capture()); + + final List logResults = captor.getAllValues(); + + assertThat(logResults).hasSize(3); + final LogResult originalLog = logResults.get(0); + assertLogResultMatches(originalLog, block, receipts, txIndex, logIndex, false); + final LogResult removedLog = logResults.get(1); + assertLogResultMatches(removedLog, block, receipts, txIndex, logIndex, true); + final LogResult updatedLog = logResults.get(2); + assertLogResultMatches( + updatedLog, newBlockWithLog.getBlock(), newBlockWithLog.getReceipts(), 0, 0, false); } @Test - public void shouldNotSendLogMessageWhenBlockAddedEventHasNoTransactions() { - final Address address = Address.fromHexString("0x0"); - createSubscription(address); + public void multipleMatchingLogsEmitted() { + final Log targetLog = gen.log(); + final Log otherLog = gen.log(); + final List logs = Arrays.asList(targetLog, otherLog); + + final LogsSubscription subscription = createSubscription(targetLog.getLogger()); + registerSubscriptions(subscription); + + // Generate blocks with multiple logs matching subscription + final int txCount = 2; + final List targetBlocks = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + final BlockWithReceipts blockWithReceipts = generateBlock(txCount, () -> logs); + targetBlocks.add(blockWithReceipts); + blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts()); + + // Add another block with unrelated logs + final BlockWithReceipts otherBlock = generateBlock(txCount, 2, 2); + blockchain.appendBlock(otherBlock.getBlock(), otherBlock.getReceipts()); + } - logsSubscriptionService.onBlockAdded( - createBlockAddedEvent(Collections.emptyList(), Collections.emptyList()), blockchain); + final ArgumentCaptor captor = ArgumentCaptor.forClass(LogResult.class); + verify(subscriptionManager, times(targetBlocks.size() * txCount)) + .sendMessage(eq(subscription.getSubscriptionId()), captor.capture()); + final List logResults = captor.getAllValues(); + + // Verify all logs are emitted + assertThat(logResults).hasSize(targetBlocks.size() * txCount); + for (int i = 0; i < targetBlocks.size(); i++) { + final BlockWithReceipts targetBlock = targetBlocks.get(i); + for (int j = 0; j < txCount; j++) { + final int resultIndex = i * txCount + j; + assertLogResultMatches( + logResults.get(resultIndex), + targetBlock.getBlock(), + targetBlock.getReceipts(), + j, + 0, + false); + } + } + } - verify(subscriptionManager).subscriptionsOfType(any(), any()); - verify(subscriptionManager, times(0)).sendMessage(any(), any()); + @Test + public void multipleSubscriptionsForSingleMatchingLog() { + final BlockWithReceipts blockWithReceipts = generateBlock(2, 2, 2); + final Block block = blockWithReceipts.getBlock(); + final List receipts = blockWithReceipts.getReceipts(); + + final int txIndex = 1; + final int logIndex = 1; + final Log targetLog = receipts.get(txIndex).getLogs().get(logIndex); + + final List subscriptions = + Stream.generate(() -> createSubscription(targetLog.getLogger())) + .limit(3) + .collect(Collectors.toList()); + registerSubscriptions(subscriptions); + blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts()); + + for (LogsSubscription subscription : subscriptions) { + final ArgumentCaptor captor = ArgumentCaptor.forClass(LogResult.class); + verify(subscriptionManager) + .sendMessage(eq(subscription.getSubscriptionId()), captor.capture()); + + final List logResults = captor.getAllValues(); + + assertThat(logResults).hasSize(1); + final LogResult result = logResults.get(0); + assertLogResultMatches(result, block, receipts, txIndex, logIndex, false); + } } @Test - public void shouldNotSendLogMessageWhenLogsDoNotMatchAnySubscription() { - createSubscription(Address.fromHexString("0x0")); - final Transaction transaction = createTransaction(); - final Log log = createLog(Address.fromHexString("0x1")); - createLogResult(transaction, log, false); + public void noLogsEmitted() { + final Address address = Address.fromHexString("0x0"); + final LogsSubscription subscription = createSubscription(address); + registerSubscriptions(subscription); - logsSubscriptionService.onBlockAdded(createBlockAddedEvent(transaction, null), blockchain); + final BlockWithReceipts blockWithReceipts = generateBlock(2, 0, 0); + blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts()); - verify(subscriptionManager).subscriptionsOfType(any(), any()); - verify(subscriptionManager, times(0)).sendMessage(any(), any()); + final ArgumentCaptor captor = ArgumentCaptor.forClass(LogResult.class); + verify(subscriptionManager, times(0)) + .sendMessage(eq(subscription.getSubscriptionId()), captor.capture()); } - private Transaction createTransaction() { - return txTestFixture.createTransaction(keyPair); + @Test + public void noMatchingLogsEmitted() { + final Address address = Address.fromHexString("0x0"); + final LogsSubscription subscription = createSubscription(address); + registerSubscriptions(subscription); + + final BlockWithReceipts blockWithReceipts = generateBlock(2, 2, 2); + blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts()); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(LogResult.class); + verify(subscriptionManager, times(0)) + .sendMessage(eq(subscription.getSubscriptionId()), captor.capture()); } - private Log createLog(final Address address) { - return new Log(address, BytesValue.EMPTY, Collections.emptyList()); + private void assertLogResultMatches( + final LogResult result, + final Block block, + final List receipts, + final int txIndex, + final int logIndex, + final boolean isRemoved) { + final Transaction expectedTransaction = block.getBody().getTransactions().get(txIndex); + final Log expectedLog = receipts.get(txIndex).getLogs().get(logIndex); + + assertThat(result.getLogIndex()).isEqualTo(Quantity.create(logIndex)); + assertThat(result.getTransactionIndex()).isEqualTo(Quantity.create(txIndex)); + assertThat(result.getBlockNumber()).isEqualTo(Quantity.create(block.getHeader().getNumber())); + assertThat(result.getBlockHash()).isEqualTo(block.getHash().toString()); + assertThat(result.getTransactionHash()).isEqualTo(expectedTransaction.getHash().toString()); + assertThat(result.getAddress()).isEqualTo(expectedLog.getLogger().toString()); + assertThat(result.getData()).isEqualTo(expectedLog.getData().toString()); + assertThat(result.getTopics()) + .isEqualTo( + expectedLog.getTopics().stream().map(LogTopic::toString).collect(Collectors.toList())); + assertThat(result.isRemoved()).isEqualTo(isRemoved); } - private LogsSubscription createSubscription(final Address address) { - final FilterParameter filterParameter = - new FilterParameter(null, null, Lists.newArrayList(address.toString()), null, null); - final LogsSubscription logsSubscription = new LogsSubscription(1L, "conn", filterParameter); - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(logsSubscription)); - return logsSubscription; + private BlockWithReceipts generateBlock( + final int txCount, final int logsPerTx, final int topicsPerLog) { + final BlockHeader parent = blockchain.getChainHeadHeader(); + return generateBlock(parent, txCount, () -> gen.logs(logsPerTx, topicsPerLog)); } - private List createSubscriptions(final Address address) { - final List subscriptions = new ArrayList<>(); - for (long i = 0; i < 3; i++) { - final FilterParameter filterParameter = - new FilterParameter(null, null, Lists.newArrayList(address.toString()), null, null); - subscriptions.add(new LogsSubscription(i, "conn", filterParameter)); - } - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(subscriptions)); - return subscriptions; + private BlockWithReceipts generateBlock( + final BlockHeader parentHeader, + final int txCount, + final int logsPerTx, + final int topicsPerLog) { + return generateBlock(parentHeader, txCount, () -> gen.logs(logsPerTx, topicsPerLog)); } - private LogResult createLogResult( - final Transaction transaction, final Log log, final boolean removed) { - final TransactionReceiptWithMetadata txReceiptWithMetadata = - createTransactionWithLog(transaction, log); - final LogWithMetadata logWithMetadata = createLogWithMetadata(txReceiptWithMetadata, removed); - return new LogResult(logWithMetadata); + private BlockWithReceipts generateBlock( + final int txCount, final Supplier> logsSupplier) { + final BlockHeader parent = blockchain.getChainHeadHeader(); + return generateBlock(parent, txCount, logsSupplier); } - private TransactionReceiptWithMetadata createTransactionWithLog( - final Transaction transaction, final Log log) { - final BlockHeader blockHeader = blockHeaderTestFixture.buildHeader(); - final TransactionReceipt transactionReceipt = - new TransactionReceipt(Hash.ZERO, 1L, Lists.newArrayList(log), Optional.empty()); - final TransactionReceiptWithMetadata transactionReceiptWithMetadata = - TransactionReceiptWithMetadata.create( - transactionReceipt, - transaction, - transaction.getHash(), - 0, - 1L, - blockHeader.getHash(), - blockHeader.getNumber()); + private BlockWithReceipts generateBlock( + final BlockHeader parentHeader, final int txCount, final Supplier> logsSupplier) { + final List receipts = new ArrayList<>(); + final List logs = new ArrayList<>(); + final BlockOptions blockOptions = BlockOptions.create(); + for (int i = 0; i < txCount; i++) { + final Transaction tx = gen.transaction(); + final TransactionReceipt receipt = gen.receipt(logsSupplier.get()); + + receipts.add(receipt); + receipt.getLogs().forEach(logs::add); + blockOptions.addTransaction(tx); + } - when(blockchainQueries.transactionReceiptByTransactionHash(eq(transaction.getHash()))) - .thenReturn(Optional.of(transactionReceiptWithMetadata)); + blockOptions.setParentHash(parentHeader.getHash()); + blockOptions.setBlockNumber(parentHeader.getNumber() + 1L); + final Block block = gen.block(blockOptions); - return transactionReceiptWithMetadata; + return new BlockWithReceipts(block, receipts); } - private BlockAddedEvent createBlockAddedEvent( - final Transaction addedTransaction, final Transaction removedTransaction) { - final Block block = mock(Block.class); - return BlockAddedEvent.createForChainReorg( - block, - addedTransaction != null ? Lists.newArrayList(addedTransaction) : Collections.emptyList(), - removedTransaction != null - ? Lists.newArrayList(removedTransaction) - : Collections.emptyList()); + private LogsSubscription createSubscription(final Address address) { + return createSubscription(Arrays.asList(address), Collections.emptyList()); } - private BlockAddedEvent createBlockAddedEvent( - final List addedTransactions, final List removedTransactions) { - final Block block = mock(Block.class); - return BlockAddedEvent.createForChainReorg( - block, - addedTransactions != null ? Lists.newArrayList(addedTransactions) : Collections.emptyList(), - removedTransactions != null - ? Lists.newArrayList(removedTransactions) - : Collections.emptyList()); + private LogsSubscription createSubscription( + final List
addresses, final List> logTopics) { + // TODO: FilterParameter constructor should work with proper types instead of Strings + final List addressStrings = + addresses.stream().map(Address::toString).collect(Collectors.toList()); + final List> topicStrings = + logTopics.stream() + .map(topics -> topics.stream().map(LogTopic::toString).collect(Collectors.toList())) + .collect(Collectors.toList()); + + final FilterParameter filterParameter = + new FilterParameter(null, null, addressStrings, new TopicsParameter(topicStrings), null); + final LogsSubscription logsSubscription = + new LogsSubscription(nextSubscriptionId.incrementAndGet(), "conn", filterParameter); + return logsSubscription; } - private List createTransactionsWithLog(final Log log) { - final ArrayList transactions = - Lists.newArrayList(createTransaction(), createTransaction(), createTransaction()); - transactions.forEach(tx -> createTransactionWithLog(tx, log)); - return transactions; + private void registerSubscriptions(final LogsSubscription... subscriptions) { + registerSubscriptions(Arrays.asList(subscriptions)); } - private LogWithMetadata createLogWithMetadata( - final TransactionReceiptWithMetadata transactionReceiptWithMetadata, final boolean removed) { - return new LogWithMetadata( - 0, - transactionReceiptWithMetadata.getBlockNumber(), - transactionReceiptWithMetadata.getBlockHash(), - transactionReceiptWithMetadata.getTransactionHash(), - transactionReceiptWithMetadata.getTransactionIndex(), - transactionReceiptWithMetadata.getReceipt().getLogs().get(0).getLogger(), - transactionReceiptWithMetadata.getReceipt().getLogs().get(0).getData(), - transactionReceiptWithMetadata.getReceipt().getLogs().get(0).getTopics(), - removed); + private void registerSubscriptions(final List subscriptions) { + when(subscriptionManager.subscriptionsOfType(any(), any())) + .thenReturn(Lists.newArrayList(subscriptions)); } } diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesTest.java index 4ff416d8db..23e446b13f 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesTest.java @@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.Wei; diff --git a/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_toBlockOutOfRange.json b/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_toBlockOutOfRange.json index fbbe13dcde..22cbf3e71e 100644 --- a/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_toBlockOutOfRange.json +++ b/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_toBlockOutOfRange.json @@ -3,17 +3,40 @@ "id": 406, "jsonrpc": "2.0", "method": "eth_getLogs", - "params": [{ - "fromBlock": "0x17", - "toBlock": "0x21", - "address": [], - "topics": [["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]] - }] + "params": [ + { + "fromBlock": "0x20", + "toBlock": "0x21", + "address": [], + "topics": [ + [ + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + null + ] + ] + } + ] }, "response": { "jsonrpc": "2.0", "id": 406, - "result" : [] + "result": [ + { + "logIndex": "0x0", + "removed": false, + "blockNumber": "0x20", + "blockHash": "0x71d59849ddd98543bdfbe8548f5eed559b07b8aaf196369f39134500eab68e53", + "transactionHash": "0xcef53f2311d7c80e9086d661e69ac11a5f3d081e28e02a9ba9b66749407ac310", + "transactionIndex": "0x0", + "address": "0x6295ee1b4f6dd65047762f924ecd367c17eabf8f", + "data": "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe9000000000000000000000000000000000000000000000000000000000000002a", + "topics": [ + "0x0000000000000000000000000000000000000000000000000000000000000001", + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + ] + } + ] }, "statusCode": 200 } \ No newline at end of file diff --git a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinatorTest.java b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinatorTest.java index ac558966ab..a5cce67ec4 100644 --- a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinatorTest.java +++ b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinatorTest.java @@ -96,7 +96,8 @@ public class AbstractMiningCoordinatorTest { when(syncState.isInSync()).thenReturn(false); miningCoordinator.enable(); - miningCoordinator.onBlockAdded(BlockAddedEvent.createForHeadAdvancement(BLOCK), blockchain); + miningCoordinator.onBlockAdded( + BlockAddedEvent.createForHeadAdvancement(BLOCK, Collections.emptyList()), blockchain); verifyNoMoreInteractions(minerExecutor, blockMiner); } @@ -106,7 +107,8 @@ public class AbstractMiningCoordinatorTest { when(syncState.isInSync()).thenReturn(true); miningCoordinator.enable(); - miningCoordinator.onBlockAdded(BlockAddedEvent.createForHeadAdvancement(BLOCK), blockchain); + miningCoordinator.onBlockAdded( + BlockAddedEvent.createForHeadAdvancement(BLOCK, Collections.emptyList()), blockchain); verify(blockMiner).cancel(); verify(minerExecutor, times(2)).startAsyncMining(any(), any()); @@ -124,7 +126,8 @@ public class AbstractMiningCoordinatorTest { @Test public void shouldNotStartMiningWhenBlockAddedAndInSyncIfMinerNotEnabled() { when(syncState.isInSync()).thenReturn(true); - miningCoordinator.onBlockAdded(BlockAddedEvent.createForHeadAdvancement(BLOCK), blockchain); + miningCoordinator.onBlockAdded( + BlockAddedEvent.createForHeadAdvancement(BLOCK, Collections.emptyList()), blockchain); verifyNoMoreInteractions(minerExecutor, blockMiner); } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockAddedEvent.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockAddedEvent.java index b19713e2e9..9a4573452c 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockAddedEvent.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockAddedEvent.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.chain; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Transaction; import java.util.Collections; @@ -26,6 +27,7 @@ public class BlockAddedEvent { private final List addedTransactions; private final List removedTransactions; private final EventType eventType; + private List logsWithMetadata; public enum EventType { HEAD_ADVANCED, @@ -37,29 +39,41 @@ public class BlockAddedEvent { final EventType eventType, final Block block, final List addedTransactions, - final List removedTransactions) { + final List removedTransactions, + final List logsWithMetadata) { this.eventType = eventType; this.block = block; this.addedTransactions = addedTransactions; this.removedTransactions = removedTransactions; + this.logsWithMetadata = logsWithMetadata; } - public static BlockAddedEvent createForHeadAdvancement(final Block block) { + public static BlockAddedEvent createForHeadAdvancement( + final Block block, final List logsWithMetadata) { return new BlockAddedEvent( - EventType.HEAD_ADVANCED, block, block.getBody().getTransactions(), Collections.emptyList()); + EventType.HEAD_ADVANCED, + block, + block.getBody().getTransactions(), + Collections.emptyList(), + logsWithMetadata); } public static BlockAddedEvent createForChainReorg( final Block block, final List addedTransactions, - final List removedTransactions) { + final List removedTransactions, + final List logsWithMetadata) { return new BlockAddedEvent( - EventType.CHAIN_REORG, block, addedTransactions, removedTransactions); + EventType.CHAIN_REORG, block, addedTransactions, removedTransactions, logsWithMetadata); } public static BlockAddedEvent createForFork(final Block block) { return new BlockAddedEvent( - EventType.FORK, block, Collections.emptyList(), Collections.emptyList()); + EventType.FORK, + block, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); } public Block getBlock() { @@ -81,4 +95,8 @@ public class BlockAddedEvent { public List getRemovedTransactions() { return removedTransactions; } + + public List getLogsWithMetadata() { + return logsWithMetadata; + } } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java index 9bb8c1f05a..713d718435 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java @@ -22,7 +22,9 @@ import static java.util.stream.Collectors.toList; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.metrics.BesuMetricCategory; @@ -41,8 +43,11 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; public class DefaultBlockchain implements MutableBlockchain { @@ -134,21 +139,11 @@ public class DefaultBlockchain implements MutableBlockchain { private static boolean validateStorageNonEmpty(final BlockchainStorage blockchainStorage) { // Run a few basic checks to make sure data looks available and consistent - final Optional maybeHead = blockchainStorage.getChainHead(); - if (maybeHead.isEmpty()) { - return false; - } - final Optional genesisHash = - blockchainStorage.getBlockHash(BlockHeader.GENESIS_BLOCK_NUMBER); - if (genesisHash.isEmpty()) { - return false; - } - final Optional td = blockchainStorage.getTotalDifficulty(maybeHead.get()); - if (td.isEmpty()) { - return false; - } - - return true; + return blockchainStorage + .getChainHead() + .flatMap(blockchainStorage::getTotalDifficulty) + .isPresent() + && blockchainStorage.getBlockHash(BlockHeader.GENESIS_BLOCK_NUMBER).isPresent(); } @Override @@ -171,6 +166,11 @@ public class DefaultBlockchain implements MutableBlockchain { return chainHeader; } + @Override + public Block getChainHeadBlock() { + return new Block(chainHeader, blockchainStorage.getBlockBody(chainHeader.getHash()).get()); + } + @Override public Optional getBlockHeader(final long blockNumber) { return blockchainStorage.getBlockHash(blockNumber).flatMap(blockchainStorage::getBlockHeader); @@ -225,16 +225,16 @@ public class DefaultBlockchain implements MutableBlockchain { if (blockIsAlreadyTracked(block)) { return; } - if (!blockIsConnected(block)) { - throw new IllegalArgumentException("Attempt to append non-connected block."); - } + checkArgument(blockIsConnected(block), "Attempt to append non-connected block."); - final BlockAddedEvent blockAddedEvent = appendBlockHelper(block, receipts); + final BlockAddedEvent blockAddedEvent = + appendBlockHelper(new BlockWithReceipts(block, receipts)); notifyBlockAdded(blockAddedEvent); } - private BlockAddedEvent appendBlockHelper( - final Block block, final List receipts) { + private BlockAddedEvent appendBlockHelper(final BlockWithReceipts blockWithReceipts) { + final Block block = blockWithReceipts.getBlock(); + final List receipts = blockWithReceipts.getReceipts(); final Hash hash = block.getHash(); final UInt256 td = calculateTotalDifficulty(block); @@ -246,7 +246,8 @@ public class DefaultBlockchain implements MutableBlockchain { updater.putTotalDifficulty(hash, td); // Update canonical chain data - final BlockAddedEvent blockAddedEvent = updateCanonicalChainData(updater, block, td); + final BlockAddedEvent blockAddedEvent = + updateCanonicalChainData(updater, blockWithReceipts, td); updater.commit(); if (blockAddedEvent.isNewCanonicalHead()) { @@ -261,19 +262,19 @@ public class DefaultBlockchain implements MutableBlockchain { return block.getHeader().getDifficulty(); } - final Optional maybeParentId = - blockchainStorage.getTotalDifficulty(block.getHeader().getParentHash()); - if (!maybeParentId.isPresent()) { - throw new IllegalStateException("Blockchain is missing total difficulty data."); - } - final UInt256 parentTd = maybeParentId.get(); - return block.getHeader().getDifficulty().plus(parentTd); + final UInt256 parentTotalDifficulty = + blockchainStorage + .getTotalDifficulty(block.getHeader().getParentHash()) + .orElseThrow( + () -> new IllegalStateException("Blockchain is missing total difficulty data.")); + return block.getHeader().getDifficulty().plus(parentTotalDifficulty); } private BlockAddedEvent updateCanonicalChainData( final BlockchainStorage.Updater updater, - final Block newBlock, + final BlockWithReceipts blockWithReceipts, final UInt256 totalDifficulty) { + final Block newBlock = blockWithReceipts.getBlock(); final Hash chainHead = blockchainStorage.getChainHead().orElse(null); if (newBlock.getHeader().getNumber() != BlockHeader.GENESIS_BLOCK_NUMBER && chainHead == null) { throw new IllegalStateException("Blockchain is missing chain head."); @@ -286,11 +287,14 @@ public class DefaultBlockchain implements MutableBlockchain { updater.putBlockHash(newBlock.getHeader().getNumber(), newBlockHash); updater.setChainHead(newBlockHash); indexTransactionForBlock(updater, newBlockHash, newBlock.getBody().getTransactions()); - return BlockAddedEvent.createForHeadAdvancement(newBlock); + return BlockAddedEvent.createForHeadAdvancement( + newBlock, + LogWithMetadata.generate( + blockWithReceipts.getBlock(), blockWithReceipts.getReceipts(), false)); } else if (totalDifficulty.compareTo(blockchainStorage.getTotalDifficulty(chainHead).get()) > 0) { // New block represents a chain reorganization - return handleChainReorg(updater, newBlock); + return handleChainReorg(updater, blockWithReceipts); } else { // New block represents a fork return handleFork(updater, newBlock); @@ -307,11 +311,11 @@ public class DefaultBlockchain implements MutableBlockchain { final Collection forkHeads = blockchainStorage.getForkHeads(); // Check to see if this block advances any existing fork. - final Hash parentHash = fork.getHeader().getParentHash(); - final Optional parent = - forkHeads.stream().filter(head -> head.equals(parentHash)).findAny(); // This block will replace its parent - parent.ifPresent(forkHeads::remove); + forkHeads.stream() + .filter(head -> head.equals(fork.getHeader().getParentHash())) + .findAny() + .ifPresent(forkHeads::remove); forkHeads.add(fork.getHash()); @@ -320,58 +324,59 @@ public class DefaultBlockchain implements MutableBlockchain { } private BlockAddedEvent handleChainReorg( - final BlockchainStorage.Updater updater, final Block newChainHead) { - final Hash oldChainHead = blockchainStorage.getChainHead().get(); - BlockHeader oldChain = blockchainStorage.getBlockHeader(oldChainHead).get(); - BlockHeader newChain = newChainHead.getHeader(); + final BlockchainStorage.Updater updater, final BlockWithReceipts newChainHeadWithReceipts) { + BlockWithReceipts oldChainWithReceipts = getBlockWithReceipts(chainHeader).get(); + BlockWithReceipts currentOldChainWithReceipts = oldChainWithReceipts; + BlockWithReceipts currentNewChainWithReceipts = newChainHeadWithReceipts; // Update chain head - updater.setChainHead(newChain.getHash()); + updater.setChainHead(currentNewChainWithReceipts.getHeader().getHash()); - // Track transactions to be added and removed + // Track transactions and logs to be added and removed final Map> newTransactions = new HashMap<>(); final List removedTransactions = new ArrayList<>(); + final List addedLogsWithMetadata = new ArrayList<>(); + final List removedLogsWithMetadata = new ArrayList<>(); - while (newChain.getNumber() > oldChain.getNumber()) { + while (currentNewChainWithReceipts.getNumber() > currentOldChainWithReceipts.getNumber()) { // If new chain is longer than old chain, walk back until we meet the old chain by number // adding indexing for new chain along the way. - final Hash blockHash = newChain.getHash(); - updater.putBlockHash(newChain.getNumber(), blockHash); - final List newTxs = - blockHash.equals(newChainHead.getHash()) - ? newChainHead.getBody().getTransactions() - : blockchainStorage.getBlockBody(blockHash).get().getTransactions(); - newTransactions.put(blockHash, newTxs); - - newChain = blockchainStorage.getBlockHeader(newChain.getParentHash()).get(); + final Hash blockHash = currentNewChainWithReceipts.getHash(); + updater.putBlockHash(currentNewChainWithReceipts.getNumber(), blockHash); + + newTransactions.put( + blockHash, currentNewChainWithReceipts.getBlock().getBody().getTransactions()); + addAddedLogsWithMetadata(addedLogsWithMetadata, currentNewChainWithReceipts); + + currentNewChainWithReceipts = getParentBlockWithReceipts(currentNewChainWithReceipts); } - while (oldChain.getNumber() > newChain.getNumber()) { + while (currentOldChainWithReceipts.getNumber() > currentNewChainWithReceipts.getNumber()) { // If oldChain is longer than new chain, walk back until we meet the new chain by number, // updating as we go. - updater.removeBlockHash(oldChain.getNumber()); + updater.removeBlockHash(currentOldChainWithReceipts.getNumber()); + removedTransactions.addAll( - blockchainStorage.getBlockBody(oldChain.getHash()).get().getTransactions()); + currentOldChainWithReceipts.getBlock().getBody().getTransactions()); + addRemovedLogsWithMetadata(removedLogsWithMetadata, currentOldChainWithReceipts); - oldChain = blockchainStorage.getBlockHeader(oldChain.getParentHash()).get(); + currentOldChainWithReceipts = getParentBlockWithReceipts(currentOldChainWithReceipts); } - while (!oldChain.getHash().equals(newChain.getHash())) { + while (!currentOldChainWithReceipts.getHash().equals(currentNewChainWithReceipts.getHash())) { // Walk back until we meet the common ancestor between the two chains, updating as we go. - final Hash newBlockHash = newChain.getHash(); - updater.putBlockHash(newChain.getNumber(), newBlockHash); - - // Collect transaction to be updated - final List newTxs = - newBlockHash.equals(newChainHead.getHash()) - ? newChainHead.getBody().getTransactions() - : blockchainStorage.getBlockBody(newBlockHash).get().getTransactions(); - newTransactions.put(newBlockHash, newTxs); + final Hash newBlockHash = currentNewChainWithReceipts.getHash(); + updater.putBlockHash(currentNewChainWithReceipts.getNumber(), newBlockHash); + + newTransactions.put( + newBlockHash, currentNewChainWithReceipts.getBlock().getBody().getTransactions()); removedTransactions.addAll( - blockchainStorage.getBlockBody(oldChain.getHash()).get().getTransactions()); + currentOldChainWithReceipts.getBlock().getBody().getTransactions()); + addAddedLogsWithMetadata(addedLogsWithMetadata, currentNewChainWithReceipts); + addRemovedLogsWithMetadata(removedLogsWithMetadata, currentOldChainWithReceipts); - newChain = blockchainStorage.getBlockHeader(newChain.getParentHash()).get(); - oldChain = blockchainStorage.getBlockHeader(oldChain.getParentHash()).get(); + currentNewChainWithReceipts = getParentBlockWithReceipts(currentNewChainWithReceipts); + currentOldChainWithReceipts = getParentBlockWithReceipts(currentOldChainWithReceipts); } // Update indexed transactions @@ -386,16 +391,20 @@ public class DefaultBlockchain implements MutableBlockchain { // Update tracked forks final Collection forks = blockchainStorage.getForkHeads(); // Old head is now a fork - forks.add(oldChainHead); + forks.add(oldChainWithReceipts.getHash()); // Remove new chain head's parent if it was tracked as a fork final Optional parentFork = - forks.stream().filter(f -> f.equals(newChainHead.getHeader().getParentHash())).findAny(); + forks.stream() + .filter(f -> f.equals(newChainHeadWithReceipts.getHeader().getParentHash())) + .findAny(); parentFork.ifPresent(forks::remove); updater.setForkHeads(forks); return BlockAddedEvent.createForChainReorg( - newChainHead, + newChainHeadWithReceipts.getBlock(), newTransactions.values().stream().flatMap(Collection::stream).collect(toList()), - removedTransactions); + removedTransactions, + Stream.concat(removedLogsWithMetadata.stream(), addedLogsWithMetadata.stream()) + .collect(Collectors.toUnmodifiableList())); } @Override @@ -407,12 +416,11 @@ public class DefaultBlockchain implements MutableBlockchain { final BlockchainStorage.Updater updater = blockchainStorage.updater(); try { - final Optional oldBlockHeader = - blockchainStorage.getBlockHeader(blockHash.get()); - final Optional oldBlockBody = blockchainStorage.getBlockBody(blockHash.get()); - final Block block = new Block(oldBlockHeader.get(), oldBlockBody.get()); + final BlockHeader oldBlockHeader = blockchainStorage.getBlockHeader(blockHash.get()).get(); + final BlockWithReceipts blockWithReceipts = getBlockWithReceipts(oldBlockHeader).get(); + final Block block = blockWithReceipts.getBlock(); - handleChainReorg(updater, block); + handleChainReorg(updater, blockWithReceipts); updater.commit(); updateCacheForNewCanonicalHead(block, calculateTotalDifficulty(block)); @@ -458,7 +466,7 @@ public class DefaultBlockchain implements MutableBlockchain { genesisBlock.getHeader().getNumber() == BlockHeader.GENESIS_BLOCK_NUMBER, "Invalid genesis block."); final Optional maybeHead = blockchainStorage.getChainHead(); - if (!maybeHead.isPresent()) { + if (maybeHead.isEmpty()) { // Initialize blockchain store with genesis block. final BlockchainStorage.Updater updater = blockchainStorage.updater(); final Hash hash = genesisBlock.getHash(); @@ -472,7 +480,7 @@ public class DefaultBlockchain implements MutableBlockchain { } else { // Verify genesis block is consistent with stored blockchain. final Optional genesisHash = getBlockHashByNumber(BlockHeader.GENESIS_BLOCK_NUMBER); - if (!genesisHash.isPresent()) { + if (genesisHash.isEmpty()) { throw new IllegalStateException("Blockchain is missing genesis block data."); } if (!genesisHash.get().equals(genesisBlock.getHash())) { @@ -497,6 +505,40 @@ public class DefaultBlockchain implements MutableBlockchain { return blockchainStorage.getBlockHeader(block.getHeader().getParentHash()).isPresent(); } + private void addAddedLogsWithMetadata( + final List logsWithMetadata, final BlockWithReceipts blockWithReceipts) { + logsWithMetadata.addAll( + 0, + LogWithMetadata.generate( + blockWithReceipts.getBlock(), blockWithReceipts.getReceipts(), false)); + } + + private void addRemovedLogsWithMetadata( + final List logsWithMetadata, final BlockWithReceipts blockWithReceipts) { + logsWithMetadata.addAll( + Lists.reverse( + LogWithMetadata.generate( + blockWithReceipts.getBlock(), blockWithReceipts.getReceipts(), true))); + } + + private Optional getBlockWithReceipts(final BlockHeader blockHeader) { + return blockchainStorage + .getBlockBody(blockHeader.getHash()) + .map(body -> new Block(blockHeader, body)) + .flatMap( + block -> + blockchainStorage + .getTransactionReceipts(blockHeader.getHash()) + .map(receipts -> new BlockWithReceipts(block, receipts))); + } + + private BlockWithReceipts getParentBlockWithReceipts(final BlockWithReceipts blockWithReceipts) { + return blockchainStorage + .getBlockHeader(blockWithReceipts.getHeader().getParentHash()) + .flatMap(this::getBlockWithReceipts) + .get(); + } + @Override public long observeBlockAdded(final BlockAddedObserver observer) { checkNotNull(observer); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/BlockWithReceipts.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/BlockWithReceipts.java similarity index 82% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/BlockWithReceipts.java rename to ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/BlockWithReceipts.java index 6b96a8dd25..ae642b065a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/BlockWithReceipts.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/BlockWithReceipts.java @@ -12,23 +12,18 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.fastsync; - -import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.core.Hash; -import org.hyperledger.besu.ethereum.core.TransactionReceipt; +package org.hyperledger.besu.ethereum.core; import java.util.List; import java.util.Objects; import com.google.common.base.MoreObjects; -class BlockWithReceipts { +public class BlockWithReceipts { private final Block block; private final List receipts; - BlockWithReceipts(final Block block, final List receipts) { + public BlockWithReceipts(final Block block, final List receipts) { this.block = block; this.receipts = receipts; } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/LogWithMetadata.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogWithMetadata.java similarity index 63% rename from ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/LogWithMetadata.java rename to ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogWithMetadata.java index be4bd2e5c9..79a616f22a 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/LogWithMetadata.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogWithMetadata.java @@ -14,18 +14,16 @@ * SPDX-License-Identifier: Apache-2.0 * */ -package org.hyperledger.besu.ethereum.api.query; +package org.hyperledger.besu.ethereum.core; -import org.hyperledger.besu.ethereum.core.Address; -import org.hyperledger.besu.ethereum.core.Hash; -import org.hyperledger.besu.ethereum.core.LogTopic; import org.hyperledger.besu.util.bytes.BytesValue; +import java.util.ArrayList; import java.util.List; import com.google.common.base.MoreObjects; -public class LogWithMetadata { +public class LogWithMetadata extends Log { private final int logIndex; private final long blockNumber; @@ -47,6 +45,7 @@ public class LogWithMetadata { final BytesValue data, final List topics, final boolean removed) { + super(address, data, topics); this.logIndex = logIndex; this.blockNumber = blockNumber; this.blockHash = blockHash; @@ -58,6 +57,48 @@ public class LogWithMetadata { this.removed = removed; } + public static List generate( + final TransactionReceipt receipt, + final long number, + final Hash blockHash, + final Hash transactionHash, + final int transactionIndex, + final boolean removed) { + + final List logs = new ArrayList<>(); + for (int logIndex = 0; logIndex < receipt.getLogs().size(); ++logIndex) { + logs.add( + new LogWithMetadata( + logIndex, + number, + blockHash, + transactionHash, + transactionIndex, + receipt.getLogs().get(logIndex).getLogger(), + receipt.getLogs().get(logIndex).getData(), + receipt.getLogs().get(logIndex).getTopics(), + removed)); + } + + return logs; + } + + public static List generate( + final Block block, final List receipts, final boolean removed) { + final List logsWithMetadata = new ArrayList<>(); + for (int txi = 0; txi < receipts.size(); ++txi) { + logsWithMetadata.addAll( + generate( + receipts.get(txi), + block.getHeader().getNumber(), + block.getHash(), + block.getBody().getTransactions().get(txi).getHash(), + txi, + removed)); + } + return logsWithMetadata; + } + // The index of this log within the entire ordered list of logs associated with the block this log // belongs to. public int getLogIndex() { @@ -80,14 +121,17 @@ public class LogWithMetadata { return transactionIndex; } - public Address getAddress() { + @Override + public Address getLogger() { return address; } + @Override public BytesValue getData() { return data; } + @Override public List getTopics() { return topics; } diff --git a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java index a15d0fe748..f58a4b38df 100644 --- a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java +++ b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java @@ -35,13 +35,16 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.OptionalLong; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPrivateKey; import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPublicKey; @@ -352,6 +355,10 @@ public class BlockDataGenerator { return receipt(positiveLong()); } + public TransactionReceipt receipt(final List logs) { + return new TransactionReceipt(hash(), positiveLong(), logs, Optional.empty()); + } + public UInt256 storageKey() { return uint256(); } @@ -368,8 +375,22 @@ public class BlockDataGenerator { return receipts; } + public List logs(final int logsCount, final int topicsPerLog) { + return Stream.generate(() -> log(topicsPerLog)).limit(logsCount).collect(Collectors.toList()); + } + public Log log() { - return new Log(address(), bytesValue(5 + random.nextInt(10)), Collections.emptyList()); + return log(0); + } + + public Log log(final int topicCount) { + final List topics = + Stream.generate(this::logTopic).limit(topicCount).collect(Collectors.toList()); + return new Log(address(), bytesValue(5, 15), topics); + } + + private LogTopic logTopic() { + return LogTopic.create(bytesValue(LogTopic.SIZE)); } private Bytes32 bytes32() { @@ -380,6 +401,18 @@ public class BlockDataGenerator { return BytesValue.wrap(bytes(size)); } + public BytesValue bytesValue() { + return bytesValue(1, 20); + } + + public BytesValue bytesValue(final int minSize, final int maxSize) { + checkArgument(minSize >= 0); + checkArgument(maxSize >= 0); + checkArgument(maxSize > minSize); + final int size = random.nextInt(maxSize - minSize) + minSize; + return BytesValue.wrap(bytes(size)); + } + /** * Creates a UInt256 with a value that fits within maxByteSize * @@ -412,18 +445,6 @@ public class BlockDataGenerator { return new LogsBloomFilter(BytesValue.of(bytes(LogsBloomFilter.BYTE_SIZE))); } - public BytesValue bytesValue() { - return bytesValue(1, 20); - } - - public BytesValue bytesValue(final int minSize, final int maxSize) { - checkArgument(minSize >= 0); - checkArgument(maxSize >= 0); - checkArgument(maxSize > minSize); - final int size = random.nextInt(maxSize - minSize) + minSize; - return BytesValue.wrap(bytes(size)); - } - private byte[] bytes(final int size) { return bytes(size, 0); } @@ -466,7 +487,7 @@ public class BlockDataGenerator { private Optional parentHash = Optional.empty(); private Optional stateRoot = Optional.empty(); private Optional difficulty = Optional.empty(); - private Optional> transactions = Optional.empty(); + private List transactions = new ArrayList<>(); private Optional extraData = Optional.empty(); private Optional blockHeaderFunctions = Optional.empty(); @@ -475,7 +496,7 @@ public class BlockDataGenerator { } public List getTransactions(final List defaultValue) { - return transactions.orElse(defaultValue); + return transactions.isEmpty() ? defaultValue : transactions; } public long getBlockNumber(final long defaultValue) { @@ -503,13 +524,14 @@ public class BlockDataGenerator { } public BlockOptions addTransaction(final Transaction... tx) { - if (!transactions.isPresent()) { - transactions = Optional.of(new ArrayList<>()); - } - transactions.get().addAll(Arrays.asList(tx)); + transactions.addAll(Arrays.asList(tx)); return this; } + public BlockOptions addTransaction(final Collection txs) { + return addTransaction(txs.toArray(new Transaction[] {})); + } + public BlockOptions setBlockNumber(final long blockNumber) { this.blockNumber = OptionalLong.of(blockNumber); return this; diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchainTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchainTest.java index 5125986897..7ec4c028fb 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchainTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchainTest.java @@ -21,6 +21,7 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; @@ -38,6 +39,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import com.google.common.collect.Lists; import org.junit.Test; public class DefaultBlockchainTest { @@ -156,9 +158,16 @@ public class DefaultBlockchainTest { final BlockDataGenerator.BlockOptions options = new BlockDataGenerator.BlockOptions() .setBlockNumber(1L) + .addTransaction(gen.transactions(5)) .setParentHash(genesisBlock.getHash()); final Block newBlock = gen.block(options); final List receipts = gen.receipts(newBlock); + blockchain.observeBlockAdded( + ((event, blockchain1) -> + assertThat(event.getLogsWithMetadata()) + .containsExactly( + LogWithMetadata.generate(newBlock, receipts, false) + .toArray(new LogWithMetadata[] {})))); blockchain.appendBlock(newBlock, receipts); assertBlockIsHead(blockchain, newBlock); @@ -226,15 +235,25 @@ public class DefaultBlockchainTest { public void appendBlockWithReorgToChainAtEqualHeight() { final BlockDataGenerator gen = new BlockDataGenerator(1); - // Setup an initial blockchain + // Setup final int chainLength = 3; final List chain = gen.blockSequence(chainLength); final List> blockReceipts = chain.stream().map(gen::receipts).collect(Collectors.toList()); final KeyValueStorage kvStore = new InMemoryKeyValueStorage(); final DefaultBlockchain blockchain = createMutableBlockchain(kvStore, chain.get(0)); + + // Listen to block events and add the Logs here + List logsWithMetadata = new ArrayList<>(); + blockchain.observeBlockAdded( + (event, __) -> logsWithMetadata.addAll(event.getLogsWithMetadata())); + List expectedLogsWithMetadata = new ArrayList<>(); + + // Add initial blocks for (int i = 1; i < chain.size(); i++) { blockchain.appendBlock(chain.get(i), blockReceipts.get(i)); + expectedLogsWithMetadata.addAll( + LogWithMetadata.generate(chain.get(i), blockReceipts.get(i), false)); } assertThat(blockchain.getForks()).isEmpty(); final Block originalHead = chain.get(chainLength - 1); @@ -267,6 +286,15 @@ public class DefaultBlockchainTest { assertThat(blockchain.getTransactionByHash(tx.getHash())).isNotPresent(); } + // LogWithMetadata reflecting removal of originalHead's logs + final List removedLogs = + Lists.reverse( + LogWithMetadata.generate( + originalHead, blockchain.getTxReceipts(originalHead.getHash()).get(), true)); + expectedLogsWithMetadata.addAll(removedLogs); + // LogWithMetadata reflecting addition of originalHead's logs + expectedLogsWithMetadata.addAll(LogWithMetadata.generate(fork, forkReceipts, false)); + assertBlockIsHead(blockchain, fork); assertTotalDifficultiesAreConsistent(blockchain, fork); // Old chain head should now be tracked as a fork. @@ -277,6 +305,8 @@ public class DefaultBlockchainTest { for (int i = commonAncestor + 1; i < chainLength; i++) { assertThat(blockchain.blockIsOnCanonicalChain(chain.get(i).getHash())).isFalse(); } + assertThat(logsWithMetadata) + .containsExactly(expectedLogsWithMetadata.toArray(new LogWithMetadata[] {})); } @Test @@ -290,8 +320,15 @@ public class DefaultBlockchainTest { chain.stream().map(gen::receipts).collect(Collectors.toList()); final KeyValueStorage kvStore = new InMemoryKeyValueStorage(); final DefaultBlockchain blockchain = createMutableBlockchain(kvStore, chain.get(0)); + // Listen to block events and add the Logs here + List logsWithMetadata = new ArrayList<>(); + blockchain.observeBlockAdded( + (event, __) -> logsWithMetadata.addAll(event.getLogsWithMetadata())); + List expectedLogsWithMetadata = new ArrayList<>(); for (int i = 1; i < chain.size(); i++) { blockchain.appendBlock(chain.get(i), blockReceipts.get(i)); + expectedLogsWithMetadata.addAll( + LogWithMetadata.generate(chain.get(i), blockReceipts.get(i), false)); } final Block originalHead = chain.get(originalChainLength - 1); @@ -364,6 +401,19 @@ public class DefaultBlockchainTest { for (final Transaction tx : removedTransactions) { assertThat(blockchain.getTransactionByHash(tx.getHash())).isNotPresent(); } + // LogWithMetadata reflecting removal of logs + for (int i = originalChainLength - 1; i >= forkStart; i--) { + final Block currentBlock = chain.get(i); + expectedLogsWithMetadata.addAll( + Lists.reverse( + LogWithMetadata.generate( + currentBlock, blockchain.getTxReceipts(currentBlock.getHash()).get(), true))); + } + // LogWithMetadata reflecting addition of logs + for (int i = 0; i < forkBlocks.size(); i++) { + expectedLogsWithMetadata.addAll( + LogWithMetadata.generate(forkBlocks.get(i), forkReceipts.get(i), false)); + } // Check that blockNumber index for previous chain head has been removed assertThat(blockchain.getBlockHashByNumber(originalChainLength - 1)).isNotPresent(); @@ -375,6 +425,8 @@ public class DefaultBlockchainTest { for (int i = commonAncestor + 1; i < originalChainLength; i++) { assertThat(blockchain.blockIsOnCanonicalChain(chain.get(i).getHash())).isFalse(); } + assertThat(logsWithMetadata) + .containsExactly(expectedLogsWithMetadata.toArray(new LogWithMetadata[] {})); } @Test @@ -388,8 +440,15 @@ public class DefaultBlockchainTest { chain.stream().map(gen::receipts).collect(Collectors.toList()); final KeyValueStorage kvStore = new InMemoryKeyValueStorage(); final DefaultBlockchain blockchain = createMutableBlockchain(kvStore, chain.get(0)); + // Listen to block events and add the Logs here + List logsWithMetadata = new ArrayList<>(); + blockchain.observeBlockAdded( + (event, __) -> logsWithMetadata.addAll(event.getLogsWithMetadata())); + List expectedLogsWithMetadata = new ArrayList<>(); for (int i = 1; i < chain.size(); i++) { blockchain.appendBlock(chain.get(i), blockReceipts.get(i)); + expectedLogsWithMetadata.addAll( + LogWithMetadata.generate(chain.get(i), blockReceipts.get(i), false)); } final Block originalHead = chain.get(originalChainLength - 1); @@ -456,6 +515,19 @@ public class DefaultBlockchainTest { for (final Transaction tx : removedTransactions) { assertThat(blockchain.getTransactionByHash(tx.getHash())).isNotPresent(); } + // LogWithMetadata reflecting removal of logs + for (int i = originalChainLength - 1; i >= forkStart; i--) { + final Block currentBlock = chain.get(i); + expectedLogsWithMetadata.addAll( + Lists.reverse( + LogWithMetadata.generate( + currentBlock, blockchain.getTxReceipts(currentBlock.getHash()).get(), true))); + } + // LogWithMetadata reflecting addition of logs + for (int i = 0; i < forkBlocks.size(); i++) { + expectedLogsWithMetadata.addAll( + LogWithMetadata.generate(forkBlocks.get(i), forkReceipts.get(i), false)); + } // Old chain head should now be tracked as a fork. forks = blockchain.getForks(); assertThat(forks.size()).isEqualTo(1); @@ -464,6 +536,8 @@ public class DefaultBlockchainTest { for (int i = commonAncestor + 1; i < originalChainLength; i++) { assertThat(blockchain.blockIsOnCanonicalChain(chain.get(i).getHash())).isFalse(); } + assertThat(logsWithMetadata) + .containsExactly(expectedLogsWithMetadata.toArray(new LogWithMetadata[] {})); } @Test @@ -477,8 +551,15 @@ public class DefaultBlockchainTest { chain.stream().map(gen::receipts).collect(Collectors.toList()); final KeyValueStorage kvStore = new InMemoryKeyValueStorage(); final DefaultBlockchain blockchain = createMutableBlockchain(kvStore, chain.get(0)); + // Listen to block events and add the Logs here + List logsWithMetadata = new ArrayList<>(); + blockchain.observeBlockAdded( + (event, __) -> logsWithMetadata.addAll(event.getLogsWithMetadata())); + List expectedLogsWithMetadata = new ArrayList<>(); for (int i = 1; i < chain.size(); i++) { blockchain.appendBlock(chain.get(i), blockReceipts.get(i)); + expectedLogsWithMetadata.addAll( + LogWithMetadata.generate(chain.get(i), blockReceipts.get(i), false)); } final Transaction overlappingTx = chain.get(chainLength - 1).getBody().getTransactions().get(0); @@ -517,6 +598,18 @@ public class DefaultBlockchainTest { assertThat(actualTransaction).isNotPresent(); } } + // LogWithMetadata reflecting removal of logs + for (int i = chainLength - 1; i >= forkBlock; i--) { + final Block currentBlock = chain.get(i); + expectedLogsWithMetadata.addAll( + Lists.reverse( + LogWithMetadata.generate( + currentBlock, blockchain.getTxReceipts(currentBlock.getHash()).get(), true))); + } + // LogWithMetadata reflecting addition of logs + expectedLogsWithMetadata.addAll(LogWithMetadata.generate(fork, forkReceipts, false)); + assertThat(logsWithMetadata) + .containsExactly(expectedLogsWithMetadata.toArray(new LogWithMetadata[] {})); } @Test @@ -569,8 +662,15 @@ public class DefaultBlockchainTest { chain.stream().map(gen::receipts).collect(Collectors.toList()); final KeyValueStorage kvStore = new InMemoryKeyValueStorage(); final DefaultBlockchain blockchain = createMutableBlockchain(kvStore, chain.get(0)); + // Listen to block events and add the Logs here + List logsWithMetadata = new ArrayList<>(); + blockchain.observeBlockAdded( + (event, __) -> logsWithMetadata.addAll(event.getLogsWithMetadata())); + List expectedLogsWithMetadata = new ArrayList<>(); for (int i = 1; i < chain.size(); i++) { blockchain.appendBlock(chain.get(i), blockReceipts.get(i)); + expectedLogsWithMetadata.addAll( + LogWithMetadata.generate(chain.get(i), blockReceipts.get(i), false)); } final Block originalHead = chain.get(originalChainLength - 1); @@ -638,6 +738,9 @@ public class DefaultBlockchainTest { // Head should not have changed assertBlockIsHead(blockchain, originalHead); + // We should only have the log events from when we initially created the chain. None from forks. + assertThat(logsWithMetadata) + .containsExactly(expectedLogsWithMetadata.toArray(new LogWithMetadata[] {})); } @Test diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index 996dcdcb91..cd57de371d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -19,6 +19,7 @@ import static java.util.stream.Collectors.toList; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java index e2559b3c17..5179bd788c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java @@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockImporter; +import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.eth.sync.ValidationPolicy; import org.hyperledger.besu.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/TrailingPeerLimiterTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/TrailingPeerLimiterTest.java index 9ec9369b8b..ddd136c2f7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/TrailingPeerLimiterTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/TrailingPeerLimiterTest.java @@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.Di import org.hyperledger.besu.util.uint.UInt256; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.junit.Before; @@ -116,7 +117,8 @@ public class TrailingPeerLimiterTest { BlockAddedEvent.createForHeadAdvancement( new Block( new BlockHeaderTestFixture().number(500).buildHeader(), - new BlockBody(emptyList(), emptyList()))); + new BlockBody(emptyList(), emptyList())), + Collections.emptyList()); trailingPeerLimiter.onBlockAdded(blockAddedEvent, blockchain); assertDisconnections(ethPeer1); @@ -132,7 +134,8 @@ public class TrailingPeerLimiterTest { BlockAddedEvent.createForHeadAdvancement( new Block( new BlockHeaderTestFixture().number(599).buildHeader(), - new BlockBody(emptyList(), emptyList()))); + new BlockBody(emptyList(), emptyList())), + Collections.emptyList()); trailingPeerLimiter.onBlockAdded(blockAddedEvent, blockchain); assertDisconnections(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java index 7cf995155b..8740413e31 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java @@ -21,6 +21,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStepTest.java index f5cc2bf533..c054aaf3e4 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStepTest.java @@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.BlockImporter; +import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.eth.sync.ValidationPolicy; import org.hyperledger.besu.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;