Check for snap server (#6609)

* EthPeer add isServingSnap to be able to make sure that we have enough snap servers connected when we are snap syncing

Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
pull/7295/head
Stefan Pingel 5 months ago committed by GitHub
parent 8d8dbf05b5
commit 8a9a84ad58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
  2. 36
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  3. 7
      besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java
  4. 7
      besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java
  5. 27
      besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java
  6. 6
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/protocol/BftProtocolManager.java
  7. 6
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/forkid/ForkIdManager.java
  8. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java
  9. 375
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  10. 63
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  11. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerRequest.java
  12. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PendingPeerRequest.java
  13. 39
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetAccountRangeFromPeerTask.java
  14. 32
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetBytecodeFromPeerTask.java
  15. 41
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetStorageRangeFromPeerTask.java
  16. 31
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetTrieNodeFromPeerTask.java
  17. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java
  18. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java
  19. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java
  20. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java
  21. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java
  22. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java
  23. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java
  24. 75
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTracker.java
  25. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  26. 86
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java
  27. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java
  28. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java
  29. 89
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java
  30. 9
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java
  31. 69
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  32. 36
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java
  33. 7
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  34. 11
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java
  35. 6
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java
  36. 6
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java
  37. 28
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java
  38. 9
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
  39. 19
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/NetworkRunner.java
  40. 10
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/ProtocolManager.java
  41. 4
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/messages/DisconnectMessage.java
  42. 7
      ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java

@ -688,13 +688,14 @@ public class RunnerBuilder {
.map(nodePerms -> PeerPermissions.combine(nodePerms, defaultPeerPermissions))
.orElse(defaultPeerPermissions);
final EthPeers ethPeers = besuController.getEthPeers();
LOG.info("Detecting NAT service.");
final boolean fallbackEnabled = natMethod == NatMethod.AUTO || natMethodFallbackEnabled;
final NatService natService = new NatService(buildNatManager(natMethod), fallbackEnabled);
final NetworkBuilder inactiveNetwork = caps -> new NoopP2PNetwork();
final NetworkBuilder activeNetwork =
caps -> {
final EthPeers ethPeers = besuController.getEthPeers();
return DefaultP2PNetwork.builder()
.vertx(vertx)
.nodeKey(nodeKey)
@ -709,8 +710,8 @@ public class RunnerBuilder {
.blockchain(context.getBlockchain())
.blockNumberForks(besuController.getGenesisConfigOptions().getForkBlockNumbers())
.timestampForks(besuController.getGenesisConfigOptions().getForkBlockTimestamps())
.allConnectionsSupplier(ethPeers::getAllConnections)
.allActiveConnectionsSupplier(ethPeers::getAllActiveConnections)
.allConnectionsSupplier(ethPeers::streamAllConnections)
.allActiveConnectionsSupplier(ethPeers::streamAllActiveConnections)
.maxPeers(ethPeers.getMaxPeers())
.build();
};
@ -721,9 +722,10 @@ public class RunnerBuilder {
.subProtocols(subProtocols)
.network(p2pEnabled ? activeNetwork : inactiveNetwork)
.metricsSystem(metricsSystem)
.ethPeersShouldConnect(ethPeers::shouldTryToConnect)
.build();
besuController.getEthPeers().setRlpxAgent(networkRunner.getRlpxAgent());
ethPeers.setRlpxAgent(networkRunner.getRlpxAgent());
final P2PNetwork network = networkRunner.getNetwork();
// ForkId in Ethereum Node Record needs updating when we transition to a new protocol spec

@ -46,7 +46,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
@ -77,6 +76,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.config.NetworkingConfiguration;
@ -604,6 +604,12 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final int maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize();
final Supplier<ProtocolSpec> currentProtocolSpecSupplier =
() -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader());
final ForkIdManager forkIdManager =
new ForkIdManager(
blockchain,
genesisConfigOptions.getForkBlockNumbers(),
genesisConfigOptions.getForkBlockTimestamps(),
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled());
final EthPeers ethPeers =
new EthPeers(
getSupportedProtocol(),
@ -615,7 +621,9 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
nodeKey.getPublicKey().getEncodedBytes(),
maxPeers,
maxRemotelyInitiatedPeers,
randomPeerPriority);
randomPeerPriority,
syncConfig.getSyncMode(),
forkIdManager);
final EthMessages ethMessages = new EthMessages();
final EthMessages snapMessages = new EthMessages();
@ -681,13 +689,14 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethMessages,
scheduler,
peerValidators,
Optional.empty());
Optional.empty(),
forkIdManager);
final PivotBlockSelector pivotBlockSelector =
createPivotSelector(
protocolSchedule, protocolContext, ethContext, syncState, metricsSystem, blockchain);
final Synchronizer synchronizer =
final DefaultSynchronizer synchronizer =
createSynchronizer(
protocolSchedule,
worldStateStorageCoordinator,
@ -697,6 +706,16 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethProtocolManager,
pivotBlockSelector);
ethPeers.setTrailingPeerRequirementsSupplier(synchronizer::calculateTrailingPeerRequirements);
if (SyncMode.isSnapSync(syncConfig.getSyncMode())
|| SyncMode.isCheckpointSync(syncConfig.getSyncMode())) {
synchronizer.subscribeInSync((b) -> ethPeers.snapServerPeersNeeded(!b));
ethPeers.snapServerPeersNeeded(true);
} else {
ethPeers.snapServerPeersNeeded(false);
}
protocolContext.setSynchronizer(Optional.of(synchronizer));
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
@ -809,7 +828,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
* @param pivotBlockSelector the pivot block selector
* @return the synchronizer
*/
protected Synchronizer createSynchronizer(
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
@ -1000,6 +1019,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
* @param scheduler the scheduler
* @param peerValidators the peer validators
* @param mergePeerFilter the merge peer filter
* @param forkIdManager the fork id manager
* @return the eth protocol manager
*/
protected EthProtocolManager createEthProtocolManager(
@ -1012,7 +1032,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
networkId,
@ -1026,8 +1047,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
mergePeerFilter,
synchronizerConfiguration,
scheduler,
genesisConfigOptions.getForkBlockNumbers(),
genesisConfigOptions.getForkBlockTimestamps());
forkIdManager);
}
/**

@ -49,6 +49,7 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
@ -242,7 +243,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
return besuControllerBuilderSchedule
.get(0L)
.createEthProtocolManager(
@ -255,7 +257,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
ethMessages,
scheduler,
peerValidators,
mergePeerFilter);
mergePeerFilter,
forkIdManager);
}
@Override

@ -41,6 +41,7 @@ import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
@ -97,7 +98,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
var mergeContext = protocolContext.getConsensusContext(MergeContext.class);
@ -126,7 +128,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
ethMessages,
scheduler,
peerValidators,
filterToUse);
filterToUse,
forkIdManager);
return ethProtocolManager;
}

@ -33,7 +33,6 @@ import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
@ -49,6 +48,7 @@ import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
@ -156,7 +156,8 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
return mergeBesuControllerBuilder.createEthProtocolManager(
protocolContext,
synchronizerConfiguration,
@ -167,7 +168,8 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
ethMessages,
scheduler,
peerValidators,
mergePeerFilter);
mergePeerFilter,
forkIdManager);
}
@Override
@ -212,7 +214,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
}
@Override
protected Synchronizer createSynchronizer(
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
@ -222,15 +224,14 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
final PivotBlockSelector pivotBlockSelector) {
DefaultSynchronizer sync =
(DefaultSynchronizer)
super.createSynchronizer(
protocolSchedule,
worldStateStorageCoordinator,
protocolContext,
ethContext,
syncState,
ethProtocolManager,
pivotBlockSelector);
super.createSynchronizer(
protocolSchedule,
worldStateStorageCoordinator,
protocolContext,
ethContext,
syncState,
ethProtocolManager,
pivotBlockSelector);
if (genesisConfigOptions.getTerminalTotalDifficulty().isPresent()) {
LOG.info(

@ -20,7 +20,6 @@ import org.hyperledger.besu.consensus.common.bft.events.BftEvents;
import org.hyperledger.besu.consensus.common.bft.network.PeerConnectionTracker;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
@ -108,11 +107,6 @@ public class BftProtocolManager implements ProtocolManager {
peers.add(peerConnection);
}
@Override
public boolean shouldConnect(final Peer peer, final boolean incoming) {
return false; // for now the EthProtocolManager takes care of this
}
@Override
public void handleDisconnect(
final PeerConnection peerConnection,

@ -58,7 +58,11 @@ public class ForkIdManager {
checkNotNull(blockchain);
checkNotNull(blockNumberForks);
this.chainHeadSupplier = blockchain::getChainHeadHeader;
this.genesisHash = blockchain.getGenesisBlock().getHash();
try {
this.genesisHash = blockchain.getGenesisBlock().getHash();
} catch (Exception e) {
throw new RuntimeException(e);
}
this.blockNumbersForkIds = new ArrayList<>();
this.timestampsForkIds = new ArrayList<>();
this.legacyEth64 = legacyEth64;

@ -103,6 +103,7 @@ public class EthPeer implements Comparable<EthPeer> {
private final PeerReputation reputation = new PeerReputation();
private final Map<PeerValidator, Boolean> validationStatus = new ConcurrentHashMap<>();
private final Bytes id;
private boolean isServingSnap = false;
private static final Map<Integer, Integer> roundMessages;
@ -393,6 +394,14 @@ public class EthPeer implements Comparable<EthPeer> {
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_TRIE_NODES), getTrieNodes);
}
public void setIsServingSnap(final boolean isServingSnap) {
this.isServingSnap = isServingSnap;
}
public boolean isServingSnap() {
return isServingSnap;
}
private RequestManager.ResponseStream sendRequest(
final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected {
lastRequestTimestamp = clock.millis();
@ -582,9 +591,9 @@ public class EthPeer implements Comparable<EthPeer> {
}
/**
* Return A read-only snapshot of this peer's current {@code chainState} }
* Return A read-only snapshot of this peer's current {@code chainState}
*
* @return A read-only snapshot of this peer's current {@code chainState} }
* @return A read-only snapshot of this peer's current {@code chainState}
*/
public ChainHeadEstimate chainStateSnapshot() {
return chainHeadState.getSnapshot();
@ -629,14 +638,17 @@ public class EthPeer implements Comparable<EthPeer> {
@Override
public String toString() {
return String.format(
"PeerId: %s %s, validated? %s, disconnected? %s, client: %s, %s, %s",
"PeerId: %s %s, validated? %s, disconnected? %s, client: %s, %s, %s, isServingSnap %s, has height %s, connected for %s ms",
getLoggableId(),
reputation,
isFullyValidated(),
isDisconnected(),
connection.getPeerInfo().getClientId(),
connection,
connection.getPeer().getEnodeURLString());
connection.getPeer().getEnodeURLString(),
isServingSnap,
chainHeadState.getEstimatedHeight(),
System.currentTimeMillis() - connection.getInitiatedAt());
}
@Nonnull

@ -14,8 +14,16 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
@ -35,8 +43,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -72,7 +82,7 @@ public class EthPeers {
public static final int NODE_ID_LENGTH = 64;
public static final int USEFULL_PEER_SCORE_THRESHOLD = 102;
private final Map<Bytes, EthPeer> completeConnections = new ConcurrentHashMap<>();
private final Map<Bytes, EthPeer> activeConnections = new ConcurrentHashMap<>();
private final Cache<PeerConnection, EthPeer> incompleteConnections =
CacheBuilder.newBuilder()
@ -92,12 +102,22 @@ public class EthPeers {
private final Boolean randomPeerPriority;
private final Bytes nodeIdMask = Bytes.random(NODE_ID_LENGTH);
private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final SyncMode syncMode;
private final ForkIdManager forkIdManager;
private final int snapServerTargetNumber;
private final boolean shouldLimitRemoteConnections;
private Comparator<EthPeer> bestPeerComparator;
private final Bytes localNodeId;
private RlpxAgent rlpxAgent;
private final Counter connectedPeersCounter;
// private List<ProtocolManager> protocolManagers;
private ChainHeadTracker tracker;
private SnapServerChecker snapServerChecker;
private boolean snapServerPeersNeeded = false;
private Supplier<TrailingPeerRequirements> trailingPeerRequirementsSupplier =
() -> TrailingPeerRequirements.UNRESTRICTED;
public EthPeers(
final String protocolName,
@ -109,7 +129,9 @@ public class EthPeers {
final Bytes localNodeId,
final int peerUpperBound,
final int maxRemotelyInitiatedConnections,
final Boolean randomPeerPriority) {
final Boolean randomPeerPriority,
final SyncMode syncMode,
final ForkIdManager forkIdManager) {
this.protocolName = protocolName;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.clock = clock;
@ -121,11 +143,22 @@ public class EthPeers {
this.maxRemotelyInitiatedConnections = maxRemotelyInitiatedConnections;
this.randomPeerPriority = randomPeerPriority;
LOG.trace("MaxPeers: {}, Max Remote: {}", peerUpperBound, maxRemotelyInitiatedConnections);
this.syncMode = syncMode;
this.forkIdManager = forkIdManager;
this.snapServerTargetNumber =
peerUpperBound / 2; // 50% of peers should be snap servers while snap syncing
this.shouldLimitRemoteConnections = maxRemotelyInitiatedConnections < peerUpperBound;
metricsSystem.createIntegerGauge(
BesuMetricCategory.ETHEREUM,
"peer_count",
"The current number of peers connected",
() -> (int) streamAvailablePeers().filter(p -> p.readyForRequests()).count());
activeConnections::size);
metricsSystem.createIntegerGauge(
BesuMetricCategory.ETHEREUM,
"peer_count_snap_server",
"The current number of peers connected that serve snap data",
() -> (int) streamAvailablePeers().filter(EthPeer::isServingSnap).count());
metricsSystem.createIntegerGauge(
BesuMetricCategory.PEERS,
"pending_peer_requests_current",
@ -146,7 +179,7 @@ public class EthPeers {
final PeerConnection newConnection, final List<PeerValidator> peerValidators) {
final Bytes id = newConnection.getPeer().getId();
synchronized (this) {
EthPeer ethPeer = completeConnections.get(id);
EthPeer ethPeer = activeConnections.get(id);
if (ethPeer == null) {
final Optional<EthPeer> peerInList =
incompleteConnections.asMap().values().stream()
@ -193,12 +226,12 @@ public class EthPeers {
if (peer.getConnection().equals(connection)) {
final Bytes id = peer.getId();
if (!peerHasIncompleteConnection(id)) {
removed = completeConnections.remove(id, peer);
removed = activeConnections.remove(id, peer);
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
peer.handleDisconnect();
abortPendingRequestsAssignedToDisconnectedPeers();
if (peer.getReputation().getScore() > USEFULL_PEER_SCORE_THRESHOLD) {
LOG.atDebug().setMessage("Disconnected USEFULL peer {}").addArgument(peer).log();
LOG.atDebug().setMessage("Disconnected USEFUL peer {}").addArgument(peer).log();
} else {
LOG.atDebug()
.setMessage("Disconnected EthPeer {}")
@ -227,7 +260,7 @@ public class EthPeers {
public EthPeer peer(final PeerConnection connection) {
final EthPeer ethPeer = incompleteConnections.getIfPresent(connection);
return ethPeer != null ? ethPeer : completeConnections.get(connection.getPeer().getId());
return ethPeer != null ? ethPeer : activeConnections.get(connection.getPeer().getId());
}
public PendingPeerRequest executePeerRequest(
@ -265,7 +298,7 @@ public class EthPeers {
@VisibleForTesting
void reattemptPendingPeerRequests() {
synchronized (this) {
final List<EthPeer> peers = streamAvailablePeers().collect(Collectors.toList());
final List<EthPeer> peers = streamAvailablePeers().toList();
final Iterator<PendingPeerRequest> iterator = pendingRequests.iterator();
while (iterator.hasNext() && peers.stream().anyMatch(EthPeer::hasAvailableRequestCapacity)) {
final PendingPeerRequest request = iterator.next();
@ -290,7 +323,7 @@ public class EthPeers {
public int peerCount() {
removeDisconnectedPeers();
return completeConnections.size();
return activeConnections.size();
}
public int getMaxPeers() {
@ -298,11 +331,11 @@ public class EthPeers {
}
public Stream<EthPeer> streamAllPeers() {
return completeConnections.values().stream();
return activeConnections.values().stream();
}
private void removeDisconnectedPeers() {
completeConnections
activeConnections
.values()
.forEach(
ep -> {
@ -313,9 +346,7 @@ public class EthPeers {
}
public Stream<EthPeer> streamAvailablePeers() {
return streamAllPeers()
.filter(EthPeer::readyForRequests)
.filter(peer -> !peer.isDisconnected());
return streamAllPeers().filter(peer -> !peer.isDisconnected());
}
public Stream<EthPeer> streamBestPeers() {
@ -350,53 +381,59 @@ public class EthPeers {
this.rlpxAgent = rlpxAgent;
}
public Stream<PeerConnection> getAllActiveConnections() {
return completeConnections.values().stream()
public Stream<PeerConnection> streamAllActiveConnections() {
return activeConnections.values().stream()
.map(EthPeer::getConnection)
.filter(c -> !c.isDisconnected());
}
public Stream<PeerConnection> getAllConnections() {
public Stream<PeerConnection> streamAllConnections() {
return Stream.concat(
completeConnections.values().stream().map(EthPeer::getConnection),
activeConnections.values().stream().map(EthPeer::getConnection),
incompleteConnections.asMap().keySet().stream())
.distinct()
.filter(c -> !c.isDisconnected());
}
public boolean shouldConnect(final Peer peer, final boolean inbound) {
public boolean shouldTryToConnect(final Peer peer, final boolean inbound) {
if (peer.getForkId().isPresent()) {
final ForkId forkId = peer.getForkId().get();
if (!forkIdManager.peerCheck(forkId)) {
LOG.atDebug()
.setMessage("Wrong fork id, not trying to connect to peer {}")
.addArgument(peer::getId)
.log();
return false;
}
}
final Bytes id = peer.getId();
if (peerCount() >= peerUpperBound && !canExceedPeerLimits(id)) {
if (alreadyConnectedOrConnecting(inbound, id)) {
LOG.atTrace()
.setMessage("not connecting to peer {} - too many peers")
.setMessage("not connecting to peer {} - already connected")
.addArgument(peer.getLoggableId())
.log();
return false;
}
final EthPeer ethPeer = completeConnections.get(id);
return peerCount() < getMaxPeers() || needMoreSnapServers() || canExceedPeerLimits(id);
}
private boolean alreadyConnectedOrConnecting(final boolean inbound, final Bytes id) {
final EthPeer ethPeer = activeConnections.get(id);
if (ethPeer != null && !ethPeer.isDisconnected()) {
LOG.atTrace()
.setMessage("not connecting to peer {} - already disconnected")
.addArgument(ethPeer.getLoggableId())
.log();
return false;
return true;
}
final List<PeerConnection> incompleteConnections = getIncompleteConnections(id);
if (!incompleteConnections.isEmpty()) {
if (incompleteConnections.stream()
.anyMatch(c -> !c.isDisconnected() && (!inbound || (inbound && c.inboundInitiated())))) {
LOG.atTrace()
.setMessage("not connecting to peer {} - new connection already in process")
.addArgument(peer.getLoggableId())
.log();
return false;
}
}
return true;
return incompleteConnections.stream()
.anyMatch(c -> !c.isDisconnected() && (!inbound || (inbound && c.inboundInitiated())));
}
public void disconnectWorstUselessPeer() {
streamAvailablePeers()
.filter(p -> !canExceedPeerLimits(p.getId()))
.min(getBestPeerComparator())
.ifPresent(
peer -> {
@ -411,6 +448,23 @@ public class EthPeers {
});
}
public void setChainHeadTracker(final ChainHeadTracker tracker) {
this.tracker = tracker;
}
public void setSnapServerChecker(final SnapServerChecker checker) {
this.snapServerChecker = checker;
}
public void snapServerPeersNeeded(final boolean b) {
this.snapServerPeersNeeded = b;
}
public void setTrailingPeerRequirementsSupplier(
final Supplier<TrailingPeerRequirements> tprSupplier) {
this.trailingPeerRequirementsSupplier = tprSupplier;
}
@FunctionalInterface
public interface ConnectCallback {
void onPeerConnected(EthPeer newPeer);
@ -418,21 +472,108 @@ public class EthPeers {
@Override
public String toString() {
if (completeConnections.isEmpty()) {
if (activeConnections.isEmpty()) {
return "0 EthPeers {}";
}
final String connectionsList =
completeConnections.values().stream()
activeConnections.values().stream()
.sorted()
.map(EthPeer::toString)
.collect(Collectors.joining(", \n"));
return completeConnections.size() + " EthPeers {\n" + connectionsList + '}';
return activeConnections.size() + " EthPeers {\n" + connectionsList + '}';
}
private void ethPeerStatusExchanged(final EthPeer peer) {
if (addPeerToEthPeers(peer)) {
connectedPeersCounter.inc();
connectCallbacks.forEach(cb -> cb.onPeerConnected(peer));
// We have a connection to a peer that is on the right chain and is willing to connect to us.
// Find out what the EthPeer block height is and whether it can serve snap data (if we are doing
// snap sync)
LOG.debug("Peer {} status exchanged", peer);
assert tracker != null : "ChainHeadTracker must be set before EthPeers can be used";
CompletableFuture<BlockHeader> future = tracker.getBestHeaderFromPeer(peer);
future.whenComplete(
(peerHeadBlockHeader, error) -> {
if (peerHeadBlockHeader == null) {
LOG.debug(
"Failed to retrieve chain head info. Disconnecting {}... {}",
peer.getLoggableId(),
error);
peer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD);
} else {
// we can check trailing peers now
final TrailingPeerRequirements trailingPeerRequirements =
trailingPeerRequirementsSupplier.get();
if (trailingPeerRequirements != null) {
if (peer.chainState().getEstimatedHeight()
< trailingPeerRequirements.getMinimumHeightToBeUpToDate()) {
if (!(getNumTrailingPeers(trailingPeerRequirements.getMinimumHeightToBeUpToDate())
< trailingPeerRequirements.getMaxTrailingPeers())) {
LOG.atTrace()
.setMessage(
"Adding trailing peer {} would exceed max trailing peers {}. Disconnecting...")
.addArgument(peer.getLoggableId())
.addArgument(trailingPeerRequirements.getMaxTrailingPeers())
.log();
peer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_EXCEEDS_TRAILING_PEERS);
return;
}
}
}
peer.chainState().updateHeightEstimate(peerHeadBlockHeader.getNumber());
CompletableFuture<Void> isServingSnapFuture;
if (SyncMode.isCheckpointSync(syncMode) || SyncMode.isSnapSync(syncMode)) {
// even if we have finished the snap sync, we still want to know if the peer is a snap
// server
isServingSnapFuture =
CompletableFuture.runAsync(
() -> {
try {
checkIsSnapServer(peer, peerHeadBlockHeader);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} else {
isServingSnapFuture = CompletableFuture.completedFuture(null);
}
isServingSnapFuture.thenRun(
() -> {
if (!peer.getConnection().isDisconnected() && addPeerToEthPeers(peer)) {
connectedPeersCounter.inc();
connectCallbacks.forEach(cb -> cb.onPeerConnected(peer));
}
});
}
});
}
private void checkIsSnapServer(final EthPeer peer, final BlockHeader peersHeadBlockHeader) {
if (peer.getAgreedCapabilities().contains(SnapProtocol.SNAP1)) {
if (snapServerChecker != null) {
// set that peer is a snap server for doing the test
peer.setIsServingSnap(true);
Boolean isServer;
try {
isServer = snapServerChecker.check(peer, peersHeadBlockHeader).get(6L, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.atTrace()
.setMessage("Error checking if peer {} is a snap server. Setting to false.")
.addArgument(peer.getLoggableId())
.log();
peer.setIsServingSnap(false);
return;
}
peer.setIsServingSnap(isServer);
LOG.atTrace()
.setMessage("{}: peer {}")
.addArgument(isServer ? "Is a snap server" : "Is NOT a snap server")
.addArgument(peer.getLoggableId())
.log();
}
}
}
@ -468,7 +609,7 @@ public class EthPeers {
}
private void enforceRemoteConnectionLimits() {
if (!shouldLimitRemoteConnections() || peerCount() < maxRemotelyInitiatedConnections) {
if (!shouldLimitRemoteConnections || peerCount() < maxRemotelyInitiatedConnections) {
// Nothing to do
return;
}
@ -488,7 +629,7 @@ public class EthPeers {
}
private Stream<EthPeer> getActivePrioritizedPeers() {
return completeConnections.values().stream()
return activeConnections.values().stream()
.filter(p -> !p.isDisconnected())
.sorted(this::comparePeerPriorities);
}
@ -512,19 +653,15 @@ public class EthPeers {
});
}
private boolean remoteConnectionLimitReached() {
return shouldLimitRemoteConnections()
&& countUntrustedRemotelyInitiatedConnections() >= maxRemotelyInitiatedConnections;
}
private boolean shouldLimitRemoteConnections() {
return maxRemotelyInitiatedConnections < peerUpperBound;
private boolean inboundInitiatedConnectionLimitExceeded() {
return shouldLimitRemoteConnections
&& countUntrustedRemotelyInitiatedConnections() > maxRemotelyInitiatedConnections;
}
private long countUntrustedRemotelyInitiatedConnections() {
return completeConnections.values().stream()
.map(ep -> ep.getConnection())
.filter(c -> c.inboundInitiated())
return activeConnections.values().stream()
.map(EthPeer::getConnection)
.filter(PeerConnection::inboundInitiated)
.filter(c -> !c.isDisconnected())
.filter(conn -> !canExceedPeerLimits(conn.getPeer().getId()))
.count();
@ -534,67 +671,123 @@ public class EthPeers {
final RemovalNotification<PeerConnection, EthPeer> removalNotification) {
if (removalNotification.wasEvicted()) {
final PeerConnection peerConnectionRemoved = removalNotification.getKey();
final PeerConnection peerConnectionOfEthPeer = removalNotification.getValue().getConnection();
if (!peerConnectionRemoved.equals(peerConnectionOfEthPeer)) {
// If this connection is not the connection of the EthPeer by now we can disconnect
peerConnectionRemoved.disconnect(DisconnectMessage.DisconnectReason.ALREADY_CONNECTED);
final EthPeer peer = removalNotification.getValue();
if (peer == null) {
return;
}
final PeerConnection peerConnectionOfEthPeer = peer.getConnection();
if (peerConnectionRemoved != null) {
if (!peerConnectionRemoved.equals(peerConnectionOfEthPeer)) {
// If this connection is not the connection of the EthPeer by now we can disconnect
peerConnectionRemoved.disconnect(DisconnectMessage.DisconnectReason.ALREADY_CONNECTED);
}
}
}
}
private boolean addPeerToEthPeers(final EthPeer peer) {
boolean addPeerToEthPeers(final EthPeer peer) {
// We have a connection to a peer that is on the right chain and is willing to connect to us.
// Figure out whether we want to keep this peer and add it to the EthPeers connections.
if (completeConnections.containsValue(peer)) {
// Figure out whether we want to add it to the active connections.
final PeerConnection connection = peer.getConnection();
if (activeConnections.containsValue(peer)) {
return false;
}
final PeerConnection connection = peer.getConnection();
final Bytes id = peer.getId();
if (!randomPeerPriority) {
// Disconnect if too many peers
if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) {
LOG.atTrace()
.setMessage("Too many peers. Disconnect connection: {}, max connections {}")
.addArgument(connection)
.addArgument(peerUpperBound)
.log();
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
// Disconnect if too many remotely-initiated connections
if (connection.inboundInitiated()
&& !canExceedPeerLimits(id)
&& remoteConnectionLimitReached()) {
LOG.atTrace()
.setMessage(
"Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}")
.addArgument(connection)
.addArgument(maxRemotelyInitiatedConnections)
.log();
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
if (peerCount() >= peerUpperBound) {
final long numSnapServers = numberOfSnapServers();
final boolean inboundLimitExceeded = inboundInitiatedConnectionLimitExceeded();
// three reasons why we would disconnect an existing peer to accommodate the new peer
if (canExceedPeerLimits(id)
|| (snapServerPeersNeeded
&& numSnapServers < snapServerTargetNumber
&& peer.isServingSnap())
|| (inboundLimitExceeded && !peer.getConnection().inboundInitiated())) {
final boolean filterOutSnapServers =
snapServerPeersNeeded && (numSnapServers <= snapServerTargetNumber);
// find and disconnect the least useful peer we can disconnect
activeConnections.values().stream()
.filter(p -> !canExceedPeerLimits(p.getId()))
.filter(filterOutSnapServers ? p -> !p.isServingSnap() : p -> true)
.filter(inboundLimitExceeded ? p -> p.getConnection().inboundInitiated() : p -> true)
.min(MOST_USEFUL_PEER)
.ifPresentOrElse(
pe -> {
pe.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
LOG.atTrace()
.setMessage("Disconnecting peer {} to be replaced by prioritised peer {}")
.addArgument(pe.getLoggableId())
.addArgument(peer.getLoggableId())
.log();
},
() -> // disconnect the least useful peer
activeConnections.values().stream()
.filter(p -> !canExceedPeerLimits(p.getId()))
.min(MOST_USEFUL_PEER)
.ifPresent(
p -> {
p.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
LOG.atTrace()
.setMessage(
"Disconnecting peer {} to be replaced by prioritised peer {}")
.addArgument(p.getLoggableId())
.addArgument(peer.getLoggableId())
.log();
}));
} else {
LOG.atTrace()
.setMessage(
"Too many peers. Disconnect peer {} with connection: {}, max connections {}")
.addArgument(peer.getLoggableId())
.addArgument(connection)
.addArgument(peerUpperBound)
.log();
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
}
final boolean added = (completeConnections.putIfAbsent(id, peer) == null);
final boolean added = (activeConnections.putIfAbsent(id, peer) == null);
if (added) {
LOG.atTrace()
.setMessage("Added peer {} with connection {} to completeConnections")
.setMessage("Added peer {} with connection {} to activeConnections")
.addArgument(id)
.addArgument(connection)
.log();
} else {
LOG.atTrace()
.setMessage("Did not add peer {} with connection {} to completeConnections")
.setMessage("Did not add peer {} with connection {} to activeConnections")
.addArgument(id)
.addArgument(connection)
.log();
}
return added;
} else {
// randomPeerPriority! Add the peer and if there are too many connections fix it
completeConnections.putIfAbsent(id, peer);
// TODO: random peer priority does not care yet about snap server peers -> check later
activeConnections.putIfAbsent(id, peer);
enforceRemoteConnectionLimits();
enforceConnectionLimits();
return completeConnections.containsKey(id);
return activeConnections.containsKey(id);
}
}
private long getNumTrailingPeers(final long minimumHeightToBeUpToDate) {
return streamAvailablePeers()
.filter(p -> p.chainState().getEstimatedHeight() < minimumHeightToBeUpToDate)
.count();
}
private boolean needMoreSnapServers() {
return snapServerPeersNeeded && numberOfSnapServers() < snapServerTargetNumber;
}
private long numberOfSnapServers() {
return activeConnections.values().stream().filter(EthPeer::isServingSnap).count();
}
}

@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
@ -161,41 +160,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()));
}
public EthProtocolManager(
final Blockchain blockchain,
final BigInteger networkId,
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthContext ethContext,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler,
final List<Long> blockNumberForks,
final List<Long> timestampForks) {
this(
blockchain,
networkId,
worldStateArchive,
transactionPool,
ethereumWireProtocolConfiguration,
ethPeers,
ethMessages,
ethContext,
peerValidators,
mergePeerFilter,
synchronizerConfiguration,
scheduler,
new ForkIdManager(
blockchain,
blockNumberForks,
timestampForks,
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()));
}
public EthContext ethContext() {
return ethContext;
}
@ -398,28 +362,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log();
}
@Override
public boolean shouldConnect(final Peer peer, final boolean incoming) {
if (peer.getForkId().map(forkIdManager::peerCheck).orElse(true)) {
LOG.atDebug()
.setMessage("ForkId OK or not available for peer {}")
.addArgument(peer::getLoggableId)
.log();
if (ethPeers.shouldConnect(peer, incoming)) {
return true;
}
} else {
LOG.atDebug()
.setMessage("ForkId check failed for peer {} our fork id {} theirs {}")
.addArgument(peer::getLoggableId)
.addArgument(forkIdManager.getForkIdForChainHead())
.addArgument(peer.getForkId())
.log();
return false;
}
return false;
}
@Override
public void handleDisconnect(
final PeerConnection connection,
@ -427,11 +369,10 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final boolean initiatedByPeer) {
final boolean wasActiveConnection = ethPeers.registerDisconnect(connection);
LOG.atDebug()
.setMessage("Disconnect - active Connection? {} - {} - {} {} - {} {} - {} peers left")
.setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left")
.addArgument(wasActiveConnection)
.addArgument(initiatedByPeer ? "Inbound" : "Outbound")
.addArgument(reason::getValue)
.addArgument(reason::name)
.addArgument(reason::toString)
.addArgument(() -> connection.getPeer().getLoggableId())
.addArgument(() -> connection.getPeerInfo().getClientId())
.addArgument(ethPeers::peerCount)

@ -19,4 +19,8 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNot
public interface PeerRequest {
ResponseStream sendRequest(EthPeer peer) throws PeerNotConnected;
default boolean isEthPeerSuitable(final EthPeer ethPeer) {
return true;
}
}

@ -86,6 +86,7 @@ public class PendingPeerRequest {
: ethPeers
.streamAvailablePeers()
.filter(peer -> peer.chainState().getEstimatedHeight() >= minimumBlockNumber)
.filter(request::isEthPeerSuitable)
.min(EthPeers.LEAST_TO_MOST_BUSY);
}

@ -17,10 +17,13 @@ package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -64,14 +67,34 @@ public class GetAccountRangeFromPeerTask
@Override
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(
peer -> {
LOG.trace(
"Requesting account range [{} ,{}] for state root {} from peer {} .",
startKeyHash,
endKeyHash,
blockHeader.getStateRoot(),
peer);
return peer.getSnapAccountRange(blockHeader.getStateRoot(), startKeyHash, endKeyHash);
new PeerRequest() {
@Override
public RequestManager.ResponseStream sendRequest(final EthPeer peer)
throws PeerConnection.PeerNotConnected {
LOG.atTrace()
.setMessage("Requesting account range [{} ,{}] for state root {} from peer {} .")
.addArgument(startKeyHash)
.addArgument(endKeyHash)
.addArgument(blockHeader)
.addArgument(peer)
.log();
if (!peer.isServingSnap()) {
LOG.atDebug()
.setMessage("EthPeer that is not serving snap called in {}, peer: {}")
.addArgument(GetAccountRangeFromPeerTask.class)
.addArgument(peer)
.log();
throw new RuntimeException(
"EthPeer that is not serving snap called in "
+ GetAccountRangeFromPeerTask.class);
}
return peer.getSnapAccountRange(blockHeader.getStateRoot(), startKeyHash, endKeyHash);
}
@Override
public boolean isEthPeerSuitable(final EthPeer ethPeer) {
return ethPeer.isServingSnap();
}
},
blockHeader.getNumber());
}

@ -21,10 +21,13 @@ import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -66,9 +69,32 @@ public class GetBytecodeFromPeerTask extends AbstractPeerRequestTask<Map<Bytes32
@Override
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(
peer -> {
LOG.trace("Requesting {} Bytecodes from {} .", codeHashes.size(), peer);
return peer.getSnapBytecode(blockHeader.getStateRoot(), codeHashes);
new PeerRequest() {
@Override
public RequestManager.ResponseStream sendRequest(final EthPeer peer)
throws PeerConnection.PeerNotConnected {
LOG.atTrace()
.setMessage("Requesting {} Bytecodes from {} .")
.addArgument(codeHashes.size())
.addArgument(peer)
.log();
if (!peer.isServingSnap()) {
LOG.atDebug()
.setMessage("EthPeer that is not serving snap called in {}, peer: {}")
.addArgument(GetAccountRangeFromPeerTask.class)
.addArgument(peer)
.log();
throw new RuntimeException(
"EthPeer that is not serving snap called in "
+ GetAccountRangeFromPeerTask.class);
}
return peer.getSnapBytecode(blockHeader.getStateRoot(), codeHashes);
}
@Override
public boolean isEthPeerSuitable(final EthPeer ethPeer) {
return ethPeer.isServingSnap();
}
},
blockHeader.getNumber());
}

@ -17,10 +17,13 @@ package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -69,15 +72,35 @@ public class GetStorageRangeFromPeerTask
@Override
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(
peer -> {
LOG.trace(
"Requesting storage range [{} ,{}] for {} accounts from peer {} .",
startKeyHash,
endKeyHash,
accountHashes.size(),
peer);
return peer.getSnapStorageRange(
blockHeader.getStateRoot(), accountHashes, startKeyHash, endKeyHash);
new PeerRequest() {
@Override
public RequestManager.ResponseStream sendRequest(final EthPeer peer)
throws PeerConnection.PeerNotConnected {
LOG.atTrace()
.setMessage("Requesting storage range [{} ,{}] for {} accounts from peer {} .")
.addArgument(startKeyHash)
.addArgument(endKeyHash)
.addArgument(accountHashes.size())
.addArgument(peer)
.log();
if (!peer.isServingSnap()) {
LOG.atDebug()
.setMessage("EthPeer that is not serving snap called in {}, peer: {}")
.addArgument(GetAccountRangeFromPeerTask.class)
.addArgument(peer)
.log();
throw new RuntimeException(
"EthPeer that is not serving snap called in "
+ GetAccountRangeFromPeerTask.class);
}
return peer.getSnapStorageRange(
blockHeader.getStateRoot(), accountHashes, startKeyHash, endKeyHash);
}
@Override
public boolean isEthPeerSuitable(final EthPeer ethPeer) {
return ethPeer.isServingSnap();
}
},
blockHeader.getNumber());
}

@ -20,10 +20,13 @@ import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -72,9 +75,31 @@ public class GetTrieNodeFromPeerTask extends AbstractPeerRequestTask<Map<Bytes,
@Override
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(
peer -> {
LOG.trace("Requesting {} trie nodes from peer {}", paths.size(), peer);
return peer.getSnapTrieNode(blockHeader.getStateRoot(), paths);
new PeerRequest() {
@Override
public RequestManager.ResponseStream sendRequest(final EthPeer peer)
throws PeerConnection.PeerNotConnected {
LOG.atTrace()
.setMessage("Requesting {} trie nodes from peer {}")
.addArgument(paths.size())
.addArgument(peer)
.log();
if (!peer.isServingSnap()) {
LOG.debug(
"EthPeer that is not serving snap called in {}, {}",
GetAccountRangeFromPeerTask.class,
peer);
throw new RuntimeException(
"EthPeer that is not serving snap called in "
+ GetAccountRangeFromPeerTask.class);
}
return peer.getSnapTrieNode(blockHeader.getStateRoot(), paths);
}
@Override
public boolean isEthPeerSuitable(final EthPeer ethPeer) {
return ethPeer.isServingSnap();
}
},
blockHeader.getNumber());
}

@ -31,6 +31,7 @@ public class RetryingGetAccountRangeFromPeerTask
extends AbstractRetryingPeerTask<AccountRangeMessage.AccountRangeData> {
public static final int MAX_RETRIES = 4;
private final EthContext ethContext;
private final Bytes32 startKeyHash;
private final Bytes32 endKeyHash;
@ -79,4 +80,9 @@ public class RetryingGetAccountRangeFromPeerTask
return peerResult.getResult();
});
}
@Override
protected boolean isSuitablePeer(final EthPeer peer) {
return peer.isServingSnap();
}
}

@ -69,4 +69,9 @@ public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask<Ma
return peerResult.getResult();
});
}
@Override
protected boolean isSuitablePeer(final EthPeer peer) {
return peer.isServingSnap();
}
}

@ -79,4 +79,9 @@ public class RetryingGetStorageRangeFromPeerTask
return peerResult.getResult();
});
}
@Override
protected boolean isSuitablePeer(final EthPeer peer) {
return peer.isServingSnap();
}
}

@ -68,4 +68,9 @@ public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask<Ma
return peerResult.getResult();
});
}
@Override
protected boolean isSuitablePeer(final EthPeer peer) {
return peer.isServingSnap();
}
}

@ -22,7 +22,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
@ -143,11 +142,6 @@ public class SnapProtocolManager implements ProtocolManager {
@Override
public void handleNewConnection(final PeerConnection connection) {}
@Override
public boolean shouldConnect(final Peer peer, final boolean incoming) {
return false; // EthManager is taking care of this for now
}
@Override
public void handleDisconnect(
final PeerConnection connection,

@ -67,8 +67,20 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
this.metricsSystem = metricsSystem;
}
public void assignPeer(final EthPeer peer) {
assignedPeer = Optional.of(peer);
/**
* Assign the peer to be used for the task.
*
* @param peer The peer to assign to the task.
* @return True if the peer was assigned, false otherwise.
*/
public boolean assignPeer(final EthPeer peer) {
if (isSuitablePeer(peer)) {
assignedPeer = Optional.of(peer);
return true;
} else {
assignedPeer = Optional.empty();
return false;
}
}
public Optional<EthPeer> getAssignedPeer() {
@ -167,4 +179,8 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
public int getMaxRetries() {
return maxRetries;
}
protected boolean isSuitablePeer(final EthPeer peer) {
return true;
}
}

@ -49,9 +49,12 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry
}
@Override
public void assignPeer(final EthPeer peer) {
super.assignPeer(peer);
triedPeers.add(peer);
public boolean assignPeer(final EthPeer peer) {
if (super.assignPeer(peer)) {
triedPeers.add(peer);
return true;
}
return false;
}
protected abstract CompletableFuture<T> executeTaskOnCurrentPeer(final EthPeer peer);
@ -62,8 +65,7 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry
final Optional<EthPeer> maybePeer =
assignedPeer
.filter(u -> getRetryCount() == 1) // first try with the assigned peer if present
.map(Optional::of)
.orElseGet(this::selectNextPeer); // otherwise, select a new one from the pool
.or(this::selectNextPeer); // otherwise select a new one from the pool
if (maybePeer.isEmpty()) {
LOG.atTrace()
@ -101,7 +103,7 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry
@Override
protected void handleTaskError(final Throwable error) {
if (isPeerFailure(error)) {
getAssignedPeer().ifPresent(peer -> failedPeers.add(peer));
getAssignedPeer().ifPresent(failedPeers::add);
}
super.handleTaskError(error);
}
@ -124,10 +126,11 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry
return maybeNextPeer;
}
private Stream<EthPeer> remainingPeersToTry() {
protected Stream<EthPeer> remainingPeersToTry() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(this::isSuitablePeer)
.filter(peer -> !triedPeers.contains(peer));
}

@ -19,24 +19,25 @@ import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers.ConnectCallback;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ChainHeadTracker implements ConnectCallback {
public class ChainHeadTracker {
private static final Logger LOG = LoggerFactory.getLogger(ChainHeadTracker.class);
private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
private final TrailingPeerLimiter trailingPeerLimiter;
private final MetricsSystem metricsSystem;
public ChainHeadTracker(
@ -46,7 +47,6 @@ public class ChainHeadTracker implements ConnectCallback {
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.protocolSchedule = protocolSchedule;
this.trailingPeerLimiter = trailingPeerLimiter;
this.metricsSystem = metricsSystem;
}
@ -60,48 +60,53 @@ public class ChainHeadTracker implements ConnectCallback {
new TrailingPeerLimiter(ethContext.getEthPeers(), trailingPeerRequirementsCalculator);
final ChainHeadTracker tracker =
new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, metricsSystem);
ethContext.getEthPeers().subscribeConnect(tracker);
ethContext.getEthPeers().setChainHeadTracker(tracker);
blockchain.observeBlockAdded(trailingPeerLimiter);
}
@Override
public void onPeerConnected(final EthPeer peer) {
public CompletableFuture<BlockHeader> getBestHeaderFromPeer(final EthPeer peer) {
LOG.atDebug()
.setMessage("Requesting chain head info from {}...")
.addArgument(peer::getLoggableId)
.log();
GetHeadersFromPeerByHashTask.forSingleHash(
final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<BlockHeader>>>
bestHeaderFromPeerCompletableFuture = getBestHeaderFromPeerCompletableFuture(peer);
final CompletableFuture<BlockHeader> future = new CompletableFuture<>();
bestHeaderFromPeerCompletableFuture.whenComplete(
(peerResult, error) -> {
if (peerResult != null && !peerResult.getResult().isEmpty()) {
final BlockHeader chainHeadHeader = peerResult.getResult().get(0);
peer.chainState().update(chainHeadHeader);
future.complete(chainHeadHeader);
LOG.atDebug()
.setMessage("Retrieved chain head info {} from {}...")
.addArgument(
() -> chainHeadHeader.getNumber() + " (" + chainHeadHeader.getBlockHash() + ")")
.addArgument(peer::getLoggableId)
.log();
} else {
LOG.atDebug()
.setMessage("Failed to retrieve chain head info. Disconnecting {}... {}")
.addArgument(peer::getLoggableId)
.addArgument(error != null ? error : "Empty Response")
.log();
peer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD);
future.complete(null);
}
});
return future;
}
public CompletableFuture<AbstractPeerTask.PeerTaskResult<List<BlockHeader>>>
getBestHeaderFromPeerCompletableFuture(final EthPeer peer) {
return GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule,
ethContext,
Hash.wrap(peer.chainState().getBestBlock().getHash()),
0,
metricsSystem)
.assignPeer(peer)
.run()
.whenComplete(
(peerResult, error) -> {
if (peerResult != null && !peerResult.getResult().isEmpty()) {
final BlockHeader chainHeadHeader = peerResult.getResult().get(0);
peer.chainState().update(chainHeadHeader);
trailingPeerLimiter.enforceTrailingPeerLimit();
LOG.atDebug()
.setMessage("Retrieved chain head info {} from {}...")
.addArgument(
() ->
chainHeadHeader.getNumber()
+ " ("
+ chainHeadHeader.getBlockHash()
+ ")")
.addArgument(peer::getLoggableId)
.log();
} else {
LOG.atDebug()
.setMessage("Failed to retrieve chain head info. Disconnecting {}... {}")
.addArgument(peer::getLoggableId)
.addArgument(error)
.log();
peer.disconnect(DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_STATE);
}
});
.run();
}
}

@ -99,6 +99,11 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
this::calculateTrailingPeerRequirements,
metricsSystem);
if (SyncMode.isSnapSync(syncConfig.getSyncMode())
|| SyncMode.isCheckpointSync(syncConfig.getSyncMode())) {
SnapServerChecker.createAndSetSnapServerChecker(ethContext, metricsSystem);
}
this.blockPropagationManager =
terminationCondition.shouldStopDownload()
? Optional.empty()
@ -187,7 +192,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
() -> getSyncStatus().isPresent() ? 0 : 1);
}
private TrailingPeerRequirements calculateTrailingPeerRequirements() {
public TrailingPeerRequirements calculateTrailingPeerRequirements() {
return fastSyncDownloader
.flatMap(FastSyncDownloader::calculateTrailingPeerRequirements)
.orElse(

@ -0,0 +1,86 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.snap.GetAccountRangeFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnapServerChecker {
private static final Logger LOG = LoggerFactory.getLogger(SnapServerChecker.class);
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
public SnapServerChecker(final EthContext ethContext, final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
}
public static void createAndSetSnapServerChecker(
final EthContext ethContext, final MetricsSystem metricsSystem) {
final SnapServerChecker checker = new SnapServerChecker(ethContext, metricsSystem);
ethContext.getEthPeers().setSnapServerChecker(checker);
}
public CompletableFuture<Boolean> check(final EthPeer peer, final BlockHeader peersHeadHeader) {
LOG.atTrace()
.setMessage("Checking whether peer {} is a snap server ...")
.addArgument(peer::getLoggableId)
.log();
final CompletableFuture<AbstractPeerTask.PeerTaskResult<AccountRangeMessage.AccountRangeData>>
snapServerCheckCompletableFuture = getAccountRangeFromPeer(peer, peersHeadHeader);
final CompletableFuture<Boolean> future = new CompletableFuture<>();
snapServerCheckCompletableFuture.whenComplete(
(peerResult, error) -> {
if (peerResult != null) {
if (!peerResult.getResult().accounts().isEmpty()
|| !peerResult.getResult().proofs().isEmpty()) {
LOG.atTrace()
.setMessage("Peer {} is a snap server.")
.addArgument(peer::getLoggableId)
.log();
future.complete(true);
} else {
LOG.atTrace()
.setMessage("Peer {} is not a snap server.")
.addArgument(peer::getLoggableId)
.log();
future.complete(false);
}
}
});
return future;
}
public CompletableFuture<AbstractPeerTask.PeerTaskResult<AccountRangeMessage.AccountRangeData>>
getAccountRangeFromPeer(final EthPeer peer, final BlockHeader header) {
return GetAccountRangeFromPeerTask.forAccountRange(
ethContext, Hash.ZERO, Hash.ZERO, header, metricsSystem)
.assignPeer(peer)
.run();
}
}

@ -54,4 +54,8 @@ public enum SyncMode {
public static boolean isCheckpointSync(final SyncMode syncMode) {
return X_CHECKPOINT.equals(syncMode) || CHECKPOINT.equals(syncMode);
}
public static boolean isSnapSync(final SyncMode syncMode) {
return X_SNAP.equals(syncMode) || SNAP.equals(syncMode);
}
}

@ -42,6 +42,9 @@ import org.slf4j.LoggerFactory;
public class SyncTargetManager extends AbstractSyncTargetManager {
private static final Logger LOG = LoggerFactory.getLogger(SyncTargetManager.class);
private static final int LOG_DEBUG_REPEAT_DELAY = 15;
private static final int LOG_INFO_REPEAT_DELAY = 120;
private static final int SECONDS_PER_REQUEST = 6; // 5s per request + 1s wait between retries
private final WorldStateStorageCoordinator worldStateStorageCoordinator;
@ -52,8 +55,6 @@ public class SyncTargetManager extends AbstractSyncTargetManager {
private final FastSyncState fastSyncState;
private final AtomicBoolean logDebug = new AtomicBoolean(true);
private final AtomicBoolean logInfo = new AtomicBoolean(true);
private final int logDebugRepeatDelay = 15;
private final int logInfoRepeatDelay = 120;
public SyncTargetManager(
final SynchronizerConfiguration config,
@ -84,14 +85,14 @@ public class SyncTargetManager extends AbstractSyncTargetManager {
"Unable to find sync target. Currently checking %d peers for usefulness. Pivot block: %d",
ethContext.getEthPeers().peerCount(), pivotBlockHeader.getNumber()),
logDebug,
logDebugRepeatDelay);
LOG_DEBUG_REPEAT_DELAY);
throttledLog(
LOG::info,
String.format(
"Unable to find sync target. Currently checking %d peers for usefulness.",
ethContext.getEthPeers().peerCount()),
logInfo,
logInfoRepeatDelay);
LOG_INFO_REPEAT_DELAY);
return completedFuture(Optional.empty());
} else {
final EthPeer bestPeer = maybeBestPeer.get();

@ -21,15 +21,18 @@ import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
@ -37,6 +40,7 @@ import java.util.Collections;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
@ -56,6 +60,11 @@ public class EthPeersTest {
when(peerRequest.sendRequest(any())).thenReturn(responseStream);
ethProtocolManager = EthProtocolManagerTestUtil.create();
ethPeers = ethProtocolManager.ethContext().getEthPeers();
final ChainHeadTracker mock = mock(ChainHeadTracker.class);
final BlockHeader blockHeader = mock(BlockHeader.class);
when(mock.getBestHeaderFromPeer(any()))
.thenReturn(CompletableFuture.completedFuture(blockHeader));
ethPeers.setChainHeadTracker(mock);
}
@Test
@ -112,6 +121,9 @@ public class EthPeersTest {
@Test
public void shouldExecutePeerRequestImmediatelyWhenPeerIsAvailable() throws Exception {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
when(peerRequest.isEthPeerSuitable(peer.getEthPeer())).thenReturn(true);
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 10, Optional.empty());
@ -127,6 +139,8 @@ public class EthPeersTest {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
useRequestSlot(workingPeer.getEthPeer());
when(peerRequest.isEthPeerSuitable(any())).thenReturn(true);
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 10, Optional.empty());
@ -147,6 +161,8 @@ public class EthPeersTest {
assertThat(leastRecentlyUsedPeer.getEthPeer().outstandingRequests())
.isEqualTo(mostRecentlyUsedPeer.getEthPeer().outstandingRequests());
when(peerRequest.isEthPeerSuitable(any())).thenReturn(true);
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 10, Optional.empty());
@ -180,10 +196,13 @@ public class EthPeersTest {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
useAllAvailableCapacity(suitablePeer.getEthPeer());
when(peerRequest.isEthPeerSuitable(suitablePeer.getEthPeer())).thenReturn(true);
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 200, Optional.empty());
verifyNoInteractions(peerRequest);
verify(peerRequest, times(0)).sendRequest(suitablePeer.getEthPeer());
assertNotDone(pendingRequest);
suitablePeer.disconnect(DisconnectReason.TOO_MANY_PEERS);
@ -194,6 +213,8 @@ public class EthPeersTest {
public void shouldFailWithPeerNotConnectedIfPeerRequestThrows() throws Exception {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
when(peerRequest.sendRequest(peer.getEthPeer())).thenThrow(new PeerNotConnected("Oh dear"));
when(peerRequest.isEthPeerSuitable(any())).thenReturn(true);
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 100, Optional.empty());
@ -205,9 +226,11 @@ public class EthPeersTest {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
useAllAvailableCapacity(peer.getEthPeer());
when(peerRequest.isEthPeerSuitable(any())).thenReturn(true);
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 100, Optional.empty());
verifyNoInteractions(peerRequest);
verify(peerRequest, times(0)).sendRequest(peer.getEthPeer());
freeUpCapacity(peer.getEthPeer());
@ -221,11 +244,12 @@ public class EthPeersTest {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10);
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
when(peerRequest.isEthPeerSuitable(peer.getEthPeer())).thenReturn(true);
useAllAvailableCapacity(peer.getEthPeer());
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 100, Optional.empty());
verifyNoInteractions(peerRequest);
verify(peerRequest, times(0)).sendRequest(peer.getEthPeer());
freeUpCapacity(peer.getEthPeer());
@ -238,15 +262,17 @@ public class EthPeersTest {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
useAllAvailableCapacity(peer.getEthPeer());
when(peerRequest.isEthPeerSuitable(peer.getEthPeer())).thenReturn(true);
final PendingPeerRequest pendingRequest =
ethPeers.executePeerRequest(peerRequest, 100, Optional.empty());
verifyNoInteractions(peerRequest);
verify(peerRequest, times(0)).sendRequest(peer.getEthPeer());
pendingRequest.abort();
freeUpCapacity(peer.getEthPeer());
verifyNoInteractions(peerRequest);
verify(peerRequest, times(0)).sendRequest(peer.getEthPeer());
assertRequestFailure(pendingRequest, CancellationException.class);
}
@ -349,6 +375,59 @@ public class EthPeersTest {
assertThat(ethPeers.toString()).contains(peerA.getLoggableId());
}
@Test
public void snapServersPreferredWhileSyncing() {
ethPeers.snapServerPeersNeeded(true);
while (ethPeers.peerCount() < ethPeers.getMaxPeers()) {
final EthPeer ethPeer =
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, Difficulty.of(50), 20, false, false)
.getEthPeer();
assertThat(ethPeers.addPeerToEthPeers(ethPeer)).isTrue();
}
final EthPeer nonSnapServingPeer =
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, Difficulty.of(50), 20, false, false)
.getEthPeer();
assertThat(ethPeers.addPeerToEthPeers(nonSnapServingPeer)).isFalse();
assertThat(nonSnapServingPeer.getConnection().isDisconnected()).isTrue();
final EthPeer snapServingPeer =
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, Difficulty.of(50), 20, true, false)
.getEthPeer();
assertThat(ethPeers.addPeerToEthPeers(snapServingPeer)).isTrue();
assertThat(ethPeers.peerCount()).isEqualTo(ethPeers.getMaxPeers());
}
@Test
public void snapServersNotPreferredWhenInSync() {
ethPeers.snapServerPeersNeeded(false);
while (ethPeers.peerCount() < ethPeers.getMaxPeers()) {
final EthPeer ethPeer =
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, Difficulty.of(50), 20, false, false)
.getEthPeer();
assertThat(ethPeers.addPeerToEthPeers(ethPeer)).isTrue();
}
final EthPeer snapServingPeer =
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, Difficulty.of(50), 20, true, false)
.getEthPeer();
assertThat(ethPeers.addPeerToEthPeers(snapServingPeer)).isFalse();
assertThat(snapServingPeer.getConnection().isDisconnected()).isTrue();
assertThat(ethPeers.peerCount()).isEqualTo(ethPeers.getMaxPeers());
}
private void freeUpCapacity(final EthPeer ethPeer) {
ethPeers.dispatchMessage(ethPeer, new EthMessage(ethPeer, NodeDataMessage.create(emptyList())));
}

@ -528,7 +528,7 @@ public final class EthProtocolManagerTest {
private MockPeerConnection setupPeer(
final EthProtocolManager ethManager, final PeerSendHandler onSend) {
final MockPeerConnection peer = setupPeerWithoutStatusExchange(ethManager, onSend);
final MockPeerConnection peerConnection = setupPeerWithoutStatusExchange(ethManager, onSend);
final StatusMessage statusMessage =
StatusMessage.create(
EthProtocolVersion.V63,
@ -536,8 +536,11 @@ public final class EthProtocolManagerTest {
blockchain.getChainHead().getTotalDifficulty(),
blockchain.getChainHeadHash(),
blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash());
ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(peer, statusMessage));
return peer;
ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(peerConnection, statusMessage));
final EthPeers ethPeers = ethManager.ethContext().getEthPeers();
final EthPeer ethPeer = ethPeers.peer(peerConnection);
ethPeers.addPeerToEthPeers(ethPeer);
return peerConnection;
}
private MockPeerConnection setupPeerWithoutStatusExchange(

@ -16,12 +16,15 @@ package org.hyperledger.besu.ethereum.eth.manager;
import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.chain.GenesisState;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture;
@ -29,6 +32,8 @@ import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
@ -46,8 +51,10 @@ import java.math.BigInteger;
import java.util.Collections;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes;
import org.mockito.Mockito;
public class EthProtocolManagerTestUtil {
@ -86,7 +93,13 @@ public class EthProtocolManagerTestUtil {
Bytes.random(64),
25,
25,
false);
false,
SyncMode.FAST,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
final ChainHeadTracker chainHeadTrackerMock = getChainHeadTrackerMock();
peers.setChainHeadTracker(chainHeadTrackerMock);
final EthMessages messages = new EthMessages();
final EthScheduler ethScheduler = new DeterministicEthScheduler(TimeoutPolicy.NEVER_TIMEOUT);
final EthContext ethContext = new EthContext(peers, messages, ethScheduler);
@ -139,6 +152,8 @@ public class EthProtocolManagerTestUtil {
final EthContext ethContext,
final ForkIdManager forkIdManager) {
ethPeers.setChainHeadTracker(getChainHeadTrackerMock());
final BigInteger networkId = BigInteger.ONE;
return new EthProtocolManager(
blockchain,
@ -205,9 +220,15 @@ public class EthProtocolManagerTestUtil {
Bytes.random(64),
25,
25,
false);
false,
SyncMode.FAST,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
final EthMessages messages = new EthMessages();
final ChainHeadTracker chtMock = getChainHeadTrackerMock();
peers.setChainHeadTracker(chtMock);
return create(
blockchain,
ethScheduler,
@ -219,6 +240,17 @@ public class EthProtocolManagerTestUtil {
new EthContext(peers, messages, ethScheduler));
}
public static ChainHeadTracker getChainHeadTrackerMock() {
final ChainHeadTracker chtMock = mock(ChainHeadTracker.class);
final BlockHeader blockHeaderMock = mock(BlockHeader.class);
Mockito.lenient()
.when(chtMock.getBestHeaderFromPeer(any()))
.thenReturn(CompletableFuture.completedFuture(blockHeaderMock));
Mockito.lenient().when(blockHeaderMock.getNumber()).thenReturn(0L);
Mockito.lenient().when(blockHeaderMock.getStateRoot()).thenReturn(Hash.ZERO);
return chtMock;
}
public static EthProtocolManager create(
final ProtocolSchedule protocolSchedule,
final Blockchain blockchain,
@ -239,7 +271,9 @@ public class EthProtocolManagerTestUtil {
Bytes.random(64),
25,
25,
false);
false,
SyncMode.FAST,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
final EthMessages messages = new EthMessages();
return create(
@ -258,7 +292,7 @@ public class EthProtocolManagerTestUtil {
final ProtocolSchedule protocolSchedule,
final Blockchain blockchain,
final EthScheduler ethScheduler) {
final EthPeers peers =
final EthPeers ethPeers =
new EthPeers(
EthProtocol.NAME,
() -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()),
@ -269,7 +303,13 @@ public class EthProtocolManagerTestUtil {
Bytes.random(64),
25,
25,
false);
false,
SyncMode.FAST,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
final ChainHeadTracker chainHeadTrackerMock = getChainHeadTrackerMock();
ethPeers.setChainHeadTracker(chainHeadTrackerMock);
final EthMessages messages = new EthMessages();
return create(
@ -278,9 +318,9 @@ public class EthProtocolManagerTestUtil {
BlockchainSetupUtil.forTesting(DataStorageFormat.FOREST).getWorldArchive(),
mock(TransactionPool.class),
EthProtocolConfiguration.defaultConfig(),
peers,
ethPeers,
messages,
new EthContext(peers, messages, ethScheduler));
new EthContext(ethPeers, messages, ethScheduler));
}
public static EthProtocolManager create() {
@ -446,4 +486,19 @@ public class EthProtocolManagerTestUtil {
.estimatedHeight(blockchain.getChainHeadBlockNumber())
.build();
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final Difficulty td,
final int estimatedHeight,
final boolean isServingSnap,
final boolean addToEthPeers) {
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.totalDifficulty(td)
.estimatedHeight(estimatedHeight)
.isServingSnap(isServingSnap)
.addToEthPeers(addToEthPeers)
.build();
}
}

@ -121,7 +121,9 @@ public class RespondingEthPeer {
final Hash chainHeadHash,
final Difficulty totalDifficulty,
final OptionalLong estimatedHeight,
final List<PeerValidator> peerValidators) {
final List<PeerValidator> peerValidators,
final boolean isServingSnap,
final boolean addToEthPeers) {
final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers();
final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
@ -130,10 +132,24 @@ public class RespondingEthPeer {
new MockPeerConnection(
caps, (cap, msg, conn) -> outgoingMessages.add(new OutgoingMessage(cap, msg)));
ethPeers.registerNewConnection(peerConnection, peerValidators);
final int before = ethPeers.peerCount();
final EthPeer peer = ethPeers.peer(peerConnection);
peer.registerStatusReceived(chainHeadHash, totalDifficulty, 63, peerConnection);
estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height));
peer.registerStatusSent(peerConnection);
if (addToEthPeers) {
peer.registerStatusSent(peerConnection);
ethPeers.addPeerToEthPeers(peer);
while (ethPeers.peerCount()
<= before) { // this is needed to make sure that the peer is added to the active
// connections
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
peer.setIsServingSnap(isServingSnap);
return new RespondingEthPeer(
ethProtocolManager, snapProtocolManager, peerConnection, peer, outgoingMessages);
@ -396,6 +412,8 @@ public class RespondingEthPeer {
private Difficulty totalDifficulty = Difficulty.of(1000L);
private OptionalLong estimatedHeight = OptionalLong.of(1000L);
private final List<PeerValidator> peerValidators = new ArrayList<>();
private boolean isServingSnap = false;
private boolean addToEthPeers = true;
public RespondingEthPeer build() {
checkNotNull(ethProtocolManager, "Must configure EthProtocolManager");
@ -406,7 +424,9 @@ public class RespondingEthPeer {
chainHeadHash,
totalDifficulty,
estimatedHeight,
peerValidators);
peerValidators,
isServingSnap,
addToEthPeers);
}
public Builder ethProtocolManager(final EthProtocolManager ethProtocolManager) {
@ -444,6 +464,11 @@ public class RespondingEthPeer {
return this;
}
public Builder isServingSnap(final boolean isServingSnap) {
this.isServingSnap = isServingSnap;
return this;
}
public Builder peerValidators(final List<PeerValidator> peerValidators) {
checkNotNull(peerValidators);
this.peerValidators.addAll(peerValidators);
@ -454,6 +479,11 @@ public class RespondingEthPeer {
peerValidators(Arrays.asList(peerValidators));
return this;
}
public Builder addToEthPeers(final boolean addToEthPeers) {
this.addToEthPeers = addToEthPeers;
return this;
}
}
static class OutgoingMessage {

@ -38,11 +38,13 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -121,7 +123,10 @@ public abstract class AbstractMessageTaskTest<T, R> {
Bytes.random(64),
MAX_PEERS,
MAX_PEERS,
false));
false,
SyncMode.FAST,
new ForkIdManager(
blockchain, Collections.emptyList(), Collections.emptyList(), false)));
final EthMessages ethMessages = new EthMessages();
final EthScheduler ethScheduler =

@ -55,6 +55,7 @@ import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage;
import org.hyperledger.besu.ethereum.eth.sync.BlockPropagationManager.ProcessingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
@ -629,7 +630,10 @@ public abstract class AbstractBlockPropagationManagerTest {
Bytes.random(64),
25,
25,
false),
false,
SyncMode.SNAP,
new ForkIdManager(
blockchain, Collections.emptyList(), Collections.emptyList(), false)),
new EthMessages(),
ethScheduler);
final BlockPropagationManager blockPropagationManager =
@ -767,7 +771,10 @@ public abstract class AbstractBlockPropagationManagerTest {
Bytes.random(64),
25,
25,
false),
false,
SyncMode.SNAP,
new ForkIdManager(
blockchain, Collections.emptyList(), Collections.emptyList(), false)),
new EthMessages(),
ethScheduler);
final BlockPropagationManager blockPropagationManager =

@ -98,7 +98,7 @@ public class ChainHeadTrackerTest {
blockchainSetupUtil.getBlockchain(),
blockchainSetupUtil.getWorldArchive(),
blockchainSetupUtil.getTransactionPool());
chainHeadTracker.onPeerConnected(respondingPeer.getEthPeer());
chainHeadTracker.getBestHeaderFromPeer(respondingPeer.getEthPeer());
Assertions.assertThat(chainHeadState().getEstimatedHeight()).isZero();
@ -118,7 +118,7 @@ public class ChainHeadTrackerTest {
blockchainSetupUtil.getBlockchain(),
blockchainSetupUtil.getWorldArchive(),
blockchainSetupUtil.getTransactionPool());
chainHeadTracker.onPeerConnected(respondingPeer.getEthPeer());
chainHeadTracker.getBestHeaderFromPeer(respondingPeer.getEthPeer());
// Change the hash of the current known head
respondingPeer.getEthPeer().chainState().statusReceived(Hash.EMPTY_TRIE_HASH, Difficulty.ONE);
@ -137,7 +137,7 @@ public class ChainHeadTrackerTest {
blockchainSetupUtil.getBlockchain(),
blockchainSetupUtil.getWorldArchive(),
blockchainSetupUtil.getTransactionPool());
chainHeadTracker.onPeerConnected(respondingPeer.getEthPeer());
chainHeadTracker.getBestHeaderFromPeer(respondingPeer.getEthPeer());
Assertions.assertThat(chainHeadState().getEstimatedHeight()).isZero();

@ -288,15 +288,15 @@ public class PivotBlockRetrieverTest {
final CompletableFuture<FastSyncState> future = pivotBlockRetriever.downloadPivotBlockHeader();
peerA.respond(responder);
peerB.respondTimes(emptyResponder, 3);
peerB.respondTimes(emptyResponder, 4);
// PeerA should have responded, while peerB is being retried, peerC shouldn't have been queried
// yet
assertThat(future).isNotCompleted();
assertThat(peerC.hasOutstandingRequests()).isFalse();
// After exhausting retries for peerB, we should try peerC
peerB.respondTimes(emptyResponder, 2);
// After exhausting retries (max retries is 5) for peerB, we should try peerC
peerB.respondTimes(emptyResponder, 1);
peerC.respond(responder);
assertThat(future)

@ -18,6 +18,7 @@ import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryWorldStateArchive;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -30,6 +31,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.GenesisState;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.core.MiningParameters;
@ -42,8 +44,11 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
@ -77,6 +82,7 @@ import java.util.concurrent.CompletableFuture;
import io.vertx.core.Vertx;
import org.apache.tuweni.bytes.Bytes;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -153,7 +159,12 @@ public class TestNode implements Closeable {
Bytes.random(64),
25,
25,
false);
false,
SyncMode.SNAP,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
final ChainHeadTracker mockCHT = getChainHeadTracker();
ethPeers.setChainHeadTracker(mockCHT);
final EthScheduler scheduler = new EthScheduler(1, 1, 1, metricsSystem);
final EthContext ethContext = new EthContext(ethPeers, ethMessages, scheduler);
@ -189,6 +200,7 @@ public class TestNode implements Closeable {
NetworkRunner.builder()
.subProtocols(EthProtocol.get())
.protocolManagers(singletonList(ethProtocolManager))
.ethPeersShouldConnect((p, d) -> true)
.network(
capabilities ->
DefaultP2PNetwork.builder()
@ -201,8 +213,8 @@ public class TestNode implements Closeable {
.blockchain(blockchain)
.blockNumberForks(Collections.emptyList())
.timestampForks(Collections.emptyList())
.allConnectionsSupplier(ethPeers::getAllConnections)
.allActiveConnectionsSupplier(ethPeers::getAllActiveConnections)
.allConnectionsSupplier(ethPeers::streamAllConnections)
.allActiveConnectionsSupplier(ethPeers::streamAllActiveConnections)
.build())
.metricsSystem(new NoOpMetricsSystem())
.build();
@ -217,6 +229,16 @@ public class TestNode implements Closeable {
selfPeer = DefaultPeer.fromEnodeURL(network.getLocalEnode().get());
}
private static ChainHeadTracker getChainHeadTracker() {
final ChainHeadTracker mockCHT = mock(ChainHeadTracker.class);
final BlockHeader mockBlockHeader = mock(BlockHeader.class);
Mockito.lenient().when(mockBlockHeader.getNumber()).thenReturn(0L);
Mockito.lenient()
.when(mockCHT.getBestHeaderFromPeer(any()))
.thenReturn(CompletableFuture.completedFuture(mockBlockHeader));
return mockCHT;
}
public Bytes id() {
return nodeKey.getPublicKey().getEncodedBytes();
}

@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
@ -44,6 +45,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredPendingTransactions;
@ -102,6 +104,9 @@ public class TransactionPoolFactoryTest {
@BeforeEach
public void setup() {
when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(mock(Hash.class)));
final Block mockBlock = mock(Block.class);
when(mockBlock.getHash()).thenReturn(Hash.ZERO);
when(blockchain.getGenesisBlock()).thenReturn(mockBlock);
when(context.getBlockchain()).thenReturn(blockchain);
final NodeMessagePermissioningProvider nmpp = (destinationEnode, code) -> true;
@ -116,7 +121,9 @@ public class TransactionPoolFactoryTest {
Bytes.random(64),
25,
25,
false);
false,
SyncMode.SNAP,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
when(ethContext.getEthMessages()).thenReturn(ethMessages);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.p2p.network;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
@ -31,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -47,15 +49,18 @@ public class NetworkRunner implements AutoCloseable {
private final Map<String, SubProtocol> subProtocols;
private final List<ProtocolManager> protocolManagers;
private final LabelledMetric<Counter> inboundMessageCounter;
private final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;
private NetworkRunner(
final P2PNetwork network,
final Map<String, SubProtocol> subProtocols,
final List<ProtocolManager> protocolManagers,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect) {
this.network = network;
this.protocolManagers = protocolManagers;
this.subProtocols = subProtocols;
this.ethPeersShouldConnect = ethPeersShouldConnect;
inboundMessageCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
@ -158,8 +163,7 @@ public class NetworkRunner implements AutoCloseable {
protocolManager.handleNewConnection(connection);
});
network.subscribeConnectRequest(
(peer, incoming) -> protocolManager.shouldConnect(peer, incoming));
network.subscribeConnectRequest(ethPeersShouldConnect::apply);
network.subscribeDisconnect(
(connection, disconnectReason, initiatedByPeer) -> {
@ -186,6 +190,7 @@ public class NetworkRunner implements AutoCloseable {
List<ProtocolManager> protocolManagers = new ArrayList<>();
List<SubProtocol> subProtocols = new ArrayList<>();
MetricsSystem metricsSystem;
private BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;
public NetworkRunner build() {
final Map<String, SubProtocol> subProtocolMap = new HashMap<>();
@ -203,7 +208,8 @@ public class NetworkRunner implements AutoCloseable {
}
}
final P2PNetwork network = networkProvider.build(caps);
return new NetworkRunner(network, subProtocolMap, protocolManagers, metricsSystem);
return new NetworkRunner(
network, subProtocolMap, protocolManagers, metricsSystem, ethPeersShouldConnect);
}
public Builder protocolManagers(final List<ProtocolManager> protocolManagers) {
@ -230,6 +236,11 @@ public class NetworkRunner implements AutoCloseable {
this.metricsSystem = metricsSystem;
return this;
}
public Builder ethPeersShouldConnect(final BiFunction<Peer, Boolean, Boolean> shouldConnect) {
this.ethPeersShouldConnect = shouldConnect;
return this;
}
}
@FunctionalInterface

@ -14,7 +14,6 @@
*/
package org.hyperledger.besu.ethereum.p2p.network;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
@ -59,15 +58,6 @@ public interface ProtocolManager extends AutoCloseable {
*/
void handleNewConnection(PeerConnection peerConnection);
/**
* Call this to find out whether we should try to connect to a certain peer
*
* @param peer the peer that we are trying to connect to
* @param incoming true if the connection is incoming
* @return true, if the ProtocolManager wants to connect to the peer, false otherwise
*/
boolean shouldConnect(Peer peer, final boolean incoming);
/**
* Handles peer disconnects.
*

@ -129,11 +129,11 @@ public final class DisconnectMessage extends AbstractMessageData {
USELESS_PEER_NO_SHARED_CAPABILITIES((byte) 0x03, "No shared capabilities"),
USELESS_PEER_WORLD_STATE_NOT_AVAILABLE((byte) 0x03, "World state not available"),
USELESS_PEER_MISMATCHED_PIVOT_BLOCK((byte) 0x03, "Mismatched pivot block"),
USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_STATE(
(byte) 0x03, "Failed to retrieve header for chain state"),
USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD((byte) 0x03, "Failed to retrieve chain head header"),
USELESS_PEER_CANNOT_CONFIRM_PIVOT_BLOCK((byte) 0x03, "Peer failed to confirm pivot block"),
USELESS_PEER_BY_REPUTATION((byte) 0x03, "Lowest reputation score"),
USELESS_PEER_BY_CHAIN_COMPARATOR((byte) 0x03, "Lowest by chain height comparator"),
USELESS_PEER_EXCEEDS_TRAILING_PEERS((byte) 0x03, "Adding peer would exceed max trailing peers"),
TOO_MANY_PEERS((byte) 0x04),
ALREADY_CONNECTED((byte) 0x05),
INCOMPATIBLE_P2P_PROTOCOL_VERSION((byte) 0x06),

@ -42,12 +42,14 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.ImmutableTransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.EpochCalculator;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
@ -73,6 +75,7 @@ import org.hyperledger.besu.util.Subscribers;
import org.hyperledger.besu.util.number.Fraction;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@ -239,7 +242,9 @@ public class RetestethContext {
localNodeKey,
MAX_PEERS,
MAX_PEERS,
false);
false,
SyncMode.FAST,
new ForkIdManager(blockchain, List.of(), List.of(), false));
final SyncState syncState = new SyncState(blockchain, ethPeers);
ethScheduler = new EthScheduler(1, 1, 1, 1, metricsSystem);

Loading…
Cancel
Save