Do not require a minimum block height when downloading headers or blocks (#3911)

* If needed update peer chain state when processing the block headers response

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Do not require a minimum block height when downloading headers or blocks

Peers could have that header, bacause our internal record about the
status of the peer could not always be up-to-date, so it is better
to avoid setting that constraint when selecting peers to download block
headers.

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Add CHANGELOG

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Complete the removal of minimumRequiredBlockNumber from constructors

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

Co-authored-by: Sally MacFarlane <sally.macfarlane@consensys.net>
pull/3985/head
Fabio Di Fabio 2 years ago committed by GitHub
parent d5739fb117
commit 261b1e03fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java
  3. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractPeerRequestTask.java
  4. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java
  5. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBodiesFromPeerTask.java
  6. 39
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java
  7. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java
  8. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetHeadersEndingAtFromPeerByHashTask.java
  9. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTracker.java
  10. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadHeadersStep.java
  11. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java
  12. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java
  13. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeadersFetcher.java
  14. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java
  15. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java
  16. 19
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java
  17. 52
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/GetHeadersFromPeerByHashTaskTest.java
  18. 24
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTaskTest.java
  19. 40
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStepTest.java
  20. 62
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java

@ -12,6 +12,7 @@
- Support `finalized` and `safe` as tags for the block parameter in RPC APIs [#3950](https://github.com/hyperledger/besu/pull/3950)
- Added verification of payload attributes in ForkchoiceUpdated [#3837](https://github.com/hyperledger/besu/pull/3837)
- Add support for Gray Glacier hardfork [#3961](https://github.com/hyperledger/besu/issues/3961)
- Do not require a minimum block height when downloading headers or blocks [#3911](https://github.com/hyperledger/besu/pull/3911)
### Bug Fixes
- alias engine-rpc-port parameter with the former rpc param name [#3958](https://github.com/hyperledger/besu/pull/3958)

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.manager.task;
import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
@ -90,9 +91,10 @@ public abstract class AbstractGetHeadersFromPeerTask
return Optional.empty();
}
final List<BlockHeader> headersList = new ArrayList<>();
final List<BlockHeader> headersList = new ArrayList<>(headers.size());
headersList.add(firstHeader);
BlockHeader prevBlockHeader = firstHeader;
updatePeerChainState(peer, firstHeader);
final int expectedDelta = reverse ? -(skip + 1) : (skip + 1);
for (int i = 1; i < headers.size(); i++) {
final BlockHeader header = headers.get(i);
@ -114,11 +116,24 @@ public abstract class AbstractGetHeadersFromPeerTask
}
prevBlockHeader = header;
headersList.add(header);
updatePeerChainState(peer, header);
}
LOG.debug("Received {} of {} headers requested from peer {}", headersList.size(), count, peer);
return Optional.of(headersList);
}
private void updatePeerChainState(final EthPeer peer, final BlockHeader blockHeader) {
if (blockHeader.getNumber() > peer.chainState().getEstimatedHeight()) {
traceLambda(
LOG,
"Updating chain state for peer {} to block header {}",
peer::getShortNodeId,
blockHeader::toLogString);
peer.chainState().update(blockHeader);
}
LOG.trace("Peer chain state {}", peer.chainState());
}
protected abstract boolean matchesFirstHeader(BlockHeader firstHeader);
}

@ -85,6 +85,10 @@ public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
});
}
public PendingPeerRequest sendRequestToPeer(final PeerRequest request) {
return sendRequestToPeer(request, 0L);
}
public PendingPeerRequest sendRequestToPeer(
final PeerRequest request, final long minimumBlockNumber) {
return ethContext.getEthPeers().executePeerRequest(request, minimumBlockNumber, assignedPeer);

@ -103,7 +103,7 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
hash.map(
value ->
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, value, blockNumber, metricsSystem))
protocolSchedule, ethContext, value, metricsSystem))
.orElseGet(
() ->
GetHeadersFromPeerByNumberTask.forSingleNumber(

@ -82,14 +82,12 @@ public class GetBodiesFromPeerTask extends AbstractPeerRequestTask<List<Block>>
protected PendingPeerRequest sendRequest() {
final List<Hash> blockHashes =
headers.stream().map(BlockHeader::getHash).collect(Collectors.toList());
final long minimumRequiredBlockNumber = headers.get(headers.size() - 1).getNumber();
return sendRequestToPeer(
peer -> {
LOG.debug("Requesting {} bodies from peer {}.", blockHashes.size(), peer);
return peer.getBodies(blockHashes);
},
minimumRequiredBlockNumber);
});
}
@Override

