add possibility to use the new peer task system when downloading the bodies

Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
pull/7806/head
stefan.pingel@consensys.net 1 month ago
parent 16bcb1a5b7
commit dc1f3bdf9f
  1. 154
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTask.java
  2. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  3. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java
  4. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  5. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java
  6. 10
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java
  7. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java
  8. 133
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java
  9. 199
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTaskTest.java
  10. 5
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java
  11. 5
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java
  12. 5
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java
  13. 1
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java
  14. 251
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTaskTest.java

@ -0,0 +1,154 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GetBodiesFromPeerTask implements PeerTask<List<Block>> {
private static final Logger LOG = LoggerFactory.getLogger(GetBodiesFromPeerTask.class);
private final List<BlockHeader> blockHeaders;
private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final ProtocolSchedule protocolSchedule;
private final long requiredBlockchainHeight;
private final List<Block> blocks = new ArrayList<>();
public GetBodiesFromPeerTask(
final List<BlockHeader> blockHeaders,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolSchedule protocolSchedule) {
this.blockHeaders = blockHeaders;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.protocolSchedule = protocolSchedule;
this.requiredBlockchainHeight =
blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
}
@Override
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}
@Override
public MessageData getRequestMessage() {
return GetBlockBodiesMessage.create(
blockHeaders.stream().map(BlockHeader::getBlockHash).toList());
}
@Override
public List<Block> parseResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
// Blocks returned by this method are in the same order as the headers, but might not be
// complete
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
}
final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(messageData);
final List<BlockBody> blockBodies = blocksMessage.bodies(protocolSchedule);
if (blockBodies.isEmpty() || blockBodies.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}
for (int i = 0; i < blockBodies.size(); i++) {
final BlockBody blockBody = blockBodies.get(i);
final BlockHeader blockHeader = blockHeaders.get(i);
if (!blockBodyMatchesBlockHeader(blockBody, blockHeader)) {
LOG.atDebug().setMessage("Received block body does not match block header").log();
throw new InvalidPeerTaskResponseException();
}
blocks.add(new Block(blockHeader, blockBody));
}
return blocks;
}
private boolean blockBodyMatchesBlockHeader(
final BlockBody blockBody, final BlockHeader blockHeader) {
// this method validates that the block body matches the block header by calculating the roots
// of the block body
// and comparing them to the roots in the block header
if (!BodyValidation.transactionsRoot(blockBody.getTransactions())
.equals(blockHeader.getTransactionsRoot())) {
return false;
}
if (!BodyValidation.ommersHash(blockBody.getOmmers()).equals(blockHeader.getOmmersHash())) {
return false;
}
if (blockBody.getWithdrawals().isPresent()) {
if (blockHeader.getWithdrawalsRoot().isEmpty()) {
return false;
}
if (!BodyValidation.withdrawalsRoot(blockBody.getWithdrawals().get())
.equals(blockHeader.getWithdrawalsRoot().get())) {
return false;
}
} else if (blockHeader.getWithdrawalsRoot().isPresent()) {
return false;
}
if (blockBody.getRequests().isPresent()) {
if (blockHeader.getRequestsRoot().isEmpty()) {
return false;
}
if (!BodyValidation.requestsRoot(blockBody.getRequests().get())
.equals(blockHeader.getRequestsRoot().get())) {
return false;
}
} else if (blockHeader.getRequestsRoot().isPresent()) {
return false;
}
return true;
}
@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
ethPeer.getProtocolName().equals(getSubProtocol().getName())
&& (currentProtocolSpecSupplier.get().isPoS()
|| ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight);
}
@Override
public boolean isSuccess(final List<Block> result) {
return !result.isEmpty();
}
}

