diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index a200d4b5a4..0ef6564cbb 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -29,10 +29,9 @@ import org.hyperledger.besu.ethereum.core.PrivacyParameters; import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.DaoForkPeerValidator; -import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -245,7 +244,9 @@ public abstract class BesuControllerBuilder { })); final boolean fastSyncEnabled = syncConfig.getSyncMode().equals(SyncMode.FAST); - ethProtocolManager = createEthProtocolManager(protocolContext, fastSyncEnabled); + ethProtocolManager = + createEthProtocolManager( + protocolContext, fastSyncEnabled, createPeerValidators(protocolSchedule)); final SyncState syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers()); final Synchronizer synchronizer = @@ -262,17 +263,6 @@ public abstract class BesuControllerBuilder { clock, metricsSystem); - final OptionalLong daoBlock = - genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock(); - if (daoBlock.isPresent()) { - // Setup dao validator - final EthContext ethContext = ethProtocolManager.ethContext(); - final DaoForkPeerValidator daoForkPeerValidator = - new DaoForkPeerValidator( - ethContext, protocolSchedule, metricsSystem, daoBlock.getAsLong()); - PeerValidatorRunner.runValidator(ethContext, daoForkPeerValidator); - } - final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( protocolSchedule, @@ -356,11 +346,14 @@ public abstract class BesuControllerBuilder { Blockchain blockchain, WorldStateArchive worldStateArchive); protected EthProtocolManager createEthProtocolManager( - final ProtocolContext protocolContext, final boolean fastSyncEnabled) { + final ProtocolContext protocolContext, + final boolean fastSyncEnabled, + final List peerValidators) { return new EthProtocolManager( protocolContext.getBlockchain(), protocolContext.getWorldStateArchive(), networkId, + peerValidators, fastSyncEnabled, syncConfig.getDownloaderParallelism(), syncConfig.getTransactionsParallelism(), @@ -369,4 +362,18 @@ public abstract class BesuControllerBuilder { metricsSystem, ethereumWireProtocolConfiguration); } + + protected List createPeerValidators(final ProtocolSchedule protocolSchedule) { + final List validators = new ArrayList<>(); + + final OptionalLong daoBlock = + genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock(); + if (daoBlock.isPresent()) { + // Setup dao validator + validators.add( + new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong())); + } + + return validators; + } } diff --git a/besu/src/main/java/org/hyperledger/besu/controller/IbftLegacyBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/IbftLegacyBesuControllerBuilder.java index b177ece3b3..eda3da8d9c 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/IbftLegacyBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/IbftLegacyBesuControllerBuilder.java @@ -29,12 +29,15 @@ import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; +import java.util.List; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -97,12 +100,15 @@ public class IbftLegacyBesuControllerBuilder extends BesuControllerBuilder protocolContext, final boolean fastSyncEnabled) { + final ProtocolContext protocolContext, + final boolean fastSyncEnabled, + final List peerValidators) { LOG.info("Operating on IBFT-1.0 network."); return new Istanbul64ProtocolManager( protocolContext.getBlockchain(), protocolContext.getWorldStateArchive(), networkId, + peerValidators, fastSyncEnabled, syncConfig.getDownloaderParallelism(), syncConfig.getTransactionsParallelism(), diff --git a/besu/src/test/java/org/hyperledger/besu/RunnerTest.java b/besu/src/test/java/org/hyperledger/besu/RunnerTest.java index 69fd957105..e01834a53b 100644 --- a/besu/src/test/java/org/hyperledger/besu/RunnerTest.java +++ b/besu/src/test/java/org/hyperledger/besu/RunnerTest.java @@ -19,6 +19,7 @@ import static org.hyperledger.besu.cli.config.NetworkName.DEV; import org.hyperledger.besu.cli.config.EthNetworkConfig; import org.hyperledger.besu.config.GenesisConfigFile; +import org.hyperledger.besu.config.JsonUtil; import org.hyperledger.besu.controller.BesuController; import org.hyperledger.besu.controller.KeyPairUtil; import org.hyperledger.besu.controller.MainnetBesuControllerBuilder; @@ -61,8 +62,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -110,15 +113,16 @@ public final class RunnerTest { @Test public void fullSyncFromGenesis() throws Exception { - syncFromGenesis(SyncMode.FULL); + syncFromGenesis(SyncMode.FULL, GenesisConfigFile.mainnet()); } @Test public void fastSyncFromGenesis() throws Exception { - syncFromGenesis(SyncMode.FAST); + syncFromGenesis(SyncMode.FAST, getFastSyncGenesis()); } - private void syncFromGenesis(final SyncMode mode) throws Exception { + private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesisConfig) + throws Exception { final Path dataDirAhead = temp.newFolder().toPath(); final Path dbAhead = dataDirAhead.resolve("database"); final int blockCount = 500; @@ -131,7 +135,7 @@ public final class RunnerTest { // Setup state with block data try (final BesuController controller = new MainnetBesuControllerBuilder() - .genesisConfigFile(GenesisConfigFile.mainnet()) + .genesisConfigFile(genesisConfig) .synchronizerConfiguration(syncConfigAhead) .ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig()) .dataDirectory(dataDirAhead) @@ -150,7 +154,7 @@ public final class RunnerTest { // Setup Runner with blocks final BesuController controllerAhead = new MainnetBesuControllerBuilder() - .genesisConfigFile(GenesisConfigFile.mainnet()) + .genesisConfigFile(genesisConfig) .synchronizerConfiguration(syncConfigAhead) .ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig()) .dataDirectory(dataDirAhead) @@ -208,7 +212,7 @@ public final class RunnerTest { // Setup runner with no block data final BesuController controllerBehind = new MainnetBesuControllerBuilder() - .genesisConfigFile(GenesisConfigFile.mainnet()) + .genesisConfigFile(genesisConfig) .synchronizerConfiguration(syncConfigBehind) .ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig()) .dataDirectory(dataDirBehind) @@ -337,6 +341,18 @@ public final class RunnerTest { } } + private GenesisConfigFile getFastSyncGenesis() { + final ObjectNode jsonNode = GenesisConfigFile.mainnetJsonNode(); + final Optional configNode = JsonUtil.getObjectNode(jsonNode, "config"); + configNode.ifPresent( + (node) -> { + // Clear DAO block so that inability to validate DAO block won't interfere with fast sync + node.remove("daoForkBlock"); + node.put("daoForkSupport", false); + }); + return GenesisConfigFile.fromConfig(jsonNode); + } + private StorageProvider createKeyValueStorageProvider(final Path dbAhead) { return new KeyValueStorageProviderBuilder() .withStorageFactory( diff --git a/config/src/main/java/org/hyperledger/besu/config/GenesisConfigFile.java b/config/src/main/java/org/hyperledger/besu/config/GenesisConfigFile.java index c3b0547b12..3210ab6832 100644 --- a/config/src/main/java/org/hyperledger/besu/config/GenesisConfigFile.java +++ b/config/src/main/java/org/hyperledger/besu/config/GenesisConfigFile.java @@ -50,6 +50,16 @@ public class GenesisConfigFile { } } + public static ObjectNode mainnetJsonNode() { + try { + final String jsonString = + Resources.toString(GenesisConfigFile.class.getResource("/mainnet.json"), UTF_8); + return JsonUtil.objectNodeFromString(jsonString, false); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + public static GenesisConfigFile development() { try { return fromConfig( diff --git a/consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java b/consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java index bad80f521b..4504cf86e4 100644 --- a/consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java +++ b/consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java @@ -17,6 +17,7 @@ import static java.util.Collections.singletonList; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -32,6 +33,7 @@ public class Istanbul64ProtocolManager extends EthProtocolManager { final Blockchain blockchain, final WorldStateArchive worldStateArchive, final BigInteger networkId, + final List peerValidators, final boolean fastSyncEnabled, final int syncWorkers, final int txWorkers, @@ -43,6 +45,7 @@ public class Istanbul64ProtocolManager extends EthProtocolManager { blockchain, worldStateArchive, networkId, + peerValidators, fastSyncEnabled, syncWorkers, txWorkers, @@ -52,28 +55,6 @@ public class Istanbul64ProtocolManager extends EthProtocolManager { ethereumWireProtocolConfiguration); } - public Istanbul64ProtocolManager( - final Blockchain blockchain, - final WorldStateArchive worldStateArchive, - final BigInteger networkId, - final boolean fastSyncEnabled, - final int syncWorkers, - final int txWorkers, - final int computationWorkers, - final Clock clock, - final MetricsSystem metricsSystem) { - super( - blockchain, - worldStateArchive, - networkId, - fastSyncEnabled, - syncWorkers, - txWorkers, - computationWorkers, - clock, - metricsSystem); - } - @Override public List getSupportedCapabilities() { return singletonList(Istanbul64Protocol.ISTANBUL64); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index 66386bbcfb..f548be37a1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -21,6 +21,7 @@ import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage; import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage; import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; @@ -30,6 +31,7 @@ import org.hyperledger.besu.util.uint.UInt256; import java.time.Clock; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -66,11 +68,13 @@ public class EthPeer { private final AtomicReference> onStatusesExchanged = new AtomicReference<>(); private final PeerReputation reputation = new PeerReputation(); + private final Map validationStatus = new HashMap<>(); EthPeer( final PeerConnection connection, final String protocolName, final Consumer onStatusesExchanged, + final List peerValidators, final Clock clock) { this.connection = connection; this.protocolName = protocolName; @@ -86,6 +90,30 @@ public class EthPeer { })); this.chainHeadState = new ChainState(); this.onStatusesExchanged.set(onStatusesExchanged); + for (PeerValidator peerValidator : peerValidators) { + validationStatus.put(peerValidator, false); + } + } + + public void markValidated(final PeerValidator validator) { + if (!validationStatus.containsKey(validator)) { + throw new IllegalArgumentException("Attempt to update unknown validation status"); + } + validationStatus.put(validator, true); + } + + /** + * Check if this peer has been fully validated. + * + * @return {@code true} if all peer validation logic has run and successfully validated this peer + */ + public boolean isFullyValidated() { + for (Boolean isValid : validationStatus.values()) { + if (!isValid) { + return false; + } + } + return true; } public boolean isDisconnected() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 48c65b04d3..abcd28d614 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -13,6 +13,7 @@ package org.hyperledger.besu.ethereum.eth.manager; import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -22,9 +23,11 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,9 +61,11 @@ public class EthPeers { pendingRequests::size); } - void registerConnection(final PeerConnection peerConnection) { + void registerConnection( + final PeerConnection peerConnection, final List peerValidators) { final EthPeer peer = - new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks, clock); + new EthPeer( + peerConnection, protocolName, this::invokeConnectionCallbacks, peerValidators, clock); connections.putIfAbsent(peerConnection, peer); } @@ -127,7 +132,11 @@ public class EthPeers { } public Optional bestPeerWithHeightEstimate() { - return streamAvailablePeers().filter(p -> p.chainState().hasEstimatedHeight()).max(BEST_CHAIN); + return bestPeerMatchingCriteria(p -> p.chainState().hasEstimatedHeight()); + } + + public Optional bestPeerMatchingCriteria(final Predicate matchesCriteria) { + return streamAvailablePeers().filter(matchesCriteria::test).max(BEST_CHAIN); } @FunctionalInterface diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index c58ee7db43..02091eb302 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -22,6 +22,8 @@ import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.StatusMessage; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner; import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; @@ -66,17 +68,20 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { private List supportedCapabilities; private final Blockchain blockchain; private final BlockBroadcaster blockBroadcaster; + private final List peerValidators; public EthProtocolManager( final Blockchain blockchain, final WorldStateArchive worldStateArchive, final BigInteger networkId, + final List peerValidators, final boolean fastSyncEnabled, final EthScheduler scheduler, final EthProtocolConfiguration ethereumWireProtocolConfiguration, final Clock clock, final MetricsSystem metricsSystem) { this.networkId = networkId; + this.peerValidators = peerValidators; this.scheduler = scheduler; this.blockchain = blockchain; this.fastSyncEnabled = fastSyncEnabled; @@ -90,6 +95,11 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { this.blockBroadcaster = new BlockBroadcaster(ethContext); + // Run validators + for (final PeerValidator peerValidator : this.peerValidators) { + PeerValidatorRunner.runValidator(ethContext, peerValidator); + } + // Set up request handlers new EthServer(blockchain, worldStateArchive, ethMessages, ethereumWireProtocolConfiguration); } @@ -98,6 +108,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final Blockchain blockchain, final WorldStateArchive worldStateArchive, final BigInteger networkId, + final List peerValidators, final boolean fastSyncEnabled, final int syncWorkers, final int txWorkers, @@ -108,6 +119,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { blockchain, worldStateArchive, networkId, + peerValidators, fastSyncEnabled, new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem), EthProtocolConfiguration.defaultConfig(), @@ -119,6 +131,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final Blockchain blockchain, final WorldStateArchive worldStateArchive, final BigInteger networkId, + final List peerValidators, final boolean fastSyncEnabled, final int syncWorkers, final int txWorkers, @@ -130,6 +143,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { blockchain, worldStateArchive, networkId, + peerValidators, fastSyncEnabled, new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem), ethereumWireProtocolConfiguration, @@ -212,7 +226,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { @Override public void handleNewConnection(final PeerConnection connection) { - ethPeers.registerConnection(connection); + ethPeers.registerConnection(connection, peerValidators); final EthPeer peer = ethPeers.peer(connection); if (peer.statusHasBeenSentToPeer()) { return; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidator.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidator.java index d696d98498..175ac9085d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidator.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidator.java @@ -34,7 +34,6 @@ public class DaoForkPeerValidator implements PeerValidator { private static final Logger LOG = LogManager.getLogger(); private static long DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER = 10L; - private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; private final MetricsSystem metricsSystem; @@ -42,14 +41,12 @@ public class DaoForkPeerValidator implements PeerValidator { // Wait for peer's chainhead to advance some distance beyond daoBlockNumber before validating private final long chainHeightEstimationBuffer; - public DaoForkPeerValidator( - final EthContext ethContext, + DaoForkPeerValidator( final ProtocolSchedule protocolSchedule, final MetricsSystem metricsSystem, final long daoBlockNumber, final long chainHeightEstimationBuffer) { checkArgument(chainHeightEstimationBuffer >= 0); - this.ethContext = ethContext; this.protocolSchedule = protocolSchedule; this.metricsSystem = metricsSystem; this.daoBlockNumber = daoBlockNumber; @@ -57,20 +54,15 @@ public class DaoForkPeerValidator implements PeerValidator { } public DaoForkPeerValidator( - final EthContext ethContext, final ProtocolSchedule protocolSchedule, final MetricsSystem metricsSystem, final long daoBlockNumber) { - this( - ethContext, - protocolSchedule, - metricsSystem, - daoBlockNumber, - DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER); + this(protocolSchedule, metricsSystem, daoBlockNumber, DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER); } @Override - public CompletableFuture validatePeer(final EthPeer ethPeer) { + public CompletableFuture validatePeer( + final EthContext ethContext, final EthPeer ethPeer) { AbstractPeerTask> getHeaderTask = GetHeadersFromPeerByNumberTask.forSingleNumber( protocolSchedule, ethContext, daoBlockNumber, metricsSystem) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidator.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidator.java index 24877cd3de..51033ef73e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidator.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidator.java @@ -12,6 +12,7 @@ */ package org.hyperledger.besu.ethereum.eth.peervalidation; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; @@ -40,10 +41,11 @@ public interface PeerValidator { /** * Validates the given peer. * + * @param ethContext Utilities for working with the eth sub-protocol. * @param ethPeer The peer to be validated. * @return True if the peer is valid, false otherwise. */ - CompletableFuture validatePeer(final EthPeer ethPeer); + CompletableFuture validatePeer(final EthContext ethContext, final EthPeer ethPeer); /** * @param ethPeer The peer to be disconnected. diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunner.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunner.java index 6822bd3c49..2c25a71833 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunner.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunner.java @@ -39,12 +39,14 @@ public class PeerValidatorRunner { public void checkPeer(final EthPeer ethPeer) { if (peerValidator.canBeValidated(ethPeer)) { peerValidator - .validatePeer(ethPeer) + .validatePeer(ethContext, ethPeer) .whenComplete( (validated, err) -> { if (err != null || !validated) { // Disconnect invalid peer disconnectPeer(ethPeer); + } else { + ethPeer.markValidated(peerValidator); } }); } else if (!ethPeer.isDisconnected()) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index 7f37a319de..f069d730d8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -19,6 +19,7 @@ import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -113,15 +114,15 @@ public class FastSyncActions { private CompletableFuture selectPivotBlockFromPeers() { return ethContext .getEthPeers() - .bestPeerWithHeightEstimate() + .bestPeerMatchingCriteria(this::canPeerDeterminePivotBlock) // Only select a pivot block number when we have a minimum number of height estimates .filter( peer -> { - final long peerCount = countPeersWithEstimatedHeight(); + final long peerCount = countPeersThatCanDeterminePivotBlock(); final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount(); if (peerCount < minPeerCount) { LOG.info( - "Waiting for peers with chain height information. {} / {} required peers currently available.", + "Waiting for valid peers with chain height information. {} / {} required peers currently available.", peerCount, minPeerCount); return false; @@ -134,7 +135,7 @@ public class FastSyncActions { peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance(); if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) { // Peer's chain isn't long enough, return an empty value so we can try again. - LOG.info("Waiting for peer with sufficient chain height"); + LOG.info("Waiting for peers with sufficient chain height"); return null; } LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber); @@ -145,14 +146,18 @@ public class FastSyncActions { .orElseGet(this::retrySelectPivotBlockAfterDelay); } - private long countPeersWithEstimatedHeight() { + private long countPeersThatCanDeterminePivotBlock() { return ethContext .getEthPeers() .streamAvailablePeers() - .filter(peer -> peer.chainState().hasEstimatedHeight()) + .filter(this::canPeerDeterminePivotBlock) .count(); } + private boolean canPeerDeterminePivotBlock(final EthPeer peer) { + return peer.chainState().hasEstimatedHeight() && peer.isFullyValidated(); + } + private CompletableFuture retrySelectPivotBlockAfterDelay() { return ethContext .getScheduler() diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetriever.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetriever.java index 9a96b274ce..fa764251f6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetriever.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetriever.java @@ -77,6 +77,7 @@ public class PivotBlockRetriever { .getEthPeers() .streamAvailablePeers() .filter(peer -> peer.chainState().getEstimatedHeight() >= pivotBlockNumber) + .filter(EthPeer::isFullyValidated) .collect(Collectors.toList()); final int confirmationsRequired = peersToQuery.size() / 2 + 1; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java index 8cb3fa4c78..9497a6fc1b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java @@ -16,6 +16,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.eth.EthProtocol; @@ -23,16 +24,22 @@ import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage; import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.testutil.TestClock; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.Test; @@ -260,6 +267,61 @@ public class EthPeerTest { assertThat(bodiesClosedCount.get()).isEqualTo(1); } + @Test + public void isFullyValidated_noPeerValidators() { + final EthPeer peer = createPeer(); + assertThat(peer.isFullyValidated()).isTrue(); + } + + @Test + public void isFullyValidated_singleValidator_notValidated() { + final PeerValidator validator = mock(PeerValidator.class); + final EthPeer peer = createPeer(validator); + + assertThat(peer.isFullyValidated()).isFalse(); + } + + @Test + public void isFullyValidated_singleValidator_validated() { + final PeerValidator validator = mock(PeerValidator.class); + final EthPeer peer = createPeer(validator); + peer.markValidated(validator); + + assertThat(peer.isFullyValidated()).isTrue(); + } + + @Test + public void isFullyValidated_multipleValidators_unvalidated() { + final List validators = + Stream.generate(() -> mock(PeerValidator.class)).limit(2).collect(Collectors.toList()); + + final EthPeer peer = createPeer(validators); + + assertThat(peer.isFullyValidated()).isFalse(); + } + + @Test + public void isFullyValidated_multipleValidators_partiallyValidated() { + final List validators = + Stream.generate(() -> mock(PeerValidator.class)).limit(2).collect(Collectors.toList()); + + final EthPeer peer = createPeer(validators); + peer.markValidated(validators.get(0)); + + assertThat(peer.isFullyValidated()).isFalse(); + } + + @Test + public void isFullyValidated_multipleValidators_fullyValidated() { + final List validators = + Stream.generate(() -> mock(PeerValidator.class)).limit(2).collect(Collectors.toList()); + + final EthPeer peer = createPeer(validators); + validators.forEach(peer::markValidated); + + assertThat(peer.isFullyValidated()).isTrue(); + } + private void messageStream( final ResponseStreamSupplier getStream, final MessageData targetMessage, @@ -338,10 +400,18 @@ public class EthPeerTest { } private EthPeer createPeer() { + return createPeer(Collections.emptyList()); + } + + private EthPeer createPeer(final PeerValidator... peerValidators) { + return createPeer(Arrays.asList(peerValidators)); + } + + private EthPeer createPeer(final List peerValidators) { final Set caps = new HashSet<>(singletonList(EthProtocol.ETH63)); final PeerConnection peerConnection = new MockPeerConnection(caps); final Consumer onPeerReady = (peer) -> {}; - return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, clock); + return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, peerValidators, clock); } @FunctionalInterface diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java index b8eccdab82..f4a71a2aa7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java @@ -115,6 +115,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -137,6 +138,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -160,6 +162,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -194,6 +197,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -228,6 +232,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -254,6 +259,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -296,6 +302,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -337,6 +344,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -377,6 +385,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -420,6 +429,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -484,6 +494,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -525,6 +536,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -563,6 +575,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -617,6 +630,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -670,6 +684,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -717,6 +732,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -770,6 +786,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -822,6 +839,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -871,6 +889,7 @@ public final class EthProtocolManagerTest { blockchain, worldStateArchive, BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -923,6 +942,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -996,6 +1016,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, 1, 1, @@ -1065,6 +1086,7 @@ public final class EthProtocolManagerTest { blockchain, protocolContext.getWorldStateArchive(), BigInteger.ONE, + Collections.emptyList(), true, ethScheduler, EthProtocolConfiguration.defaultConfig(), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 0f3072b7d0..8bee205c4c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.chain.GenesisState; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage; @@ -33,6 +34,7 @@ import org.hyperledger.besu.testutil.TestClock; import org.hyperledger.besu.util.uint.UInt256; import java.math.BigInteger; +import java.util.Collections; import java.util.OptionalLong; public class EthProtocolManagerTestUtil { @@ -53,6 +55,7 @@ public class EthProtocolManagerTestUtil { blockchain, worldStateArchive, networkId, + Collections.emptyList(), false, ethScheduler, EthProtocolConfiguration.defaultConfig(), @@ -130,39 +133,82 @@ public class EthProtocolManagerTestUtil { EthProtocol.ETH63, new DefaultMessage(peer.getPeerConnection(), message)); } + public static RespondingEthPeer.Builder peerBuilder() { + return RespondingEthPeer.builder(); + } + public static RespondingEthPeer createPeer( final EthProtocolManager ethProtocolManager, final UInt256 td) { - return RespondingEthPeer.create(ethProtocolManager, td); + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .totalDifficulty(td) + .build(); } public static RespondingEthPeer createPeer( final EthProtocolManager ethProtocolManager, final UInt256 td, final long estimatedHeight) { - return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight); + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .totalDifficulty(td) + .estimatedHeight(estimatedHeight) + .build(); } public static RespondingEthPeer createPeer( final EthProtocolManager ethProtocolManager, final UInt256 td, final OptionalLong estimatedHeight) { - return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight); + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .totalDifficulty(td) + .estimatedHeight(estimatedHeight) + .build(); + } + + public static RespondingEthPeer createPeer( + final EthProtocolManager ethProtocolManager, + final UInt256 td, + final OptionalLong estimatedHeight, + final PeerValidator... validators) { + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .totalDifficulty(td) + .estimatedHeight(estimatedHeight) + .peerValidators(validators) + .build(); } public static RespondingEthPeer createPeer(final EthProtocolManager ethProtocolManager) { - return RespondingEthPeer.create(ethProtocolManager, UInt256.of(1000L)); + return RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); } public static RespondingEthPeer createPeer( final EthProtocolManager ethProtocolManager, final long estimatedHeight) { - return RespondingEthPeer.create(ethProtocolManager, UInt256.of(1000L), estimatedHeight); + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .estimatedHeight(estimatedHeight) + .build(); + } + + public static RespondingEthPeer createPeer( + final EthProtocolManager ethProtocolManager, + final long estimatedHeight, + final PeerValidator... validators) { + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .estimatedHeight(estimatedHeight) + .peerValidators(validators) + .build(); } public static RespondingEthPeer createPeer( final EthProtocolManager ethProtocolManager, final Blockchain blockchain) { final ChainHead head = blockchain.getChainHead(); - return RespondingEthPeer.create( - ethProtocolManager, - head.getHash(), - head.getTotalDifficulty(), - blockchain.getChainHeadBlockNumber()); + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .totalDifficulty(head.getTotalDifficulty()) + .chainHeadHash(head.getHash()) + .estimatedHeight(blockchain.getChainHeadBlockNumber()) + .build(); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java index 3a4c160fcf..5171a1588d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java @@ -217,6 +217,7 @@ public class RequestManagerTest { final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); final PeerConnection peerConnection = new MockPeerConnection(caps); final Consumer onPeerReady = (peer) -> {}; - return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, TestClock.fixed()); + return new EthPeer( + peerConnection, EthProtocol.NAME, onPeerReady, Collections.emptyList(), TestClock.fixed()); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java index 8f9f7ad99e..29fb75cc23 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java @@ -13,6 +13,7 @@ package org.hyperledger.besu.ethereum.eth.manager; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive; import org.hyperledger.besu.ethereum.chain.Blockchain; @@ -28,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.EthPV63; import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage; import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage; @@ -55,7 +57,6 @@ import com.google.common.collect.Lists; public class RespondingEthPeer { private static final BlockDataGenerator gen = new BlockDataGenerator(); - private static final int DEFAULT_ESTIMATED_HEIGHT = 1000; private final EthPeer ethPeer; private final BlockingQueue outgoingMessages; private final EthProtocolManager ethProtocolManager; @@ -98,48 +99,16 @@ public class RespondingEthPeer { return peerConnection; } - public static RespondingEthPeer create( - final EthProtocolManager ethProtocolManager, final UInt256 totalDifficulty) { - return create(ethProtocolManager, totalDifficulty, DEFAULT_ESTIMATED_HEIGHT); + public static Builder builder() { + return new Builder(); } - public static RespondingEthPeer create( - final EthProtocolManager ethProtocolManager, - final UInt256 totalDifficulty, - final long estimatedHeight) { - final Hash chainHeadHash = gen.hash(); - return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight); - } - - public static RespondingEthPeer create( - final EthProtocolManager ethProtocolManager, - final UInt256 totalDifficulty, - final OptionalLong estimatedHeight) { - final Hash chainHeadHash = gen.hash(); - return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight); - } - - public static RespondingEthPeer create( - final EthProtocolManager ethProtocolManager, - final Hash chainHeadHash, - final UInt256 totalDifficulty) { - return create(ethProtocolManager, chainHeadHash, totalDifficulty, DEFAULT_ESTIMATED_HEIGHT); - } - - public static RespondingEthPeer create( + private static RespondingEthPeer create( final EthProtocolManager ethProtocolManager, final Hash chainHeadHash, final UInt256 totalDifficulty, - final long estimatedHeight) { - return create( - ethProtocolManager, chainHeadHash, totalDifficulty, OptionalLong.of(estimatedHeight)); - } - - public static RespondingEthPeer create( - final EthProtocolManager ethProtocolManager, - final Hash chainHeadHash, - final UInt256 totalDifficulty, - final OptionalLong estimatedHeight) { + final OptionalLong estimatedHeight, + final List peerValidators) { final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers(); final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); @@ -147,7 +116,7 @@ public class RespondingEthPeer { final MockPeerConnection peerConnection = new MockPeerConnection( caps, (cap, msg, conn) -> outgoingMessages.add(new OutgoingMessage(cap, msg))); - ethPeers.registerConnection(peerConnection); + ethPeers.registerConnection(peerConnection, peerValidators); final EthPeer peer = ethPeers.peer(peerConnection); peer.registerStatusReceived(chainHeadHash, totalDifficulty); estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height)); @@ -365,6 +334,61 @@ public class RespondingEthPeer { }; } + public static class Builder { + private EthProtocolManager ethProtocolManager; + private Hash chainHeadHash = gen.hash(); + private UInt256 totalDifficulty = UInt256.of(1000L); + private OptionalLong estimatedHeight = OptionalLong.of(1000L); + private List peerValidators = new ArrayList<>(); + + public RespondingEthPeer build() { + checkNotNull(ethProtocolManager, "Must configure EthProtocolManager"); + + return RespondingEthPeer.create( + ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight, peerValidators); + } + + public Builder ethProtocolManager(final EthProtocolManager ethProtocolManager) { + checkNotNull(ethProtocolManager); + this.ethProtocolManager = ethProtocolManager; + return this; + } + + public Builder chainHeadHash(final Hash chainHeadHash) { + checkNotNull(chainHeadHash); + this.chainHeadHash = chainHeadHash; + return this; + } + + public Builder totalDifficulty(final UInt256 totalDifficulty) { + checkNotNull(totalDifficulty); + this.totalDifficulty = totalDifficulty; + return this; + } + + public Builder estimatedHeight(final OptionalLong estimatedHeight) { + checkNotNull(estimatedHeight); + this.estimatedHeight = estimatedHeight; + return this; + } + + public Builder estimatedHeight(final long estimatedHeight) { + this.estimatedHeight = OptionalLong.of(estimatedHeight); + return this; + } + + public Builder peerValidators(final List peerValidators) { + checkNotNull(peerValidators); + this.peerValidators.addAll(peerValidators); + return this; + } + + public Builder peerValidators(final PeerValidator... peerValidators) { + peerValidators(Arrays.asList(peerValidators)); + return this; + } + } + static class OutgoingMessage { private final Capability capability; private final MessageData messageData; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java index e7a50e77e3..f027be57e4 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java @@ -53,16 +53,13 @@ public class DaoForkPeerValidatorTest { PeerValidator validator = new DaoForkPeerValidator( - ethProtocolManager.ethContext(), - MainnetProtocolSchedule.create(), - new NoOpMetricsSystem(), - daoBlockNumber, - 0); + MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0); RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); - CompletableFuture result = validator.validatePeer(peer.getEthPeer()); + CompletableFuture result = + validator.validatePeer(ethProtocolManager.ethContext(), peer.getEthPeer()); assertThat(result).isNotDone(); @@ -85,16 +82,13 @@ public class DaoForkPeerValidatorTest { PeerValidator validator = new DaoForkPeerValidator( - ethProtocolManager.ethContext(), - MainnetProtocolSchedule.create(), - new NoOpMetricsSystem(), - daoBlockNumber, - 0); + MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0); RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); - CompletableFuture result = validator.validatePeer(peer.getEthPeer()); + CompletableFuture result = + validator.validatePeer(ethProtocolManager.ethContext(), peer.getEthPeer()); assertThat(result).isNotDone(); @@ -114,16 +108,13 @@ public class DaoForkPeerValidatorTest { PeerValidator validator = new DaoForkPeerValidator( - ethProtocolManager.ethContext(), - MainnetProtocolSchedule.create(), - new NoOpMetricsSystem(), - daoBlockNumber, - 0); + MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0); RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); - CompletableFuture result = validator.validatePeer(peer.getEthPeer()); + CompletableFuture result = + validator.validatePeer(ethProtocolManager.ethContext(), peer.getEthPeer()); // Request should timeout immediately assertThat(result).isDone(); @@ -143,11 +134,7 @@ public class DaoForkPeerValidatorTest { PeerValidator validator = new DaoForkPeerValidator( - ethProtocolManager.ethContext(), - MainnetProtocolSchedule.create(), - new NoOpMetricsSystem(), - daoBlockNumber, - 0); + MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0); int peerCount = 1000; List otherPeers = @@ -158,7 +145,8 @@ public class DaoForkPeerValidatorTest { RespondingEthPeer targetPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); - CompletableFuture result = validator.validatePeer(targetPeer.getEthPeer()); + CompletableFuture result = + validator.validatePeer(ethProtocolManager.ethContext(), targetPeer.getEthPeer()); assertThat(result).isNotDone(); @@ -183,11 +171,7 @@ public class DaoForkPeerValidatorTest { PeerValidator validator = new DaoForkPeerValidator( - ethProtocolManager.ethContext(), - MainnetProtocolSchedule.create(), - new NoOpMetricsSystem(), - daoBlockNumber, - buffer); + MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, buffer); EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0).getEthPeer(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java index fcdb82ef3d..d1d30936e3 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java @@ -12,6 +12,7 @@ */ package org.hyperledger.besu.ethereum.eth.peervalidation; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -34,88 +35,130 @@ public class PeerValidatorRunnerTest { @Test public void checkPeer_schedulesFutureCheckWhenPeerNotReady() { - EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final PeerValidator validator = mock(PeerValidator.class); + + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); - EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + final EthPeer peer = + EthProtocolManagerTestUtil.peerBuilder() + .ethProtocolManager(ethProtocolManager) + .peerValidators(validator) + .build() + .getEthPeer(); + + assertThat(peer.isFullyValidated()).isFalse(); - PeerValidator validator = mock(PeerValidator.class); when(validator.canBeValidated(eq(peer))).thenReturn(false); when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); - PeerValidatorRunner runner = + final PeerValidatorRunner runner = spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); runner.checkPeer(peer); + assertThat(peer.isFullyValidated()).isFalse(); + verify(runner, times(1)).checkPeer(eq(peer)); - verify(validator, never()).validatePeer(eq(peer)); + verify(validator, never()).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer)); verify(runner, never()).disconnectPeer(eq(peer)); verify(runner, times(1)).scheduleNextCheck(eq(peer)); // Run pending futures to trigger the next check EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); verify(runner, times(2)).checkPeer(eq(peer)); - verify(validator, never()).validatePeer(eq(peer)); + verify(validator, never()).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer)); verify(runner, never()).disconnectPeer(eq(peer)); verify(runner, times(2)).scheduleNextCheck(eq(peer)); + + assertThat(peer.isFullyValidated()).isFalse(); } @Test public void checkPeer_doesNotScheduleFutureCheckWhenPeerNotReadyAndDisconnected() { - EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final PeerValidator validator = mock(PeerValidator.class); + + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); - EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + final EthPeer peer = + EthProtocolManagerTestUtil.peerBuilder() + .ethProtocolManager(ethProtocolManager) + .peerValidators(validator) + .build() + .getEthPeer(); peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); - PeerValidator validator = mock(PeerValidator.class); when(validator.canBeValidated(eq(peer))).thenReturn(false); when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); - PeerValidatorRunner runner = + final PeerValidatorRunner runner = spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); runner.checkPeer(peer); verify(runner, times(1)).checkPeer(eq(peer)); - verify(validator, never()).validatePeer(eq(peer)); + verify(validator, never()).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer)); verify(runner, never()).disconnectPeer(eq(peer)); verify(runner, times(0)).scheduleNextCheck(eq(peer)); } @Test public void checkPeer_handlesInvalidPeer() { - EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final PeerValidator validator = mock(PeerValidator.class); + + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); - EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + final EthPeer peer = + EthProtocolManagerTestUtil.peerBuilder() + .ethProtocolManager(ethProtocolManager) + .peerValidators(validator) + .build() + .getEthPeer(); + + assertThat(peer.isFullyValidated()).isFalse(); - PeerValidator validator = mock(PeerValidator.class); when(validator.canBeValidated(eq(peer))).thenReturn(true); - when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(false)); + when(validator.validatePeer(eq(ethProtocolManager.ethContext()), eq(peer))) + .thenReturn(CompletableFuture.completedFuture(false)); when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); - PeerValidatorRunner runner = + assertThat(peer.isFullyValidated()).isFalse(); + + final PeerValidatorRunner runner = spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); runner.checkPeer(peer); - verify(validator, times(1)).validatePeer(eq(peer)); + verify(validator, times(1)).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer)); verify(runner, times(1)).disconnectPeer(eq(peer)); verify(runner, never()).scheduleNextCheck(eq(peer)); + + assertThat(peer.isFullyValidated()).isFalse(); } @Test public void checkPeer_handlesValidPeer() { - EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final PeerValidator validator = mock(PeerValidator.class); + + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); - EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + final EthPeer peer = + EthProtocolManagerTestUtil.peerBuilder() + .ethProtocolManager(ethProtocolManager) + .peerValidators(validator) + .build() + .getEthPeer(); + + assertThat(peer.isFullyValidated()).isFalse(); - PeerValidator validator = mock(PeerValidator.class); when(validator.canBeValidated(eq(peer))).thenReturn(true); - when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(true)); + when(validator.validatePeer(eq(ethProtocolManager.ethContext()), eq(peer))) + .thenReturn(CompletableFuture.completedFuture(true)); when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); - PeerValidatorRunner runner = + final PeerValidatorRunner runner = spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); runner.checkPeer(peer); - verify(validator, times(1)).validatePeer(eq(peer)); + assertThat(peer.isFullyValidated()).isTrue(); + + verify(validator, times(1)).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer)); verify(runner, never()).disconnectPeer(eq(peer)); verify(runner, never()).scheduleNextCheck(eq(peer)); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java index e199f5e65c..a454c83f15 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java @@ -38,11 +38,12 @@ public class ChainHeadTrackerTest { private final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(blockchain, blockchainSetupUtil.getWorldArchive()); private final RespondingEthPeer respondingPeer = - RespondingEthPeer.create( - ethProtocolManager, - blockchain.getChainHeadHash(), - blockchain.getChainHead().getTotalDifficulty(), - 0); + RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .chainHeadHash(blockchain.getChainHeadHash()) + .totalDifficulty(blockchain.getChainHead().getTotalDifficulty()) + .estimatedHeight(0) + .build(); private final ProtocolSchedule protocolSchedule = FixedDifficultyProtocolSchedule.create( GenesisConfigFile.development().getConfigOptions(), false); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index 801c9aa20a..1a6e992d90 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -24,9 +24,11 @@ import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -204,8 +206,69 @@ public class FastSyncActionsTest { assertThat(result).isCompletedWithValue(expected); } + @Test + public void selectPivotBlockShouldWaitAndRetryIfSufficientValidatedPeersUnavailable() { + final int minPeers = 3; + final PeerValidator validator = mock(PeerValidator.class); + syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); + syncConfig = syncConfigBuilder.build(); + fastSyncActions = createFastSyncActions(syncConfig); + final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L; + EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); + + // Create peers that are not validated + final OptionalLong height = OptionalLong.of(minPivotHeight + 10); + List peers = new ArrayList<>(); + for (int i = 0; i < minPeers; i++) { + final UInt256 td = UInt256.of(i); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height, validator); + peers.add(peer); + } + + // No pivot should be selected while peers are not fully validated + final CompletableFuture result = + fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); + assertThat(result).isNotDone(); + EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); + assertThat(result).isNotDone(); + + // Validate a subset of peers + peers.subList(0, minPeers - 1).forEach(p -> p.getEthPeer().markValidated(validator)); + + // No pivot should be selected while only a subset of peers have height estimates + EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); + assertThat(result).isNotDone(); + + // Set best height and mark best peer validated + final long bestPeerHeight = minPivotHeight + 11; + final EthPeer bestPeer = peers.get(minPeers - 1).getEthPeer(); + bestPeer.chainState().updateHeightEstimate(bestPeerHeight); + bestPeer.markValidated(validator); + final FastSyncState expected = + new FastSyncState(bestPeerHeight - syncConfig.getFastSyncPivotDistance()); + EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); + assertThat(result).isCompletedWithValue(expected); + } + @Test public void selectPivotBlockUsesBestPeerWithHeightEstimate() { + selectPivotBlockUsesBestPeerMatchingRequiredCriteria(true, false); + } + + @Test + public void selectPivotBlockUsesBestPeerThatIsValidated() { + selectPivotBlockUsesBestPeerMatchingRequiredCriteria(false, true); + } + + @Test + public void selectPivotBlockUsesBestPeerThatIsValidatedAndHasHeightEstimate() { + selectPivotBlockUsesBestPeerMatchingRequiredCriteria(true, true); + } + + private void selectPivotBlockUsesBestPeerMatchingRequiredCriteria( + final boolean bestMissingHeight, final boolean bestNotValidated) { final int minPeers = 3; final int peerCount = minPeers + 1; syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); @@ -215,21 +278,27 @@ public class FastSyncActionsTest { EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); // Create peers without chain height estimates + final PeerValidator validator = mock(PeerValidator.class); List peers = new ArrayList<>(); for (int i = 0; i < peerCount; i++) { // Best peer by td is the first peer, td decreases as i increases + final boolean isBest = i == 0; final UInt256 td = UInt256.of(peerCount - i); final OptionalLong height; - if (i == 0) { + if (isBest && bestMissingHeight) { // Don't set a height estimate for the best peer height = OptionalLong.empty(); } else { // Height increases with i height = OptionalLong.of(minPivotHeight + i); } + final RespondingEthPeer peer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height, validator); + if (!isBest || !bestNotValidated) { + peer.getEthPeer().markValidated(validator); + } peers.add(peer); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java index a96e2bb92b..7c06429ad4 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java @@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -93,11 +94,42 @@ public class PivotBlockRetrieverTest { RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); final RespondingEthPeer respondingPeerA = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); + final RespondingEthPeer respondingPeerB = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); + final RespondingEthPeer respondingPeerC = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); + + final CompletableFuture future = pivotBlockRetriever.downloadPivotBlockHeader(); + while (!future.isDone()) { + assertThat(respondingPeerB.hasOutstandingRequests()).isFalse(); + assertThat(respondingPeerC.hasOutstandingRequests()).isFalse(); + respondingPeerA.respondWhile(responder, () -> !future.isDone()); + } + + assertThat(future) + .isCompletedWithValue( + new FastSyncState(blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER).get())); + } + + @Test + public void shouldIgnorePeersThatAreNotFullyValidated() { + final PeerValidator peerValidator = mock(PeerValidator.class); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); + final RespondingEthPeer respondingPeerA = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator); + final RespondingEthPeer respondingPeerB = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator); + final RespondingEthPeer respondingPeerC = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator); + + // Only mark one peer as validated + respondingPeerA.getEthPeer().markValidated(peerValidator); final CompletableFuture future = pivotBlockRetriever.downloadPivotBlockHeader(); while (!future.isDone()) { + assertThat(respondingPeerB.hasOutstandingRequests()).isFalse(); + assertThat(respondingPeerC.hasOutstandingRequests()).isFalse(); respondingPeerA.respondWhile(responder, () -> !future.isDone()); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java index 027c711f90..18c2d9b6cd 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java @@ -55,6 +55,7 @@ import org.hyperledger.besu.util.bytes.BytesValue; import java.io.Closeable; import java.io.IOException; import java.math.BigInteger; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -112,6 +113,7 @@ public class TestNode implements Closeable { blockchain, worldStateArchive, BigInteger.ONE, + Collections.emptyList(), false, 1, 1,