@ -32,20 +32,17 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
private static final Logger LOG = LoggerFactory.getLogger(GetHeadersFromPeerByHashTask.class);
private final Hash referenceHash;
private final long minimumRequiredBlockNumber;
@VisibleForTesting
GetHeadersFromPeerByHashTask(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final int count,
final int skip,
final boolean reverse,
final MetricsSystem metricsSystem) {
super(protocolSchedule, ethContext, count, skip, reverse, metricsSystem);
this.minimumRequiredBlockNumber = minimumRequiredBlockNumber;
checkNotNull(referenceHash);
this.referenceHash = referenceHash;
}
@ -54,65 +51,40 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash firstHash,
final long firstBlockNumber,
final int segmentLength,
final MetricsSystem metricsSystem) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
firstHash,
firstBlockNumber,
segmentLength,
0,
false,
metricsSystem);
protocolSchedule, ethContext, firstHash, segmentLength, 0, false, metricsSystem);
}
public static AbstractGetHeadersFromPeerTask startingAtHash(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash firstHash,
final long firstBlockNumber,
final int segmentLength,
final int skip,
final MetricsSystem metricsSystem) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
firstHash,
firstBlockNumber,
segmentLength,
skip,
false,
metricsSystem);
protocolSchedule, ethContext, firstHash, segmentLength, skip, false, metricsSystem);
}
public static AbstractGetHeadersFromPeerTask endingAtHash(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash lastHash,
final long lastBlockNumber,
final int segmentLength,
final MetricsSystem metricsSystem) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
lastHash,
lastBlockNumber,
segmentLength,
0,
true,
metricsSystem);
protocolSchedule, ethContext, lastHash, segmentLength, 0, true, metricsSystem);
}
public static AbstractGetHeadersFromPeerTask forSingleHash(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final long minimumRequiredBlockNumber,
final MetricsSystem metricsSystem) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule, ethContext, hash, minimumRequiredBlockNumber, 1, 0, false, metricsSystem);
protocolSchedule, ethContext, hash, 1, 0, false, metricsSystem);
}
@Override
@ -121,8 +93,7 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
peer -> {
LOG.debug("Requesting {} headers from peer {}.", count, peer);
return peer.getHeadersByHash(referenceHash, count, skip, reverse);
},
minimumRequiredBlockNumber);
});
}
@Override

@ -79,8 +79,7 @@ public class GetHeadersFromPeerByNumberTask extends AbstractGetHeadersFromPeerTa
peer -> {
LOG.debug("Requesting {} headers from peer {}.", count, peer);
return peer.getHeadersByNumber(blockNumber, count, skip, reverse);
},
blockNumber);
});
}
@Override

