[PAN-2333] Use only fully validated peers for fast sync pivot selection (#21)

Expose EthPeer validation state so that the Synchronizer can choose peers based on whether or not they have been fully validated. This allows us to use only fully validated peers when choosing a pivot block.

Signed-off-by: Meredith Baxter <meredith.baxter@consensys.net>
Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
pull/28/head
mbaxter 5 years ago committed by Danno Ferrin
parent 829d865f3d
commit 5d1d3beae2
  1. 37
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 8
      besu/src/main/java/org/hyperledger/besu/controller/IbftLegacyBesuControllerBuilder.java
  3. 28
      besu/src/test/java/org/hyperledger/besu/RunnerTest.java
  4. 10
      config/src/main/java/org/hyperledger/besu/config/GenesisConfigFile.java
  5. 25
      consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java
  6. 28
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java
  7. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  8. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  9. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidator.java
  10. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidator.java
  11. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunner.java
  12. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java
  13. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetriever.java
  14. 72
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java
  15. 22
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java
  16. 66
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  17. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java
  18. 102
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java
  19. 42
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java
  20. 89
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java
  21. 11
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java
  22. 73
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java
  23. 36
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java
  24. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java

@ -29,10 +29,9 @@ import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.DaoForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -245,7 +244,9 @@ public abstract class BesuControllerBuilder<C> {
}));
final boolean fastSyncEnabled = syncConfig.getSyncMode().equals(SyncMode.FAST);
ethProtocolManager = createEthProtocolManager(protocolContext, fastSyncEnabled);
ethProtocolManager =
createEthProtocolManager(
protocolContext, fastSyncEnabled, createPeerValidators(protocolSchedule));
final SyncState syncState =
new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
final Synchronizer synchronizer =
@ -262,17 +263,6 @@ public abstract class BesuControllerBuilder<C> {
clock,
metricsSystem);
final OptionalLong daoBlock =
genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
final EthContext ethContext = ethProtocolManager.ethContext();
final DaoForkPeerValidator daoForkPeerValidator =
new DaoForkPeerValidator(
ethContext, protocolSchedule, metricsSystem, daoBlock.getAsLong());
PeerValidatorRunner.runValidator(ethContext, daoForkPeerValidator);
}
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
protocolSchedule,
@ -356,11 +346,14 @@ public abstract class BesuControllerBuilder<C> {
Blockchain blockchain, WorldStateArchive worldStateArchive);
protected EthProtocolManager createEthProtocolManager(
final ProtocolContext<C> protocolContext, final boolean fastSyncEnabled) {
final ProtocolContext<C> protocolContext,
final boolean fastSyncEnabled,
final List<PeerValidator> peerValidators) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
protocolContext.getWorldStateArchive(),
networkId,
peerValidators,
fastSyncEnabled,
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
@ -369,4 +362,18 @@ public abstract class BesuControllerBuilder<C> {
metricsSystem,
ethereumWireProtocolConfiguration);
}
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule<C> protocolSchedule) {
final List<PeerValidator> validators = new ArrayList<>();
final OptionalLong daoBlock =
genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
validators.add(
new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong()));
}
return validators;
}
}