@ -138,6 +138,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
syncState,
metricsSystem,
terminationCondition,
peerTaskExecutor,
syncDurationMetrics));
if (SyncMode.FAST.equals(syncConfig.getSyncMode())) {

@ -17,7 +17,9 @@ package org.hyperledger.besu.ethereum.eth.sync;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -31,19 +33,41 @@ public class DownloadBodiesStep
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final SynchronizerConfiguration synchronizerConfiguration;
private final PeerTaskExecutor peerTaskExecutor;
public DownloadBodiesStep(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.synchronizerConfiguration = synchronizerConfiguration;
this.metricsSystem = metricsSystem;
}
@Override
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem)
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
return ethContext
.getScheduler()
.scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders));
} else {
return CompleteBlocksTask.forHeaders(
protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
}
}
private CompletableFuture<List<Block>> getBodiesWithPeerTaskSystem(
final List<BlockHeader> headers) {
final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(protocolSchedule, headers, peerTaskExecutor);
final List<Block> blocks = completeBlocksWithPeerTask.getBlocks();
return CompletableFuture.completedFuture(blocks);
}
}

@ -152,7 +152,8 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
new DownloadBodiesStep(
protocolSchedule, ethContext, peerTaskExecutor, syncConfig, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(
currentProtocolSpecSupplier, ethContext, peerTaskExecutor, syncConfig, metricsSystem);

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -35,7 +36,8 @@ public class FullSyncChainDownloader {
final SyncState syncState,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
final SyncDurationMetrics syncDurationMetrics) {
final SyncDurationMetrics syncDurationMetrics,
final PeerTaskExecutor peerTaskExecutor) {
final FullSyncTargetManager syncTargetManager =
new FullSyncTargetManager(
@ -54,6 +56,7 @@ public class FullSyncChainDownloader {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
metricsSystem,
terminationCondition),
ethContext.getScheduler(),

@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory;
@ -53,21 +54,25 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
() -> HeaderValidationMode.DETACHED_ONLY;
private final BetterSyncTargetEvaluator betterSyncTargetEvaluator;
private final SyncTerminationCondition fullSyncTerminationCondition;
private final PeerTaskExecutor peerTaskExecutor;
public FullSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final MetricsSystem metricsSystem,
final SyncTerminationCondition syncTerminationCondition) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.metricsSystem = metricsSystem;
this.fullSyncTerminationCondition = syncTerminationCondition;
betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
this.betterSyncTargetEvaluator =
new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
}
@Override
@ -104,7 +109,8 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
new DownloadBodiesStep(
protocolSchedule, ethContext, peerTaskExecutor, syncConfig, metricsSystem);
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();
final FullImportBlockStep importBlockStep =
new FullImportBlockStep(

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
@ -45,6 +46,7 @@ public class FullSyncDownloader {
final SyncState syncState,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
final PeerTaskExecutor peerTaskExecutor,
final SyncDurationMetrics syncDurationMetrics) {
this.syncConfig = syncConfig;
this.protocolContext = protocolContext;
@ -59,7 +61,8 @@ public class FullSyncDownloader {
syncState,
metricsSystem,
terminationCondition,
syncDurationMetrics);
syncDurationMetrics,
peerTaskExecutor);
}
public CompletableFuture<Void> start() {

@ -0,0 +1,133 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.tasks;
import static com.google.common.base.Preconditions.checkArgument;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Given a set of headers, "completes" them by repeatedly requesting additional data (bodies) needed
* to create the blocks that correspond to the supplied headers.
*/
public class CompleteBlocksWithPeerTask {
private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksWithPeerTask.class);
private final ProtocolSchedule protocolSchedule;
private final List<BlockHeader> headersToGet = new ArrayList<>();
private final PeerTaskExecutor peerTaskExecutor;
private final Block[] result;
private final int resultSize;
private int nextIndex = 0;
private int remainingBlocks = 0;
public CompleteBlocksWithPeerTask(
final ProtocolSchedule protocolSchedule,
final List<BlockHeader> headers,
final PeerTaskExecutor peerTaskExecutor) {
checkArgument(!headers.isEmpty(), "Must supply a non-empty headers list");
this.protocolSchedule = protocolSchedule;
this.peerTaskExecutor = peerTaskExecutor;
resultSize = headers.size();
result = new Block[resultSize];
remainingBlocks = resultSize;
for (int i = 0; i < resultSize; i++) {
final BlockHeader header = headers.get(i);
if (BlockHeader.hasEmptyBlock(header)) {
final Block emptyBlock =
new Block(header, createEmptyBodyBasedOnProtocolSchedule(protocolSchedule, header));
result[i] = emptyBlock;
remainingBlocks--;
} else {
headersToGet.add(header);
}
}
this.nextIndex = findNextIndex(0);
}
private BlockBody createEmptyBodyBasedOnProtocolSchedule(
final ProtocolSchedule protocolSchedule, final BlockHeader header) {
return new BlockBody(
Collections.emptyList(),
Collections.emptyList(),
isWithdrawalsEnabled(protocolSchedule, header)
? Optional.of(Collections.emptyList())
: Optional.empty(),
Optional.empty());
}
private boolean isWithdrawalsEnabled(
final ProtocolSchedule protocolSchedule, final BlockHeader header) {
return protocolSchedule.getByBlockHeader(header).getWithdrawalsProcessor().isPresent();
}
public List<Block> getBlocks() {
while (remainingBlocks > 0) {
LOG.atDebug()
.setMessage("Requesting {} bodies from peer")
.addArgument(headersToGet.size())
.log();
final GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(
headersToGet,
() -> protocolSchedule.getByBlockHeader(headersToGet.getLast()),
protocolSchedule);
final PeerTaskExecutorResult<List<Block>> executionResult = peerTaskExecutor.execute(task);
if (executionResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& executionResult.result().isPresent()) {
final List<Block> blockList = executionResult.result().get();
LOG.atDebug()
.setMessage("Received {} bodies out of {} from peer")
.addArgument(blockList.size())
.addArgument(headersToGet.size())
.log();
blockList.forEach(
block -> {
remainingBlocks--;
result[nextIndex] = block;
nextIndex = findNextIndex(nextIndex + 1);
});
}
}
return List.of(result);
}
private int findNextIndex(final int startIndex) {
for (int i = startIndex; i < resultSize; i++) {
if (result[i] == null) {
return i;
}
}
return -1; // This only happens when we have finished processing all headers
}
}

@ -0,0 +1,199 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.encoding.EncodingContext;
import org.hyperledger.besu.ethereum.core.encoding.TransactionDecoder;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class GetBodiesFromPeerTaskTest {
private static final String FRONTIER_TX_RLP =
"0xf901fc8032830138808080b901ae60056013565b6101918061001d6000396000f35b3360008190555056006001600060e060020a6000350480630a874df61461003a57806341c0e1b514610058578063a02b161e14610066578063dbbdf0831461007757005b610045600435610149565b80600160a060020a031660005260206000f35b610060610161565b60006000f35b6100716004356100d4565b60006000f35b61008560043560243561008b565b60006000f35b600054600160a060020a031632600160a060020a031614156100ac576100b1565b6100d0565b8060018360005260205260406000208190555081600060005260206000a15b5050565b600054600160a060020a031633600160a060020a031614158015610118575033600160a060020a0316600182600052602052604060002054600160a060020a031614155b61012157610126565b610146565b600060018260005260205260406000208190555080600060005260206000a15b50565b60006001826000526020526040600020549050919050565b600054600160a060020a031633600160a060020a0316146101815761018f565b600054600160a060020a0316ff5b561ca0c5689ed1ad124753d54576dfb4b571465a41900a1dff4058d8adf16f752013d0a01221cbd70ec28c94a3b55ec771bcbc70778d6ee0b51ca7ea9514594c861b1884";
private static final Transaction TX =
TransactionDecoder.decodeRLP(
new BytesValueRLPInput(Bytes.fromHexString(FRONTIER_TX_RLP), false),
EncodingContext.BLOCK_BODY);
public static final List<Transaction> TRANSACTION_LIST = List.of(TX);
public static final BlockBody BLOCK_BODY =
new BlockBody(TRANSACTION_LIST, Collections.emptyList(), Optional.empty(), Optional.empty());
@Test
public void testGetSubProtocol() {
GetBodiesFromPeerTask task = new GetBodiesFromPeerTask(Collections.emptyList(), null, null);
Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol());
}
@Test
public void testGetRequestMessage() {
GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null, null);
MessageData messageData = task.getRequestMessage();
GetBlockBodiesMessage getBlockBodiesMessage = GetBlockBodiesMessage.readFrom(messageData);
Assertions.assertEquals(EthPV62.GET_BLOCK_BODIES, getBlockBodiesMessage.getCode());
Iterable<Hash> hashesInMessage = getBlockBodiesMessage.hashes();
List<Hash> expectedHashes =
List.of(
Hash.fromHexString(StringUtils.repeat("00", 31) + "11"),
Hash.fromHexString(StringUtils.repeat("00", 31) + "21"),
Hash.fromHexString(StringUtils.repeat("00", 31) + "31"));
List<Hash> actualHashes = new ArrayList<>();
hashesInMessage.forEach(actualHashes::add);
Assertions.assertEquals(3, actualHashes.size());
Assertions.assertEquals(
expectedHashes.stream().sorted().toList(), actualHashes.stream().sorted().toList());
}
@Test
public void testParseResponseWithNullResponseMessage() {
GetBodiesFromPeerTask task = new GetBodiesFromPeerTask(Collections.emptyList(), null, null);
Assertions.assertThrows(InvalidPeerTaskResponseException.class, () -> task.parseResponse(null));
}
@Test
public void testParseResponseForInvalidResponse() {
GetBodiesFromPeerTask task = new GetBodiesFromPeerTask(List.of(mockBlockHeader(1)), null, null);
// body does not match header
BlockBodiesMessage bodiesMessage = BlockBodiesMessage.create(List.of(BLOCK_BODY));
Assertions.assertThrows(
InvalidPeerTaskResponseException.class, () -> task.parseResponse(bodiesMessage));
}
@Test
public void testParseResponse() throws InvalidPeerTaskResponseException {
final BlockHeader nonEmptyBlockHeaderMock =
getNonEmptyBlockHeaderMock(BodyValidation.transactionsRoot(TRANSACTION_LIST).toString());
GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(List.of(nonEmptyBlockHeaderMock), null, null);
final BlockBodiesMessage blockBodiesMessage = BlockBodiesMessage.create(List.of(BLOCK_BODY));
List<Block> result = task.parseResponse(blockBodiesMessage);
assertThat(result.size()).isEqualTo(1);
assertThat(result.getFirst().getBody().getTransactions()).isEqualTo(TRANSACTION_LIST);
}
@Test
public void testGetPeerRequirementFilter() {
BlockHeader blockHeader1 = mockBlockHeader(1);
BlockHeader blockHeader2 = mockBlockHeader(2);
BlockHeader blockHeader3 = mockBlockHeader(3);
ProtocolSpec protocolSpec = Mockito.mock(ProtocolSpec.class);
Mockito.when(protocolSpec.isPoS()).thenReturn(false);
GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(
List.of(blockHeader1, blockHeader2, blockHeader3), () -> protocolSpec, null);
EthPeer failForIncorrectProtocol = mockPeer("incorrectProtocol", 5);
EthPeer failForShortChainHeight = mockPeer("incorrectProtocol", 1);
EthPeer successfulCandidate = mockPeer(EthProtocol.NAME, 5);
Assertions.assertFalse(task.getPeerRequirementFilter().test(failForIncorrectProtocol));
Assertions.assertFalse(task.getPeerRequirementFilter().test(failForShortChainHeight));
Assertions.assertTrue(task.getPeerRequirementFilter().test(successfulCandidate));
}
@Test
public void testIsSuccessForPartialSuccess() {
GetBodiesFromPeerTask task = new GetBodiesFromPeerTask(Collections.emptyList(), null, null);
Assertions.assertFalse(task.isSuccess(Collections.emptyList()));
}
@Test
public void testIsSuccessForFullSuccess() {
GetBodiesFromPeerTask task = new GetBodiesFromPeerTask(Collections.emptyList(), null, null);
final List<Block> blockHeaders = List.of(mock(Block.class));
Assertions.assertTrue(task.isSuccess(blockHeaders));
}
private BlockHeader mockBlockHeader(final long blockNumber) {
BlockHeader blockHeader = Mockito.mock(BlockHeader.class);
Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber);
// second to last hex digit indicates the blockNumber, last hex digit indicates the usage of the
// hash
Mockito.when(blockHeader.getHash())
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1"));
Mockito.when(blockHeader.getBlockHash())
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1"));
Mockito.when(blockHeader.getReceiptsRoot())
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "2"));
return blockHeader;
}
private static BlockHeader getNonEmptyBlockHeaderMock(final String transactionsRootHexString) {
final BlockHeader blockHeader = mock(BlockHeader.class);
when(blockHeader.getTransactionsRoot())
.thenReturn(Hash.fromHexStringLenient(transactionsRootHexString));
when(blockHeader.getOmmersHash()).thenReturn(Hash.EMPTY_LIST_HASH);
when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty());
when(blockHeader.getRequestsRoot()).thenReturn(Optional.empty());
return blockHeader;
}
private EthPeer mockPeer(final String protocol, final long chainHeight) {
EthPeer ethPeer = Mockito.mock(EthPeer.class);
ChainState chainState = Mockito.mock(ChainState.class);
Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol);
Mockito.when(ethPeer.chainState()).thenReturn(chainState);
Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight);
return ethPeer;
}
}

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
@ -27,6 +28,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -91,7 +93,8 @@ public class FullSyncChainDownloaderForkTest {
syncState,
metricsSystem,
SyncTerminationCondition.never(),
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS);
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS,
mock(PeerTaskExecutor.class));
}
private ChainDownloader downloader() {

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
@ -34,6 +35,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
@ -123,7 +125,8 @@ public class FullSyncChainDownloaderTest {
syncState,
metricsSystem,
SyncTerminationCondition.never(),
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS);
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS,
mock(PeerTaskExecutor.class));
}
private ChainDownloader downloader() {

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
@ -27,6 +28,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -109,7 +111,8 @@ public class FullSyncChainDownloaderTotalTerminalDifficultyTest {
syncState,
metricsSystem,
terminalCondition,
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS);
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS,
mock(PeerTaskExecutor.class));
}
private SynchronizerConfiguration.Builder syncConfigBuilder() {

@ -98,6 +98,7 @@ public class FullSyncDownloaderTest {
syncState,
metricsSystem,
SyncTerminationCondition.never(),
null,
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS);
}