@ -35,7 +35,6 @@ public class RetryingGetHeadersEndingAtFromPeerByHashTask
private final Hash referenceHash;
private final ProtocolSchedule protocolSchedule;
private final long minimumRequiredBlockNumber;
private final int count;
@VisibleForTesting
@ -43,12 +42,10 @@ public class RetryingGetHeadersEndingAtFromPeerByHashTask
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final int count,
final MetricsSystem metricsSystem) {
super(ethContext, 3, List::isEmpty, metricsSystem);
this.protocolSchedule = protocolSchedule;
this.minimumRequiredBlockNumber = minimumRequiredBlockNumber;
this.count = count;
checkNotNull(referenceHash);
this.referenceHash = referenceHash;
@ -58,16 +55,10 @@ public class RetryingGetHeadersEndingAtFromPeerByHashTask
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final int count,
final MetricsSystem metricsSystem) {
return new RetryingGetHeadersEndingAtFromPeerByHashTask(
protocolSchedule,
ethContext,
referenceHash,
minimumRequiredBlockNumber,
count,
metricsSystem);
protocolSchedule, ethContext, referenceHash, count, metricsSystem);
}
@Override
@ -75,12 +66,7 @@ public class RetryingGetHeadersEndingAtFromPeerByHashTask
final Optional<EthPeer> assignedPeer) {
final AbstractGetHeadersFromPeerTask task =
GetHeadersFromPeerByHashTask.endingAtHash(
protocolSchedule,
getEthContext(),
referenceHash,
minimumRequiredBlockNumber,
count,
getMetricsSystem());
protocolSchedule, getEthContext(), referenceHash, count, getMetricsSystem());
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(

@ -73,7 +73,6 @@ public class ChainHeadTracker implements ConnectCallback {
protocolSchedule,
ethContext,
Hash.wrap(peer.chainState().getBestBlock().getHash()),
0,
metricsSystem)
.assignPeer(peer)
.run()

@ -96,7 +96,6 @@ public class DownloadHeadersStep
protocolSchedule,
ethContext,
range.getStart().getHash(),
range.getStart().getNumber(),
headerRequestSize,
metricsSystem)
.assignPeer(range.getSyncTarget())

@ -62,12 +62,6 @@ public class BackwardSyncStep {
protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
final int batchSize = context.getBatchSize();
debugLambda(LOG, "Requesting header for hash {}", hash::toHexString);
final Optional<BlockHeader> maybeFinalizedHeader =
context
.getProtocolContext()
.getBlockchain()
.getFinalized()
.flatMap(context.getProtocolContext().getBlockchain()::getBlockHeader);
final RetryingGetHeadersEndingAtFromPeerByHashTask
retryingGetHeadersEndingAtFromPeerByHashTask =
@ -75,7 +69,6 @@ public class BackwardSyncStep {
context.getProtocolSchedule(),
context.getEthContext(),
hash,
maybeFinalizedHeader.map(BlockHeader::getNumber).orElse(0L),
batchSize,
context.getMetricsSystem());
return context

@ -253,11 +253,7 @@ public class FastSyncActions {
private CompletableFuture<FastSyncState> downloadPivotBlockHeader(final Hash hash) {
return RetryingGetHeaderFromPeerByHashTask.byHash(
protocolSchedule,
ethContext,
hash,
pivotBlockSelector.getMinRequiredBlockNumber(),
metricsSystem)
protocolSchedule, ethContext, hash, metricsSystem)
.getHeader()
.thenApply(
blockHeader -> {

@ -106,7 +106,6 @@ public class RangeHeadersFetcher {
protocolSchedule,
ethContext,
referenceHeader.getHash(),
referenceHeader.getNumber(),
// + 1 because lastHeader will be returned as well.
headerCount + 1,
skip,

@ -166,12 +166,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask<List<Bl
// Ask for count + 1 because we'll retrieve the previous header as well
final AbstractGetHeadersFromPeerTask headersTask =
GetHeadersFromPeerByHashTask.endingAtHash(
protocolSchedule,
ethContext,
referenceHash,
referenceHeaderForNextRequest.getNumber(),
count + 1,
metricsSystem);
protocolSchedule, ethContext, referenceHash, count + 1, metricsSystem);
assignedPeer.ifPresent(headersTask::assignPeer);
return headersTask.run();
});

@ -37,18 +37,15 @@ public class RetryingGetHeaderFromPeerByHashTask
private final Hash referenceHash;
private final ProtocolSchedule protocolSchedule;
private final long minimumRequiredBlockNumber;
@VisibleForTesting
RetryingGetHeaderFromPeerByHashTask(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final MetricsSystem metricsSystem) {
super(ethContext, 3, List::isEmpty, metricsSystem);
this.protocolSchedule = protocolSchedule;
this.minimumRequiredBlockNumber = minimumRequiredBlockNumber;
checkNotNull(referenceHash);
this.referenceHash = referenceHash;
}
@ -57,10 +54,9 @@ public class RetryingGetHeaderFromPeerByHashTask
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final MetricsSystem metricsSystem) {
return new RetryingGetHeaderFromPeerByHashTask(
protocolSchedule, ethContext, referenceHash, minimumRequiredBlockNumber, metricsSystem);
protocolSchedule, ethContext, referenceHash, metricsSystem);
}
@Override
@ -68,11 +64,7 @@ public class RetryingGetHeaderFromPeerByHashTask
final Optional<EthPeer> assignedPeer) {
final AbstractGetHeadersFromPeerTask task =
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule,
getEthContext(),
referenceHash,
minimumRequiredBlockNumber,
getMetricsSystem());
protocolSchedule, getEthContext(), referenceHash, getMetricsSystem());
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(

@ -16,18 +16,25 @@ package org.hyperledger.besu.ethereum.eth.manager.ethtaskutils;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.ExceptionUtils;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.junit.Test;
@ -147,4 +154,16 @@ public abstract class PeerMessageTaskTest<T>
}
protected abstract void assertPartialResultMatchesExpectation(T requestedData, T partialResponse);
protected EthPeer createPeer() {
final PeerConnection peerConnection = new MockPeerConnection(Set.of(EthProtocol.ETH66));
final Consumer<EthPeer> onPeerReady = (peer) -> {};
return new EthPeer(
peerConnection,
EthProtocol.NAME,
onPeerReady,
Collections.emptyList(),
TestClock.fixed(),
Collections.emptyList());
}
}