@ -29,12 +29,15 @@ import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -97,12 +100,15 @@ public class IbftLegacyBesuControllerBuilder extends BesuControllerBuilder<IbftC
@Override
protected EthProtocolManager createEthProtocolManager(
final ProtocolContext<IbftContext> protocolContext, final boolean fastSyncEnabled) {
final ProtocolContext<IbftContext> protocolContext,
final boolean fastSyncEnabled,
final List<PeerValidator> peerValidators) {
LOG.info("Operating on IBFT-1.0 network.");
return new Istanbul64ProtocolManager(
protocolContext.getBlockchain(),
protocolContext.getWorldStateArchive(),
networkId,
peerValidators,
fastSyncEnabled,
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),

@ -19,6 +19,7 @@ import static org.hyperledger.besu.cli.config.NetworkName.DEV;
import org.hyperledger.besu.cli.config.EthNetworkConfig;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.JsonUtil;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.controller.KeyPairUtil;
import org.hyperledger.besu.controller.MainnetBesuControllerBuilder;
@ -61,8 +62,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@ -110,15 +113,16 @@ public final class RunnerTest {
@Test
public void fullSyncFromGenesis() throws Exception {
syncFromGenesis(SyncMode.FULL);
syncFromGenesis(SyncMode.FULL, GenesisConfigFile.mainnet());
}
@Test
public void fastSyncFromGenesis() throws Exception {
syncFromGenesis(SyncMode.FAST);
syncFromGenesis(SyncMode.FAST, getFastSyncGenesis());
}
private void syncFromGenesis(final SyncMode mode) throws Exception {
private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesisConfig)
throws Exception {
final Path dataDirAhead = temp.newFolder().toPath();
final Path dbAhead = dataDirAhead.resolve("database");
final int blockCount = 500;
@ -131,7 +135,7 @@ public final class RunnerTest {
// Setup state with block data
try (final BesuController<Void> controller =
new MainnetBesuControllerBuilder()
.genesisConfigFile(GenesisConfigFile.mainnet())
.genesisConfigFile(genesisConfig)
.synchronizerConfiguration(syncConfigAhead)
.ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig())
.dataDirectory(dataDirAhead)
@ -150,7 +154,7 @@ public final class RunnerTest {
// Setup Runner with blocks
final BesuController<Void> controllerAhead =
new MainnetBesuControllerBuilder()
.genesisConfigFile(GenesisConfigFile.mainnet())
.genesisConfigFile(genesisConfig)
.synchronizerConfiguration(syncConfigAhead)
.ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig())
.dataDirectory(dataDirAhead)
@ -208,7 +212,7 @@ public final class RunnerTest {
// Setup runner with no block data
final BesuController<Void> controllerBehind =
new MainnetBesuControllerBuilder()
.genesisConfigFile(GenesisConfigFile.mainnet())
.genesisConfigFile(genesisConfig)
.synchronizerConfiguration(syncConfigBehind)
.ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig())
.dataDirectory(dataDirBehind)
@ -337,6 +341,18 @@ public final class RunnerTest {
}
}
private GenesisConfigFile getFastSyncGenesis() {
final ObjectNode jsonNode = GenesisConfigFile.mainnetJsonNode();
final Optional<ObjectNode> configNode = JsonUtil.getObjectNode(jsonNode, "config");
configNode.ifPresent(
(node) -> {
// Clear DAO block so that inability to validate DAO block won't interfere with fast sync
node.remove("daoForkBlock");
node.put("daoForkSupport", false);
});
return GenesisConfigFile.fromConfig(jsonNode);
}
private StorageProvider createKeyValueStorageProvider(final Path dbAhead) {
return new KeyValueStorageProviderBuilder()
.withStorageFactory(

@ -50,6 +50,16 @@ public class GenesisConfigFile {
}
}
public static ObjectNode mainnetJsonNode() {
try {
final String jsonString =
Resources.toString(GenesisConfigFile.class.getResource("/mainnet.json"), UTF_8);
return JsonUtil.objectNodeFromString(jsonString, false);
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
public static GenesisConfigFile development() {
try {
return fromConfig(

@ -17,6 +17,7 @@ import static java.util.Collections.singletonList;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -32,6 +33,7 @@ public class Istanbul64ProtocolManager extends EthProtocolManager {
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
@ -43,6 +45,7 @@ public class Istanbul64ProtocolManager extends EthProtocolManager {
blockchain,
worldStateArchive,
networkId,
peerValidators,
fastSyncEnabled,
syncWorkers,
txWorkers,
@ -52,28 +55,6 @@ public class Istanbul64ProtocolManager extends EthProtocolManager {
ethereumWireProtocolConfiguration);
}
public Istanbul64ProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final Clock clock,
final MetricsSystem metricsSystem) {
super(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers,
clock,
metricsSystem);
}
@Override
public List<Capability> getSupportedCapabilities() {
return singletonList(Istanbul64Protocol.ISTANBUL64);

@ -21,6 +21,7 @@ import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
@ -30,6 +31,7 @@ import org.hyperledger.besu.util.uint.UInt256;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -66,11 +68,13 @@ public class EthPeer {
private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
private final Map<PeerValidator, Boolean> validationStatus = new HashMap<>();
EthPeer(
final PeerConnection connection,
final String protocolName,
final Consumer<EthPeer> onStatusesExchanged,
final List<PeerValidator> peerValidators,
final Clock clock) {
this.connection = connection;
this.protocolName = protocolName;
@ -86,6 +90,30 @@ public class EthPeer {
}));
this.chainHeadState = new ChainState();
this.onStatusesExchanged.set(onStatusesExchanged);
for (PeerValidator peerValidator : peerValidators) {
validationStatus.put(peerValidator, false);
}
}
public void markValidated(final PeerValidator validator) {
if (!validationStatus.containsKey(validator)) {
throw new IllegalArgumentException("Attempt to update unknown validation status");
}
validationStatus.put(validator, true);
}
/**
* Check if this peer has been fully validated.
*
* @return {@code true} if all peer validation logic has run and successfully validated this peer
*/
public boolean isFullyValidated() {
for (Boolean isValid : validationStatus.values()) {
if (!isValid) {
return false;
}
}
return true;
}
public boolean isDisconnected() {

@ -13,6 +13,7 @@
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -22,9 +23,11 @@ import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -58,9 +61,11 @@ public class EthPeers {
pendingRequests::size);
}
void registerConnection(final PeerConnection peerConnection) {
void registerConnection(
final PeerConnection peerConnection, final List<PeerValidator> peerValidators) {
final EthPeer peer =
new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks, clock);
new EthPeer(
peerConnection, protocolName, this::invokeConnectionCallbacks, peerValidators, clock);
connections.putIfAbsent(peerConnection, peer);
}
@ -127,7 +132,11 @@ public class EthPeers {
}
public Optional<EthPeer> bestPeerWithHeightEstimate() {
return streamAvailablePeers().filter(p -> p.chainState().hasEstimatedHeight()).max(BEST_CHAIN);
return bestPeerMatchingCriteria(p -> p.chainState().hasEstimatedHeight());
}
public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> matchesCriteria) {
return streamAvailablePeers().filter(matchesCriteria::test).max(BEST_CHAIN);
}
@FunctionalInterface

@ -22,6 +22,8 @@ import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
@ -66,17 +68,20 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private List<Capability> supportedCapabilities;
private final Blockchain blockchain;
private final BlockBroadcaster blockBroadcaster;
private final List<PeerValidator> peerValidators;
public EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final EthScheduler scheduler,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final Clock clock,
final MetricsSystem metricsSystem) {
this.networkId = networkId;
this.peerValidators = peerValidators;
this.scheduler = scheduler;
this.blockchain = blockchain;
this.fastSyncEnabled = fastSyncEnabled;
@ -90,6 +95,11 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
this.blockBroadcaster = new BlockBroadcaster(ethContext);
// Run validators
for (final PeerValidator peerValidator : this.peerValidators) {
PeerValidatorRunner.runValidator(ethContext, peerValidator);
}
// Set up request handlers
new EthServer(blockchain, worldStateArchive, ethMessages, ethereumWireProtocolConfiguration);
}
@ -98,6 +108,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
@ -108,6 +119,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
blockchain,
worldStateArchive,
networkId,
peerValidators,
fastSyncEnabled,
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
EthProtocolConfiguration.defaultConfig(),
@ -119,6 +131,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
@ -130,6 +143,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
blockchain,
worldStateArchive,
networkId,
peerValidators,
fastSyncEnabled,
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
ethereumWireProtocolConfiguration,
@ -212,7 +226,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
@Override
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerConnection(connection);
ethPeers.registerConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);
if (peer.statusHasBeenSentToPeer()) {
return;

@ -34,7 +34,6 @@ public class DaoForkPeerValidator implements PeerValidator {
private static final Logger LOG = LogManager.getLogger();
private static long DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER = 10L;
private final EthContext ethContext;
private final ProtocolSchedule<?> protocolSchedule;
private final MetricsSystem metricsSystem;
@ -42,14 +41,12 @@ public class DaoForkPeerValidator implements PeerValidator {
// Wait for peer's chainhead to advance some distance beyond daoBlockNumber before validating
private final long chainHeightEstimationBuffer;
public DaoForkPeerValidator(
final EthContext ethContext,
DaoForkPeerValidator(
final ProtocolSchedule<?> protocolSchedule,
final MetricsSystem metricsSystem,
final long daoBlockNumber,
final long chainHeightEstimationBuffer) {
checkArgument(chainHeightEstimationBuffer >= 0);
this.ethContext = ethContext;
this.protocolSchedule = protocolSchedule;
this.metricsSystem = metricsSystem;
this.daoBlockNumber = daoBlockNumber;
@ -57,20 +54,15 @@ public class DaoForkPeerValidator implements PeerValidator {
}
public DaoForkPeerValidator(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final MetricsSystem metricsSystem,
final long daoBlockNumber) {
this(
ethContext,
protocolSchedule,
metricsSystem,
daoBlockNumber,
DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER);
this(protocolSchedule, metricsSystem, daoBlockNumber, DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER);
}
@Override
public CompletableFuture<Boolean> validatePeer(final EthPeer ethPeer) {
public CompletableFuture<Boolean> validatePeer(
final EthContext ethContext, final EthPeer ethPeer) {
AbstractPeerTask<List<BlockHeader>> getHeaderTask =
GetHeadersFromPeerByNumberTask.forSingleNumber(
protocolSchedule, ethContext, daoBlockNumber, metricsSystem)

@ -12,6 +12,7 @@
*/
package org.hyperledger.besu.ethereum.eth.peervalidation;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
@ -40,10 +41,11 @@ public interface PeerValidator {
/**
* Validates the given peer.
*
* @param ethContext Utilities for working with the eth sub-protocol.
* @param ethPeer The peer to be validated.
* @return True if the peer is valid, false otherwise.
*/
CompletableFuture<Boolean> validatePeer(final EthPeer ethPeer);
CompletableFuture<Boolean> validatePeer(final EthContext ethContext, final EthPeer ethPeer);
/**
* @param ethPeer The peer to be disconnected.

@ -39,12 +39,14 @@ public class PeerValidatorRunner {
public void checkPeer(final EthPeer ethPeer) {
if (peerValidator.canBeValidated(ethPeer)) {
peerValidator
.validatePeer(ethPeer)
.validatePeer(ethContext, ethPeer)
.whenComplete(
(validated, err) -> {
if (err != null || !validated) {
// Disconnect invalid peer
disconnectPeer(ethPeer);
} else {
ethPeer.markValidated(peerValidator);
}
});
} else if (!ethPeer.isDisconnected()) {

@ -19,6 +19,7 @@ import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -113,15 +114,15 @@ public class FastSyncActions<C> {
private CompletableFuture<FastSyncState> selectPivotBlockFromPeers() {
return ethContext
.getEthPeers()
.bestPeerWithHeightEstimate()
.bestPeerMatchingCriteria(this::canPeerDeterminePivotBlock)
// Only select a pivot block number when we have a minimum number of height estimates
.filter(
peer -> {
final long peerCount = countPeersWithEstimatedHeight();
final long peerCount = countPeersThatCanDeterminePivotBlock();
final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount();
if (peerCount < minPeerCount) {
LOG.info(
"Waiting for peers with chain height information. {} / {} required peers currently available.",
"Waiting for valid peers with chain height information. {} / {} required peers currently available.",
peerCount,
minPeerCount);
return false;
@ -134,7 +135,7 @@ public class FastSyncActions<C> {
peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance();
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) {
// Peer's chain isn't long enough, return an empty value so we can try again.
LOG.info("Waiting for peer with sufficient chain height");
LOG.info("Waiting for peers with sufficient chain height");
return null;
}
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
@ -145,14 +146,18 @@ public class FastSyncActions<C> {
.orElseGet(this::retrySelectPivotBlockAfterDelay);
}
private long countPeersWithEstimatedHeight() {
private long countPeersThatCanDeterminePivotBlock() {
return ethContext
.getEthPeers()
.streamAvailablePeers()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.filter(this::canPeerDeterminePivotBlock)
.count();
}
private boolean canPeerDeterminePivotBlock(final EthPeer peer) {
return peer.chainState().hasEstimatedHeight() && peer.isFullyValidated();
}
private CompletableFuture<FastSyncState> retrySelectPivotBlockAfterDelay() {
return ethContext
.getScheduler()

@ -77,6 +77,7 @@ public class PivotBlockRetriever<C> {
.getEthPeers()
.streamAvailablePeers()
.filter(peer -> peer.chainState().getEstimatedHeight() >= pivotBlockNumber)
.filter(EthPeer::isFullyValidated)
.collect(Collectors.toList());
final int confirmationsRequired = peersToQuery.size() / 2 + 1;

@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
@ -23,16 +24,22 @@ import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.testutil.TestClock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Test;
@ -260,6 +267,61 @@ public class EthPeerTest {
assertThat(bodiesClosedCount.get()).isEqualTo(1);
}
@Test
public void isFullyValidated_noPeerValidators() {
final EthPeer peer = createPeer();
assertThat(peer.isFullyValidated()).isTrue();
}
@Test
public void isFullyValidated_singleValidator_notValidated() {
final PeerValidator validator = mock(PeerValidator.class);
final EthPeer peer = createPeer(validator);
assertThat(peer.isFullyValidated()).isFalse();
}
@Test
public void isFullyValidated_singleValidator_validated() {
final PeerValidator validator = mock(PeerValidator.class);
final EthPeer peer = createPeer(validator);
peer.markValidated(validator);
assertThat(peer.isFullyValidated()).isTrue();
}
@Test
public void isFullyValidated_multipleValidators_unvalidated() {
final List<PeerValidator> validators =
Stream.generate(() -> mock(PeerValidator.class)).limit(2).collect(Collectors.toList());
final EthPeer peer = createPeer(validators);
assertThat(peer.isFullyValidated()).isFalse();
}
@Test
public void isFullyValidated_multipleValidators_partiallyValidated() {
final List<PeerValidator> validators =
Stream.generate(() -> mock(PeerValidator.class)).limit(2).collect(Collectors.toList());
final EthPeer peer = createPeer(validators);
peer.markValidated(validators.get(0));
assertThat(peer.isFullyValidated()).isFalse();
}
@Test
public void isFullyValidated_multipleValidators_fullyValidated() {
final List<PeerValidator> validators =
Stream.generate(() -> mock(PeerValidator.class)).limit(2).collect(Collectors.toList());
final EthPeer peer = createPeer(validators);
validators.forEach(peer::markValidated);
assertThat(peer.isFullyValidated()).isTrue();
}
private void messageStream(
final ResponseStreamSupplier getStream,
final MessageData targetMessage,
@ -338,10 +400,18 @@ public class EthPeerTest {
}
private EthPeer createPeer() {
return createPeer(Collections.emptyList());
}
private EthPeer createPeer(final PeerValidator... peerValidators) {
return createPeer(Arrays.asList(peerValidators));
}
private EthPeer createPeer(final List<PeerValidator> peerValidators) {
final Set<Capability> caps = new HashSet<>(singletonList(EthProtocol.ETH63));
final PeerConnection peerConnection = new MockPeerConnection(caps);
final Consumer<EthPeer> onPeerReady = (peer) -> {};
return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, clock);
return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, peerValidators, clock);
}
@FunctionalInterface

@ -115,6 +115,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -137,6 +138,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -160,6 +162,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -194,6 +197,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -228,6 +232,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -254,6 +259,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -296,6 +302,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -337,6 +344,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -377,6 +385,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -420,6 +429,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -484,6 +494,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -525,6 +536,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -563,6 +575,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -617,6 +630,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -670,6 +684,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -717,6 +732,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -770,6 +786,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -822,6 +839,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -871,6 +889,7 @@ public final class EthProtocolManagerTest {
blockchain,
worldStateArchive,
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -923,6 +942,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -996,6 +1016,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
1,
1,
@ -1065,6 +1086,7 @@ public final class EthProtocolManagerTest {
blockchain,
protocolContext.getWorldStateArchive(),
BigInteger.ONE,
Collections.emptyList(),
true,
ethScheduler,
EthProtocolConfiguration.defaultConfig(),

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.chain.GenesisState;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage;
@ -33,6 +34,7 @@ import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.uint.UInt256;
import java.math.BigInteger;
import java.util.Collections;
import java.util.OptionalLong;
public class EthProtocolManagerTestUtil {
@ -53,6 +55,7 @@ public class EthProtocolManagerTestUtil {
blockchain,
worldStateArchive,
networkId,
Collections.emptyList(),
false,
ethScheduler,
EthProtocolConfiguration.defaultConfig(),
@ -130,39 +133,82 @@ public class EthProtocolManagerTestUtil {
EthProtocol.ETH63, new DefaultMessage(peer.getPeerConnection(), message));
}
public static RespondingEthPeer.Builder peerBuilder() {
return RespondingEthPeer.builder();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager, final UInt256 td) {
return RespondingEthPeer.create(ethProtocolManager, td);
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.totalDifficulty(td)
.build();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager, final UInt256 td, final long estimatedHeight) {
return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight);
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.totalDifficulty(td)
.estimatedHeight(estimatedHeight)
.build();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final UInt256 td,
final OptionalLong estimatedHeight) {
return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight);
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.totalDifficulty(td)
.estimatedHeight(estimatedHeight)
.build();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final UInt256 td,
final OptionalLong estimatedHeight,
final PeerValidator... validators) {
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.totalDifficulty(td)
.estimatedHeight(estimatedHeight)
.peerValidators(validators)
.build();
}
public static RespondingEthPeer createPeer(final EthProtocolManager ethProtocolManager) {
return RespondingEthPeer.create(ethProtocolManager, UInt256.of(1000L));
return RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager, final long estimatedHeight) {
return RespondingEthPeer.create(ethProtocolManager, UInt256.of(1000L), estimatedHeight);
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.estimatedHeight(estimatedHeight)
.build();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final long estimatedHeight,
final PeerValidator... validators) {
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.estimatedHeight(estimatedHeight)
.peerValidators(validators)
.build();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager, final Blockchain blockchain) {
final ChainHead head = blockchain.getChainHead();
return RespondingEthPeer.create(
ethProtocolManager,
head.getHash(),
head.getTotalDifficulty(),
blockchain.getChainHeadBlockNumber());
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.totalDifficulty(head.getTotalDifficulty())
.chainHeadHash(head.getHash())
.estimatedHeight(blockchain.getChainHeadBlockNumber())
.build();
}
}

@ -217,6 +217,7 @@ public class RequestManagerTest {
final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
final PeerConnection peerConnection = new MockPeerConnection(caps);
final Consumer<EthPeer> onPeerReady = (peer) -> {};
return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, TestClock.fixed());
return new EthPeer(
peerConnection, EthProtocol.NAME, onPeerReady, Collections.emptyList(), TestClock.fixed());
}
}

@ -13,6 +13,7 @@
package org.hyperledger.besu.ethereum.eth.manager;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;
import org.hyperledger.besu.ethereum.chain.Blockchain;
@ -28,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage;
@ -55,7 +57,6 @@ import com.google.common.collect.Lists;
public class RespondingEthPeer {
private static final BlockDataGenerator gen = new BlockDataGenerator();
private static final int DEFAULT_ESTIMATED_HEIGHT = 1000;
private final EthPeer ethPeer;
private final BlockingQueue<OutgoingMessage> outgoingMessages;
private final EthProtocolManager ethProtocolManager;
@ -98,48 +99,16 @@ public class RespondingEthPeer {
return peerConnection;
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager, final UInt256 totalDifficulty) {
return create(ethProtocolManager, totalDifficulty, DEFAULT_ESTIMATED_HEIGHT);
public static Builder builder() {
return new Builder();
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final UInt256 totalDifficulty,
final long estimatedHeight) {
final Hash chainHeadHash = gen.hash();
return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight);
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final UInt256 totalDifficulty,
final OptionalLong estimatedHeight) {
final Hash chainHeadHash = gen.hash();
return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight);
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Hash chainHeadHash,
final UInt256 totalDifficulty) {
return create(ethProtocolManager, chainHeadHash, totalDifficulty, DEFAULT_ESTIMATED_HEIGHT);
}
public static RespondingEthPeer create(
private static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Hash chainHeadHash,
final UInt256 totalDifficulty,
final long estimatedHeight) {
return create(
ethProtocolManager, chainHeadHash, totalDifficulty, OptionalLong.of(estimatedHeight));
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Hash chainHeadHash,
final UInt256 totalDifficulty,
final OptionalLong estimatedHeight) {
final OptionalLong estimatedHeight,
final List<PeerValidator> peerValidators) {
final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers();
final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
@ -147,7 +116,7 @@ public class RespondingEthPeer {
final MockPeerConnection peerConnection =
new MockPeerConnection(
caps, (cap, msg, conn) -> outgoingMessages.add(new OutgoingMessage(cap, msg)));
ethPeers.registerConnection(peerConnection);
ethPeers.registerConnection(peerConnection, peerValidators);
final EthPeer peer = ethPeers.peer(peerConnection);
peer.registerStatusReceived(chainHeadHash, totalDifficulty);
estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height));
@ -365,6 +334,61 @@ public class RespondingEthPeer {
};
}
public static class Builder {
private EthProtocolManager ethProtocolManager;
private Hash chainHeadHash = gen.hash();
private UInt256 totalDifficulty = UInt256.of(1000L);
private OptionalLong estimatedHeight = OptionalLong.of(1000L);
private List<PeerValidator> peerValidators = new ArrayList<>();
public RespondingEthPeer build() {
checkNotNull(ethProtocolManager, "Must configure EthProtocolManager");
return RespondingEthPeer.create(
ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight, peerValidators);
}
public Builder ethProtocolManager(final EthProtocolManager ethProtocolManager) {
checkNotNull(ethProtocolManager);
this.ethProtocolManager = ethProtocolManager;
return this;
}
public Builder chainHeadHash(final Hash chainHeadHash) {
checkNotNull(chainHeadHash);
this.chainHeadHash = chainHeadHash;
return this;
}
public Builder totalDifficulty(final UInt256 totalDifficulty) {
checkNotNull(totalDifficulty);
this.totalDifficulty = totalDifficulty;
return this;
}
public Builder estimatedHeight(final OptionalLong estimatedHeight) {
checkNotNull(estimatedHeight);
this.estimatedHeight = estimatedHeight;
return this;
}
public Builder estimatedHeight(final long estimatedHeight) {
this.estimatedHeight = OptionalLong.of(estimatedHeight);
return this;
}
public Builder peerValidators(final List<PeerValidator> peerValidators) {
checkNotNull(peerValidators);
this.peerValidators.addAll(peerValidators);
return this;
}
public Builder peerValidators(final PeerValidator... peerValidators) {
peerValidators(Arrays.asList(peerValidators));
return this;
}
}
static class OutgoingMessage {
private final Capability capability;
private final MessageData messageData;

@ -53,16 +53,13 @@ public class DaoForkPeerValidatorTest {
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
new NoOpMetricsSystem(),
daoBlockNumber,
0);
MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0);
RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(peer.getEthPeer());
CompletableFuture<Boolean> result =
validator.validatePeer(ethProtocolManager.ethContext(), peer.getEthPeer());
assertThat(result).isNotDone();
@ -85,16 +82,13 @@ public class DaoForkPeerValidatorTest {
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
new NoOpMetricsSystem(),
daoBlockNumber,
0);
MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0);
RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(peer.getEthPeer());
CompletableFuture<Boolean> result =
validator.validatePeer(ethProtocolManager.ethContext(), peer.getEthPeer());
assertThat(result).isNotDone();
@ -114,16 +108,13 @@ public class DaoForkPeerValidatorTest {
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
new NoOpMetricsSystem(),
daoBlockNumber,
0);
MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0);
RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(peer.getEthPeer());
CompletableFuture<Boolean> result =
validator.validatePeer(ethProtocolManager.ethContext(), peer.getEthPeer());
// Request should timeout immediately
assertThat(result).isDone();
@ -143,11 +134,7 @@ public class DaoForkPeerValidatorTest {
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
new NoOpMetricsSystem(),
daoBlockNumber,
0);
MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, 0);
int peerCount = 1000;
List<RespondingEthPeer> otherPeers =
@ -158,7 +145,8 @@ public class DaoForkPeerValidatorTest {
RespondingEthPeer targetPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(targetPeer.getEthPeer());
CompletableFuture<Boolean> result =
validator.validatePeer(ethProtocolManager.ethContext(), targetPeer.getEthPeer());
assertThat(result).isNotDone();
@ -183,11 +171,7 @@ public class DaoForkPeerValidatorTest {
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
new NoOpMetricsSystem(),
daoBlockNumber,
buffer);
MainnetProtocolSchedule.create(), new NoOpMetricsSystem(), daoBlockNumber, buffer);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0).getEthPeer();

@ -12,6 +12,7 @@
*/
package org.hyperledger.besu.ethereum.eth.peervalidation;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -34,88 +35,130 @@ public class PeerValidatorRunnerTest {
@Test
public void checkPeer_schedulesFutureCheckWhenPeerNotReady() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final PeerValidator validator = mock(PeerValidator.class);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
final EthPeer peer =
EthProtocolManagerTestUtil.peerBuilder()
.ethProtocolManager(ethProtocolManager)
.peerValidators(validator)
.build()
.getEthPeer();
assertThat(peer.isFullyValidated()).isFalse();
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(false);
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
final PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
assertThat(peer.isFullyValidated()).isFalse();
verify(runner, times(1)).checkPeer(eq(peer));
verify(validator, never()).validatePeer(eq(peer));
verify(validator, never()).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, times(1)).scheduleNextCheck(eq(peer));
// Run pending futures to trigger the next check
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
verify(runner, times(2)).checkPeer(eq(peer));
verify(validator, never()).validatePeer(eq(peer));
verify(validator, never()).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, times(2)).scheduleNextCheck(eq(peer));
assertThat(peer.isFullyValidated()).isFalse();
}
@Test
public void checkPeer_doesNotScheduleFutureCheckWhenPeerNotReadyAndDisconnected() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final PeerValidator validator = mock(PeerValidator.class);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
final EthPeer peer =
EthProtocolManagerTestUtil.peerBuilder()
.ethProtocolManager(ethProtocolManager)
.peerValidators(validator)
.build()
.getEthPeer();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(false);
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
final PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
verify(runner, times(1)).checkPeer(eq(peer));
verify(validator, never()).validatePeer(eq(peer));
verify(validator, never()).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, times(0)).scheduleNextCheck(eq(peer));
}
@Test
public void checkPeer_handlesInvalidPeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final PeerValidator validator = mock(PeerValidator.class);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
final EthPeer peer =
EthProtocolManagerTestUtil.peerBuilder()
.ethProtocolManager(ethProtocolManager)
.peerValidators(validator)
.build()
.getEthPeer();
assertThat(peer.isFullyValidated()).isFalse();
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(true);
when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(false));
when(validator.validatePeer(eq(ethProtocolManager.ethContext()), eq(peer)))
.thenReturn(CompletableFuture.completedFuture(false));
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
assertThat(peer.isFullyValidated()).isFalse();
final PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
verify(validator, times(1)).validatePeer(eq(peer));
verify(validator, times(1)).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer));
verify(runner, times(1)).disconnectPeer(eq(peer));
verify(runner, never()).scheduleNextCheck(eq(peer));
assertThat(peer.isFullyValidated()).isFalse();
}
@Test
public void checkPeer_handlesValidPeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final PeerValidator validator = mock(PeerValidator.class);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
final EthPeer peer =
EthProtocolManagerTestUtil.peerBuilder()
.ethProtocolManager(ethProtocolManager)
.peerValidators(validator)
.build()
.getEthPeer();
assertThat(peer.isFullyValidated()).isFalse();
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(true);
when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(true));
when(validator.validatePeer(eq(ethProtocolManager.ethContext()), eq(peer)))
.thenReturn(CompletableFuture.completedFuture(true));
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
final PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
verify(validator, times(1)).validatePeer(eq(peer));
assertThat(peer.isFullyValidated()).isTrue();
verify(validator, times(1)).validatePeer(eq(ethProtocolManager.ethContext()), eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, never()).scheduleNextCheck(eq(peer));
}

@ -38,11 +38,12 @@ public class ChainHeadTrackerTest {
private final EthProtocolManager ethProtocolManager =
EthProtocolManagerTestUtil.create(blockchain, blockchainSetupUtil.getWorldArchive());
private final RespondingEthPeer respondingPeer =
RespondingEthPeer.create(
ethProtocolManager,
blockchain.getChainHeadHash(),
blockchain.getChainHead().getTotalDifficulty(),
0);
RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.chainHeadHash(blockchain.getChainHeadHash())
.totalDifficulty(blockchain.getChainHead().getTotalDifficulty())
.estimatedHeight(0)
.build();
private final ProtocolSchedule<Void> protocolSchedule =
FixedDifficultyProtocolSchedule.create(
GenesisConfigFile.development().getConfigOptions(), false);

@ -24,9 +24,11 @@ import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -204,8 +206,69 @@ public class FastSyncActionsTest {
assertThat(result).isCompletedWithValue(expected);
}
@Test
public void selectPivotBlockShouldWaitAndRetryIfSufficientValidatedPeersUnavailable() {
final int minPeers = 3;
final PeerValidator validator = mock(PeerValidator.class);
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
// Create peers that are not validated
final OptionalLong height = OptionalLong.of(minPivotHeight + 10);
List<RespondingEthPeer> peers = new ArrayList<>();
for (int i = 0; i < minPeers; i++) {
final UInt256 td = UInt256.of(i);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height, validator);
peers.add(peer);
}
// No pivot should be selected while peers are not fully validated
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
assertThat(result).isNotDone();
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Validate a subset of peers
peers.subList(0, minPeers - 1).forEach(p -> p.getEthPeer().markValidated(validator));
// No pivot should be selected while only a subset of peers have height estimates
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Set best height and mark best peer validated
final long bestPeerHeight = minPivotHeight + 11;
final EthPeer bestPeer = peers.get(minPeers - 1).getEthPeer();
bestPeer.chainState().updateHeightEstimate(bestPeerHeight);
bestPeer.markValidated(validator);
final FastSyncState expected =
new FastSyncState(bestPeerHeight - syncConfig.getFastSyncPivotDistance());
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isCompletedWithValue(expected);
}
@Test
public void selectPivotBlockUsesBestPeerWithHeightEstimate() {
selectPivotBlockUsesBestPeerMatchingRequiredCriteria(true, false);
}
@Test
public void selectPivotBlockUsesBestPeerThatIsValidated() {
selectPivotBlockUsesBestPeerMatchingRequiredCriteria(false, true);
}
@Test
public void selectPivotBlockUsesBestPeerThatIsValidatedAndHasHeightEstimate() {
selectPivotBlockUsesBestPeerMatchingRequiredCriteria(true, true);
}
private void selectPivotBlockUsesBestPeerMatchingRequiredCriteria(
final boolean bestMissingHeight, final boolean bestNotValidated) {
final int minPeers = 3;
final int peerCount = minPeers + 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
@ -215,21 +278,27 @@ public class FastSyncActionsTest {
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
// Create peers without chain height estimates
final PeerValidator validator = mock(PeerValidator.class);
List<RespondingEthPeer> peers = new ArrayList<>();
for (int i = 0; i < peerCount; i++) {
// Best peer by td is the first peer, td decreases as i increases
final boolean isBest = i == 0;
final UInt256 td = UInt256.of(peerCount - i);
final OptionalLong height;
if (i == 0) {
if (isBest && bestMissingHeight) {
// Don't set a height estimate for the best peer
height = OptionalLong.empty();
} else {
// Height increases with i
height = OptionalLong.of(minPivotHeight + i);
}
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height, validator);
if (!isBest || !bestNotValidated) {
peer.getEthPeer().markValidated(validator);
}
peers.add(peer);
}

@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -93,11 +94,42 @@ public class PivotBlockRetrieverTest {
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeerA =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1);
final RespondingEthPeer respondingPeerB =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1);
final RespondingEthPeer respondingPeerC =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1);
final CompletableFuture<FastSyncState> future = pivotBlockRetriever.downloadPivotBlockHeader();
while (!future.isDone()) {
assertThat(respondingPeerB.hasOutstandingRequests()).isFalse();
assertThat(respondingPeerC.hasOutstandingRequests()).isFalse();
respondingPeerA.respondWhile(responder, () -> !future.isDone());
}
assertThat(future)
.isCompletedWithValue(
new FastSyncState(blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER).get()));
}
@Test
public void shouldIgnorePeersThatAreNotFullyValidated() {
final PeerValidator peerValidator = mock(PeerValidator.class);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeerA =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator);
final RespondingEthPeer respondingPeerB =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator);
final RespondingEthPeer respondingPeerC =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator);
// Only mark one peer as validated
respondingPeerA.getEthPeer().markValidated(peerValidator);
final CompletableFuture<FastSyncState> future = pivotBlockRetriever.downloadPivotBlockHeader();
while (!future.isDone()) {
assertThat(respondingPeerB.hasOutstandingRequests()).isFalse();
assertThat(respondingPeerC.hasOutstandingRequests()).isFalse();
respondingPeerA.respondWhile(responder, () -> !future.isDone());
}

@ -55,6 +55,7 @@ import org.hyperledger.besu.util.bytes.BytesValue;
import java.io.Closeable;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -112,6 +113,7 @@ public class TestNode implements Closeable {
blockchain,
worldStateArchive,
BigInteger.ONE,
Collections.emptyList(),
false,
1,
1,

Loading…
Cancel
Save