7311: Update after merge and make discussed changes from walkthrough discussion

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7638/head
Matilda Clerke 2 months ago
parent fae39a8db7
commit 07852dceed
  1. 16
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 7
      besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java
  3. 7
      besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java
  4. 7
      besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java
  5. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  6. 24
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java
  7. 86
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java
  8. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java
  9. 7
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  10. 39
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java
  11. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java
  12. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java
  13. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java

@ -55,8 +55,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors; import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
@ -656,10 +654,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
} }
final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler); final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerSelector peerSelector = new DefaultPeerSelector(currentProtocolSpecSupplier);
ethPeers.streamAllPeers().forEach(peerSelector::addPeer);
final PeerTaskExecutor peerTaskExecutor = final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(peerSelector, new PeerTaskRequestSender(), scheduler, metricsSystem); new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), scheduler, metricsSystem);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode()); final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint); final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);
@ -699,8 +695,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
scheduler, scheduler,
peerValidators, peerValidators,
Optional.empty(), Optional.empty(),
forkIdManager, forkIdManager);
peerSelector);
final PivotBlockSelector pivotBlockSelector = final PivotBlockSelector pivotBlockSelector =
createPivotSelector( createPivotSelector(
@ -1035,7 +1030,6 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
* @param peerValidators the peer validators * @param peerValidators the peer validators
* @param mergePeerFilter the merge peer filter * @param mergePeerFilter the merge peer filter
* @param forkIdManager the fork id manager * @param forkIdManager the fork id manager
* @param peerSelector the PeerSelector
* @return the eth protocol manager * @return the eth protocol manager
*/ */
protected EthProtocolManager createEthProtocolManager( protected EthProtocolManager createEthProtocolManager(
@ -1049,8 +1043,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final EthScheduler scheduler, final EthScheduler scheduler,
final List<PeerValidator> peerValidators, final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter, final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager, final ForkIdManager forkIdManager) {
final PeerSelector peerSelector) {
return new EthProtocolManager( return new EthProtocolManager(
protocolContext.getBlockchain(), protocolContext.getBlockchain(),
networkId, networkId,
@ -1064,8 +1057,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
mergePeerFilter, mergePeerFilter,
synchronizerConfiguration, synchronizerConfiguration,
scheduler, scheduler,
forkIdManager, forkIdManager);
peerSelector);
} }
/** /**

@ -42,7 +42,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -244,8 +243,7 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
final EthScheduler scheduler, final EthScheduler scheduler,
final List<PeerValidator> peerValidators, final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter, final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager, final ForkIdManager forkIdManager) {
final PeerSelector peerSelector) {
return besuControllerBuilderSchedule return besuControllerBuilderSchedule
.get(0L) .get(0L)
.createEthProtocolManager( .createEthProtocolManager(
@ -259,8 +257,7 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
scheduler, scheduler,
peerValidators, peerValidators,
mergePeerFilter, mergePeerFilter,
forkIdManager, forkIdManager);
peerSelector);
} }
@Override @Override

@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -100,8 +99,7 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
final EthScheduler scheduler, final EthScheduler scheduler,
final List<PeerValidator> peerValidators, final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter, final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager, final ForkIdManager forkIdManager) {
final PeerSelector peerSelector) {
var mergeContext = protocolContext.getConsensusContext(MergeContext.class); var mergeContext = protocolContext.getConsensusContext(MergeContext.class);
@ -131,8 +129,7 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
scheduler, scheduler,
peerValidators, peerValidators,
filterToUse, filterToUse,
forkIdManager, forkIdManager);
peerSelector);
return ethProtocolManager; return ethProtocolManager;
} }

@ -40,7 +40,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
@ -165,8 +164,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
final EthScheduler scheduler, final EthScheduler scheduler,
final List<PeerValidator> peerValidators, final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter, final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager, final ForkIdManager forkIdManager) {
final PeerSelector peerSelector) {
return mergeBesuControllerBuilder.createEthProtocolManager( return mergeBesuControllerBuilder.createEthProtocolManager(
protocolContext, protocolContext,
synchronizerConfiguration, synchronizerConfiguration,
@ -178,8 +176,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
scheduler, scheduler,
peerValidators, peerValidators,
mergePeerFilter, mergePeerFilter,
forkIdManager, forkIdManager);
peerSelector);
} }
@Override @Override

@ -23,7 +23,6 @@ import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage; import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
@ -70,7 +69,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private final Hash genesisHash; private final Hash genesisHash;
private final ForkIdManager forkIdManager; private final ForkIdManager forkIdManager;
private final PeerSelector peerSelector;
private final BigInteger networkId; private final BigInteger networkId;
private final EthPeers ethPeers; private final EthPeers ethPeers;
private final EthMessages ethMessages; private final EthMessages ethMessages;
@ -94,8 +92,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final Optional<MergePeerFilter> mergePeerFilter, final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration, final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler, final EthScheduler scheduler,
final ForkIdManager forkIdManager, final ForkIdManager forkIdManager) {
final PeerSelector peerSelector) {
this.networkId = networkId; this.networkId = networkId;
this.peerValidators = peerValidators; this.peerValidators = peerValidators;
this.scheduler = scheduler; this.scheduler = scheduler;
@ -105,7 +102,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO); this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO);
this.forkIdManager = forkIdManager; this.forkIdManager = forkIdManager;
this.peerSelector = peerSelector;
this.ethPeers = ethPeers; this.ethPeers = ethPeers;
this.ethMessages = ethMessages; this.ethMessages = ethMessages;
@ -144,8 +140,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final List<PeerValidator> peerValidators, final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter, final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration, final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler, final EthScheduler scheduler) {
final PeerSelector peerSelector) {
this( this(
blockchain, blockchain,
networkId, networkId,
@ -163,8 +158,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
blockchain, blockchain,
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(),
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()), ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()));
peerSelector);
} }
public EthContext ethContext() { public EthContext ethContext() {
@ -343,7 +337,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
public void handleNewConnection(final PeerConnection connection) { public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerNewConnection(connection, peerValidators); ethPeers.registerNewConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection); final EthPeer peer = ethPeers.peer(connection);
peerSelector.addPeer(peer);
final Capability cap = connection.capability(getSupportedProtocol()); final Capability cap = connection.capability(getSupportedProtocol());
final ForkId latestForkId = final ForkId latestForkId =
cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null; cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null;
@ -375,7 +368,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final DisconnectReason reason, final DisconnectReason reason,
final boolean initiatedByPeer) { final boolean initiatedByPeer) {
final boolean wasActiveConnection = ethPeers.registerDisconnect(connection); final boolean wasActiveConnection = ethPeers.registerDisconnect(connection);
peerSelector.removePeer(connection.getPeer());
LOG.atDebug() LOG.atDebug()
.setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left") .setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left")
.addArgument(wasActiveConnection) .addArgument(wasActiveConnection)

@ -18,6 +18,7 @@ import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRetryBehavior; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRetryBehavior;
@ -32,6 +33,7 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Predicate;
public class GetReceiptsFromPeerTask public class GetReceiptsFromPeerTask
implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> { implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
@ -39,6 +41,7 @@ public class GetReceiptsFromPeerTask
private final Collection<BlockHeader> blockHeaders; private final Collection<BlockHeader> blockHeaders;
private final BodyValidator bodyValidator; private final BodyValidator bodyValidator;
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>(); private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
private final long requiredBlockchainHeight;
public GetReceiptsFromPeerTask( public GetReceiptsFromPeerTask(
final Collection<BlockHeader> blockHeaders, final BodyValidator bodyValidator) { final Collection<BlockHeader> blockHeaders, final BodyValidator bodyValidator) {
@ -50,6 +53,12 @@ public class GetReceiptsFromPeerTask
headersByReceiptsRoot headersByReceiptsRoot
.computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>()) .computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>())
.add(header)); .add(header));
requiredBlockchainHeight =
blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
} }
@Override @Override
@ -57,14 +66,6 @@ public class GetReceiptsFromPeerTask
return EthProtocol.get(); return EthProtocol.get();
} }
@Override
public long getRequiredBlockNumber() {
return blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
}
@Override @Override
public MessageData getRequestMessage() { public MessageData getRequestMessage() {
// Since we have to match up the data by receipt root, we only need to request receipts // Since we have to match up the data by receipt root, we only need to request receipts
@ -106,4 +107,11 @@ public class GetReceiptsFromPeerTask
return List.of( return List.of(
PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER); PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER);
} }
@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
ethPeer.getProtocolName().equals(getSubProtocol().getName())
&& ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight;
}
} }

@ -22,7 +22,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
@ -32,18 +31,15 @@ import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator; import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;
import com.google.common.collect.Lists;
public class DownloadReceiptsStep public class DownloadReceiptsStep
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> { implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> {
private final EthContext ethContext; private final EthContext ethContext;
private final PeerTaskExecutor peerTaskExecutor; private final PeerTaskExecutor peerTaskExecutor;
private final SynchronizerConfiguration synchronizerConfiguration; private final SynchronizerConfiguration synchronizerConfiguration;
@ -64,49 +60,43 @@ public class DownloadReceiptsStep
public CompletableFuture<List<BlockWithReceipts>> apply(final List<Block> blocks) { public CompletableFuture<List<BlockWithReceipts>> apply(final List<Block> blocks) {
final List<BlockHeader> headers = blocks.stream().map(Block::getHeader).collect(toList()); final List<BlockHeader> headers = blocks.stream().map(Block::getHeader).collect(toList());
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
return CompletableFuture.supplyAsync( return ethContext
() -> { .getScheduler()
Map<BlockHeader, List<TransactionReceipt>> getReceipts = new ConcurrentHashMap<>(); .scheduleSyncWorkerTask(
do { () -> {
List<List<BlockHeader>> blockHeaderSubLists = Lists.partition(headers, 20); Map<BlockHeader, List<TransactionReceipt>> getReceipts = new ConcurrentHashMap<>();
List<PeerTask<Map<BlockHeader, List<TransactionReceipt>>>> tasks = new ArrayList<>(); do {
for (List<BlockHeader> blockHeaderSubList : blockHeaderSubLists) { GetReceiptsFromPeerTask task =
tasks.add(new GetReceiptsFromPeerTask(blockHeaderSubList, new BodyValidator())); new GetReceiptsFromPeerTask(headers, new BodyValidator());
} PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>
Collection< getReceiptsResult = peerTaskExecutor.execute(task);
CompletableFuture< if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>>> && getReceiptsResult.result().isPresent()) {
taskExecutions = peerTaskExecutor.executeBatchAsync(tasks); Map<BlockHeader, List<TransactionReceipt>> taskResult =
for (CompletableFuture< getReceiptsResult.result().get();
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>> taskResult
taskExecution : taskExecutions) { .keySet()
taskExecution.thenAccept( .forEach(
(getReceiptsResult) -> { (blockHeader) ->
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS getReceipts.merge(
&& getReceiptsResult.result().isPresent()) { blockHeader,
Map<BlockHeader, List<TransactionReceipt>> taskResult = taskResult.get(blockHeader),
getReceiptsResult.result().get(); (initialReceipts, newReceipts) -> {
taskResult throw new IllegalStateException(
.keySet() "Unexpectedly got receipts for block header already populated!");
.forEach( }));
(blockHeader) -> } else if (getReceiptsResult.responseCode()
getReceipts.merge( == PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE) {
blockHeader, throw new RuntimeException(
taskResult.get(blockHeader), "No peer available, unable to complete DownloadReceiptsStep");
(initialReceipts, newReceipts) -> { }
throw new IllegalStateException( // remove all the headers we found receipts for
"Unexpectedly got receipts for block header already populated!"); headers.removeAll(getReceipts.keySet());
})); // repeat until all headers have receipts
} } while (!headers.isEmpty());
}); return CompletableFuture.completedFuture(
} combineBlocksAndReceipts(blocks, getReceipts));
taskExecutions.forEach(CompletableFuture::join); });
// remove all the headers we found receipts for
headers.removeAll(getReceipts.keySet());
// repeat until all headers have receipts
} while (!headers.isEmpty());
return combineBlocksAndReceipts(blocks, getReceipts);
});
} else { } else {
return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem) return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem)

@ -44,7 +44,6 @@ import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.EthProtocolVersion; import org.hyperledger.besu.ethereum.eth.EthProtocolVersion;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection.PeerSendHandler; import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection.PeerSendHandler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
@ -1244,8 +1243,7 @@ public final class EthProtocolManagerTest {
Optional.empty(), Optional.empty(),
syncConfig, syncConfig,
mock(EthScheduler.class), mock(EthScheduler.class),
mock(ForkIdManager.class), mock(ForkIdManager.class))) {
new DefaultPeerSelector(() -> null))) {
return ethManager; return ethManager;
} }

@ -30,7 +30,6 @@ import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture; import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture;
import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
@ -118,8 +117,7 @@ public class EthProtocolManagerTestUtil {
mergePeerFilter, mergePeerFilter,
mock(SynchronizerConfiguration.class), mock(SynchronizerConfiguration.class),
ethScheduler, ethScheduler,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false), new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
new DefaultPeerSelector(() -> null));
} }
public static EthProtocolManager create( public static EthProtocolManager create(
@ -170,8 +168,7 @@ public class EthProtocolManagerTestUtil {
Optional.empty(), Optional.empty(),
mock(SynchronizerConfiguration.class), mock(SynchronizerConfiguration.class),
ethScheduler, ethScheduler,
forkIdManager, forkIdManager);
new DefaultPeerSelector(() -> null));
} }
public static EthProtocolManager create(final Blockchain blockchain) { public static EthProtocolManager create(final Blockchain blockchain) {

@ -18,6 +18,8 @@ import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63; import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage; import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
@ -44,14 +46,6 @@ public class GetReceiptsFromPeerTaskTest {
Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol()); Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol());
} }
@Test
public void testGetRequiredBlockNumber() {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null);
Assertions.assertEquals(3, task.getRequiredBlockNumber());
}
@Test @Test
public void testGetRequestMessage() { public void testGetRequestMessage() {
GetReceiptsFromPeerTask task = GetReceiptsFromPeerTask task =
@ -137,6 +131,24 @@ public class GetReceiptsFromPeerTaskTest {
Assertions.assertEquals(List.of(receiptForBlock3), resultMap.get(blockHeader3)); Assertions.assertEquals(List.of(receiptForBlock3), resultMap.get(blockHeader3));
} }
@Test
public void testGetPeerRequirementFilter() {
BlockHeader blockHeader1 = mockBlockHeader(1);
BlockHeader blockHeader2 = mockBlockHeader(2);
BlockHeader blockHeader3 = mockBlockHeader(3);
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(List.of(blockHeader1, blockHeader2, blockHeader3), null);
EthPeer failForIncorrectProtocol = mockPeer("incorrectProtocol", 5);
EthPeer failForShortChainHeight = mockPeer("incorrectProtocol", 1);
EthPeer successfulCandidate = mockPeer(EthProtocol.NAME, 5);
Assertions.assertFalse(task.getPeerRequirementFilter().test(failForIncorrectProtocol));
Assertions.assertFalse(task.getPeerRequirementFilter().test(failForShortChainHeight));
Assertions.assertTrue(task.getPeerRequirementFilter().test(successfulCandidate));
}
private BlockHeader mockBlockHeader(final long blockNumber) { private BlockHeader mockBlockHeader(final long blockNumber) {
BlockHeader blockHeader = Mockito.mock(BlockHeader.class); BlockHeader blockHeader = Mockito.mock(BlockHeader.class);
Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber); Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber);
@ -149,4 +161,15 @@ public class GetReceiptsFromPeerTaskTest {
return blockHeader; return blockHeader;
} }
private EthPeer mockPeer(final String protocol, final long chainHeight) {
EthPeer ethPeer = Mockito.mock(EthPeer.class);
ChainState chainState = Mockito.mock(ChainState.class);
Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol);
Mockito.when(ethPeer.chainState()).thenReturn(chainState);
Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight);
return ethPeer;
}
} }

@ -124,8 +124,8 @@ public class DownloadReceiptsStepTest {
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> peerTaskResult = PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> peerTaskResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.of(receiptsMap), PeerTaskExecutorResponseCode.SUCCESS); Optional.of(receiptsMap), PeerTaskExecutorResponseCode.SUCCESS);
Mockito.when(peerTaskExecutor.execute(Mockito.any(GetReceiptsFromPeerTask.class))) Mockito.when(peerTaskExecutor.executeAsync(Mockito.any(GetReceiptsFromPeerTask.class)))
.thenReturn(peerTaskResult); .thenReturn(CompletableFuture.completedFuture(peerTaskResult));
final CompletableFuture<List<BlockWithReceipts>> result = downloadReceiptsStep.apply(blocks); final CompletableFuture<List<BlockWithReceipts>> result = downloadReceiptsStep.apply(blocks);

@ -44,7 +44,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -197,8 +196,7 @@ public class TestNode implements Closeable {
Collections.emptyList(), Collections.emptyList(),
Optional.empty(), Optional.empty(),
syncConfig, syncConfig,
scheduler, scheduler);
new DefaultPeerSelector(() -> null));
final NetworkRunner networkRunner = final NetworkRunner networkRunner =
NetworkRunner.builder() NetworkRunner.builder()

@ -45,7 +45,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -319,8 +318,7 @@ public class TransactionPoolFactoryTest {
Optional.empty(), Optional.empty(),
mock(SynchronizerConfiguration.class), mock(SynchronizerConfiguration.class),
mock(EthScheduler.class), mock(EthScheduler.class),
mock(ForkIdManager.class), mock(ForkIdManager.class));
new DefaultPeerSelector(() -> null));
} }
@Test @Test

Loading…
Cancel
Save