@ -17,14 +17,12 @@ package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hyperledger.besu.ethereum.referencetests.ReferenceTestBlockchain.generateTestBlockHash;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
@ -41,12 +39,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<BlockHeader>> {
@Mock private EthPeer peerMock;
@Override
protected void assertPartialResultMatchesExpectation(
@ -73,12 +69,7 @@ public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<B
final List<BlockHeader> requestedData) {
final BlockHeader firstHeader = requestedData.get(0);
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
firstHeader.getHash(),
firstHeader.getNumber(),
requestedData.size(),
metricsSystem);
protocolSchedule, ethContext, firstHeader.getHash(), requestedData.size(), metricsSystem);
}
@Test
@ -122,7 +113,6 @@ public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<B
protocolSchedule,
ethContext,
blockchain.getBlockHashByNumber(startNumber).get(),
startNumber,
count,
skip,
reverse,
@ -145,56 +135,46 @@ public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<B
@Test
public void checkThatSequentialHeadersFormingAChainWorks() {
final int startNumber = 1;
final BlockHeader block1 =
new BlockHeaderTestFixture().number(1).parentHash(generateTestBlockHash(0)).buildHeader();
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(block1.getHash()).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final EthPeer peer = createPeer();
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
block1.getHash(),
startNumber,
2,
0,
false,
metricsSystem);
protocolSchedule, ethContext, block1.getHash(), 2, 0, false, metricsSystem);
final Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
task.processResponse(false, BlockHeadersMessage.create(headers), peer);
assertThat(optionalBlockHeaders).isNotNull();
assertThat(optionalBlockHeaders).isPresent();
final List<BlockHeader> blockHeaders = optionalBlockHeaders.get();
MatcherAssert.assertThat(blockHeaders, hasSize(2));
verify(peerMock, times(0)).disconnect(any());
assertThat(peer.chainState().getEstimatedHeight()).isEqualTo(2);
assertThat(peer.isDisconnected()).isFalse();
}
@Test
public void checkThatSequentialHeadersNotFormingAChainFails() {
final int startNumber = 1;
final BlockHeader block1 =
new BlockHeaderTestFixture().number(1).parentHash(generateTestBlockHash(0)).buildHeader();
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(generateTestBlockHash(1)).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final EthPeer peer = createPeer();
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
block1.getHash(),
startNumber,
2,
0,
false,
metricsSystem);
protocolSchedule, ethContext, block1.getHash(), 2, 0, false, metricsSystem);
final Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
task.processResponse(false, BlockHeadersMessage.create(headers), peer);
assertThat(optionalBlockHeaders).isNotNull();
assertThat(optionalBlockHeaders).isEmpty();
verify(peerMock).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
assertThat(peer.isDisconnected()).isTrue();
assertThat(((MockPeerConnection) peer.getConnection()).getDisconnectReason().get())
.isEqualTo(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
}