@ -0,0 +1,251 @@
/*
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.tasks;
import static java.util.Arrays.asList;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.WithdrawalsProcessor;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class CompleteBlocksWithPeerTaskTest {
@BeforeAll
public static void setUp() {}
@Test
public void shouldFailWhenEmptyHeaders() {
assertThatThrownBy(() -> new CompleteBlocksWithPeerTask(null, Collections.emptyList(), null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Must supply a non-empty headers list");
}
@Test
public void shouldReturnEmptyBlock() {
final ProtocolSchedule protocolSchedule = getProtocolScheduleMock();
final BlockHeader blockHeader = getEmptyBlockHeaderMock();
final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class);
CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(protocolSchedule, List.of(blockHeader), peerTaskExecutor);
assertThat(completeBlocksWithPeerTask.getBlocks()).isNotEmpty();
assertThat(completeBlocksWithPeerTask.getBlocks().size()).isEqualTo(1);
assertThat(BlockHeader.hasEmptyBlock(completeBlocksWithPeerTask.getBlocks().get(0).getHeader()))
.isTrue();
verify(peerTaskExecutor, Mockito.never()).execute(any());
}
@Test
public void shouldCreateWithdrawalsAwareEmptyBlock_whenWithdrawalsAreEnabled() {
final ProtocolSchedule mockProtocolSchedule = Mockito.mock(ProtocolSchedule.class);
final ProtocolSpec mockParisSpec = Mockito.mock(ProtocolSpec.class);
final ProtocolSpec mockShanghaiSpec = Mockito.mock(ProtocolSpec.class);
final WithdrawalsProcessor mockWithdrawalsProcessor = Mockito.mock(WithdrawalsProcessor.class);
final BlockHeader header1 =
new BlockHeaderTestFixture().number(1).withdrawalsRoot(null).buildHeader();
final BlockHeader header2 =
new BlockHeaderTestFixture().number(2).withdrawalsRoot(Hash.EMPTY_TRIE_HASH).buildHeader();
when(mockProtocolSchedule.getByBlockHeader((eq(header1)))).thenReturn(mockParisSpec);
when(mockParisSpec.getWithdrawalsProcessor()).thenReturn(Optional.empty());
when(mockProtocolSchedule.getByBlockHeader((eq(header2)))).thenReturn(mockShanghaiSpec);
when(mockShanghaiSpec.getWithdrawalsProcessor())
.thenReturn(Optional.of(mockWithdrawalsProcessor));
final List<Block> expectedBlocks = getExpectedBlocks(header1, header2);
final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class);
when(peerTaskExecutor.execute(any()))
.thenReturn(
new PeerTaskExecutorResult<>(
Optional.of(expectedBlocks), PeerTaskExecutorResponseCode.SUCCESS));
final CompleteBlocksWithPeerTask task =
new CompleteBlocksWithPeerTask(
mockProtocolSchedule, asList(header1, header2), peerTaskExecutor);
final List<Block> blocks = task.getBlocks();
assertThat(blocks).isEqualTo(expectedBlocks);
}
@Test
public void shouldReturnNonEmptyBlock() {
final Block block = mock(Block.class);
final ProtocolSchedule protocolSchedule = getProtocolScheduleMock();
final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class);
final BlockHeader nonEmptyBlockHeaderMock = getNonEmptyBlockHeaderMock("0x01", "0x02");
when(peerTaskExecutor.execute(any()))
.thenReturn(
new PeerTaskExecutorResult<>(
Optional.of(List.of(block)), PeerTaskExecutorResponseCode.SUCCESS));
CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(
protocolSchedule, List.of(nonEmptyBlockHeaderMock), peerTaskExecutor);
assertThat(completeBlocksWithPeerTask.getBlocks()).isNotEmpty();
assertThat(completeBlocksWithPeerTask.getBlocks().size()).isEqualTo(1);
assertThat(completeBlocksWithPeerTask.getBlocks().get(0)).isEqualTo(block);
}
@Test
public void shouldReturnBlocksInRightOrderWhenEmptyAndNonEmptyBlocksRequested() {
final Block block1 = mock(Block.class);
final Block block3 = mock(Block.class);
final BlockHeader emptyBlockHeaderMock = getEmptyBlockHeaderMock();
final BlockHeader nonEmptyBlockHeaderMock1 = getNonEmptyBlockHeaderMock("0x01", "0x02");
final BlockHeader nonEmptyBlockHeaderMock3 = getNonEmptyBlockHeaderMock("0x03", "0x04");
final ProtocolSchedule protocolSchedule = getProtocolScheduleMock();
final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class);
when(peerTaskExecutor.execute(any()))
.thenReturn(
new PeerTaskExecutorResult<>(
Optional.of(List.of(block1, block3)), PeerTaskExecutorResponseCode.SUCCESS));
CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(
protocolSchedule,
List.of(
nonEmptyBlockHeaderMock1,
emptyBlockHeaderMock,
nonEmptyBlockHeaderMock3,
emptyBlockHeaderMock),
peerTaskExecutor);
assertThat(completeBlocksWithPeerTask.getBlocks()).isNotEmpty();
assertThat(completeBlocksWithPeerTask.getBlocks().size()).isEqualTo(4);
assertThat(completeBlocksWithPeerTask.getBlocks().get(0)).isEqualTo(block1);
assertThat(BlockHeader.hasEmptyBlock(completeBlocksWithPeerTask.getBlocks().get(1).getHeader()))
.isTrue();
assertThat(completeBlocksWithPeerTask.getBlocks().get(2)).isEqualTo(block3);
assertThat(BlockHeader.hasEmptyBlock(completeBlocksWithPeerTask.getBlocks().get(3).getHeader()))
.isTrue();
}
@Test
public void shouldRequestMoreBodiesUntilFinished() {
final Block block1 = mock(Block.class);
final Block block3 = mock(Block.class);
final BlockHeader emptyBlockHeaderMock = getEmptyBlockHeaderMock();
final BlockHeader nonEmptyBlockHeaderMock1 = getNonEmptyBlockHeaderMock("0x01", "0x02");
final BlockHeader nonEmptyBlockHeaderMock3 = getNonEmptyBlockHeaderMock("0x03", "0x04");
final ProtocolSchedule protocolSchedule = getProtocolScheduleMock();
final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class);
when(peerTaskExecutor.execute(any()))
.thenReturn(
new PeerTaskExecutorResult<>(
Optional.of(List.of(block1)), PeerTaskExecutorResponseCode.SUCCESS))
.thenReturn(
new PeerTaskExecutorResult<>(
Optional.of(List.of(block3)), PeerTaskExecutorResponseCode.SUCCESS));
CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(
protocolSchedule,
List.of(
nonEmptyBlockHeaderMock1,
emptyBlockHeaderMock,
nonEmptyBlockHeaderMock3,
emptyBlockHeaderMock),
peerTaskExecutor);
assertThat(completeBlocksWithPeerTask.getBlocks()).isNotEmpty();
assertThat(completeBlocksWithPeerTask.getBlocks().size()).isEqualTo(4);
assertThat(completeBlocksWithPeerTask.getBlocks().get(0)).isEqualTo(block1);
assertThat(BlockHeader.hasEmptyBlock(completeBlocksWithPeerTask.getBlocks().get(1).getHeader()))
.isTrue();
assertThat(completeBlocksWithPeerTask.getBlocks().get(2)).isEqualTo(block3);
assertThat(BlockHeader.hasEmptyBlock(completeBlocksWithPeerTask.getBlocks().get(3).getHeader()))
.isTrue();
}
private static ProtocolSchedule getProtocolScheduleMock() {
final ProtocolSchedule protocolSchedule = mock(ProtocolSchedule.class);
final ProtocolSpec protocolSpec = mock(ProtocolSpec.class);
final Optional<WithdrawalsProcessor> optional = Optional.of(mock(WithdrawalsProcessor.class));
when(protocolSpec.getWithdrawalsProcessor()).thenReturn(optional);
when(protocolSchedule.getByBlockHeader(any())).thenReturn(protocolSpec);
return protocolSchedule;
}
private static BlockHeader getEmptyBlockHeaderMock() {
final BlockHeader blockHeader = mock(BlockHeader.class);
when(blockHeader.getTransactionsRoot()).thenReturn(Hash.EMPTY_TRIE_HASH);
when(blockHeader.getOmmersHash()).thenReturn(Hash.EMPTY_LIST_HASH);
when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty());
when(blockHeader.getRequestsRoot()).thenReturn(Optional.empty());
return blockHeader;
}
private static BlockHeader getNonEmptyBlockHeaderMock(
final String transactionsRootHexString, final String ommersHash) {
final BlockHeader blockHeader = mock(BlockHeader.class);
when(blockHeader.getTransactionsRoot())
.thenReturn(Hash.fromHexStringLenient(transactionsRootHexString));
when(blockHeader.getOmmersHash()).thenReturn(Hash.fromHexStringLenient(ommersHash));
when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty());
when(blockHeader.getRequestsRoot()).thenReturn(Optional.empty());
return blockHeader;
}
private static List<Block> getExpectedBlocks(
final BlockHeader header1, final BlockHeader header2) {
final Block block1 =
new Block(
header1,
new BlockBody(
Collections.emptyList(),
Collections.emptyList(),
Optional.empty(),
Optional.empty()));
final Block block2 =
new Block(
header2,
new BlockBody(
Collections.emptyList(),
Collections.emptyList(),
Optional.of(Collections.emptyList()),
Optional.empty()));
return asList(block1, block2);
}
}
Loading…
Cancel
Save