@ -17,14 +17,12 @@ package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hyperledger.besu.ethereum.referencetests.ReferenceTestBlockchain.generateTestBlockHash;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
@ -41,13 +39,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class GetHeadersFromPeerByNumberTaskTest extends PeerMessageTaskTest<List<BlockHeader>> {
@Mock private EthPeer peerMock;
@Override
protected void assertPartialResultMatchesExpectation(
final List<BlockHeader> requestedData, final List<BlockHeader> partialResponse) {
@ -138,16 +133,20 @@ public class GetHeadersFromPeerByNumberTaskTest extends PeerMessageTaskTest<List
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(block1.getHash()).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final EthPeer peer = createPeer();
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, block1.getNumber(), 2, 0, false, metricsSystem);
final Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
task.processResponse(false, BlockHeadersMessage.create(headers), peer);
assertThat(optionalBlockHeaders).isNotNull();
assertThat(optionalBlockHeaders).isPresent();
final List<BlockHeader> blockHeaders = optionalBlockHeaders.get();
MatcherAssert.assertThat(blockHeaders, hasSize(2));
verify(peerMock, times(0)).disconnect(any());
assertThat(peer.isDisconnected()).isFalse();
assertThat(peer.chainState().getEstimatedHeight()).isEqualTo(2);
}
@Test
@ -158,13 +157,18 @@ public class GetHeadersFromPeerByNumberTaskTest extends PeerMessageTaskTest<List
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(generateTestBlockHash(1)).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final EthPeer peer = createPeer();
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, block1.getNumber(), 2, 0, false, metricsSystem);
final Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
task.processResponse(false, BlockHeadersMessage.create(headers), peer);
assertThat(optionalBlockHeaders).isNotNull();
assertThat(optionalBlockHeaders).isEmpty();
verify(peerMock).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
assertThat(peer.isDisconnected()).isTrue();
assertThat(((MockPeerConnection) peer.getConnection()).getDisconnectReason().get())
.isEqualTo(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
}

@ -16,7 +16,6 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.failBecauseExceptionWasNotThrown;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@ -34,7 +33,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -42,9 +40,6 @@ import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.junit.Before;
@ -185,41 +180,6 @@ public class BackwardSyncStepTest {
assertThat(blockHeader).isEqualTo(lookingForBlock.getHeader());
}
@Test
public void shouldNotRequestHeaderBeforeLastFinalizedBlock() throws Exception {
final MutableBlockchain localBlockchain = context.getProtocolContext().getBlockchain();
extendBlockchain(REMOTE_HEIGHT + 2, localBlockchain);
localBlockchain.setFinalized(
localBlockchain.getBlockHashByNumber(REMOTE_HEIGHT + 1).orElseThrow());
BackwardSyncStep step = new BackwardSyncStep(context, createBackwardChain(REMOTE_HEIGHT - 1));
final Block lookingForBlock = getBlockByNumber(REMOTE_HEIGHT - 2);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(remoteBlockchain);
final CompletableFuture<List<BlockHeader>> future =
step.requestHeaders(lookingForBlock.getHeader().getHash());
ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(2);
schedExecutor.submit(
() -> peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()));
schedExecutor.scheduleWithFixedDelay(
ethScheduler::expirePendingTimeouts, 0, 100, TimeUnit.MILLISECONDS);
future
.handle(
(r, t) -> {
if (t == null || !(t.getCause() instanceof MaxRetriesReachedException)) {
failBecauseExceptionWasNotThrown(MaxRetriesReachedException.class);
}
return r;
})
.thenRun(schedExecutor::shutdownNow)
.join();
}
@Test
public void shouldThrowWhenResponseIsEmptyWhenRequestingHeader() throws Exception {
BackwardSyncStep step = new BackwardSyncStep(context, createBackwardChain(REMOTE_HEIGHT - 1));

@ -16,27 +16,27 @@ package org.hyperledger.besu.ethereum.eth.sync.tasks;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>> {
@ -47,7 +47,7 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>>
protected List<Block> generateDataToBeRequested(final int nbBlock) {
// Setup data to be requested and expected response
final List<Block> blocks = new ArrayList<>();
final List<Block> blocks = new ArrayList<>(nbBlock);
for (long i = 0; i < nbBlock; i++) {
final BlockHeader header = blockchain.getBlockHeader(10 + i).get();
final BlockBody body = blockchain.getBlockBody(header.getHash()).get();
@ -57,7 +57,7 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>>
}
@Override
protected EthTask<List<Block>> createTask(final List<Block> requestedData) {
protected CompleteBlocksTask createTask(final List<Block> requestedData) {
final List<BlockHeader> headersToComplete =
requestedData.stream().map(Block::getHeader).collect(Collectors.toList());
return CompleteBlocksTask.forHeaders(
@ -82,29 +82,33 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>>
@SuppressWarnings("unchecked")
@Test
public void shouldReduceTheBlockSegmentSizeAfterEachRetry() {
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(10);
final EthTask<List<Block>> task = createTask(requestedData);
final CompleteBlocksTask task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();
ArgumentCaptor<Long> blockNumbersCaptor = ArgumentCaptor.forClass(Long.class);
final List<MessageData> messageCollector = new ArrayList<>();
verify(ethPeers, times(4))
.executePeerRequest(
any(PeerRequest.class), blockNumbersCaptor.capture(), any(Optional.class));
respondingPeer.respond(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector));
assertThat(future.isDone()).isFalse();
assertThat(blockNumbersCaptor.getAllValues().get(0)).isEqualTo(19);
assertThat(blockNumbersCaptor.getAllValues().get(1)).isEqualTo(14);
assertThat(blockNumbersCaptor.getAllValues().get(2)).isEqualTo(13);
assertThat(blockNumbersCaptor.getAllValues().get(3)).isEqualTo(10);
assertThat(batchSize(messageCollector.get(0))).isEqualTo(10);
assertThat(batchSize(messageCollector.get(1))).isEqualTo(5);
assertThat(batchSize(messageCollector.get(2))).isEqualTo(4);
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
@SuppressWarnings("unchecked")
@Test
public void shouldNotReduceTheBlockSegmentSizeIfOnlyOneBlockNeeded() {
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(1);
@ -112,16 +116,20 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>>
final EthTask<List<Block>> task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();
ArgumentCaptor<Long> blockNumbersCaptor = ArgumentCaptor.forClass(Long.class);
final List<MessageData> messageCollector = new ArrayList<>();
respondingPeer.respond(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector));
verify(ethPeers, times(4))
.executePeerRequest(
any(PeerRequest.class), blockNumbersCaptor.capture(), any(Optional.class));
assertThat(batchSize(messageCollector.get(0))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(1))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(2))).isEqualTo(1);
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
assertThat(future.isDone()).isFalse();
assertThat(blockNumbersCaptor.getAllValues().get(0)).isEqualTo(10);
assertThat(blockNumbersCaptor.getAllValues().get(1)).isEqualTo(10);
assertThat(blockNumbersCaptor.getAllValues().get(2)).isEqualTo(10);
assertThat(blockNumbersCaptor.getAllValues().get(3)).isEqualTo(10);
private long batchSize(final MessageData msg) {
return ((GetBlockBodiesMessage) msg).hashes().spliterator().getExactSizeIfKnown();
}
}

Loading…
Cancel
Save