Make fork id the default and try to recover the DiscoveryPeer for incoming connections from the PeerTable (#5628)

* make the request for the ENR the default and try to recover the DiscoveryPeer for incoming connections from the PeerTable

Signed-off-by: Stefan <stefan.pingel@consensys.net>
Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
pull/6445/head
Stefan Pingel 10 months ago committed by GitHub
parent cfea3ab2fd
commit 921bc175c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      CHANGELOG.md
  2. 6
      besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java
  3. 2
      besu/src/test/java/org/hyperledger/besu/cli/options/NetworkingOptionsTest.java
  4. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  5. 124
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  6. 2
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/DiscoveryConfiguration.java
  7. 1
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/NetworkingConfiguration.java
  8. 8
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java
  9. 7
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java
  10. 55
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  11. 14
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java
  12. 12
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java
  13. 20
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java
  14. 85
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/AbstractHandshakeHandler.java
  15. 40
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramer.java
  16. 7
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerInbound.java
  17. 7
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerOutbound.java
  18. 12
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java
  19. 12
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializer.java
  20. 1
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java
  21. 3
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java
  22. 46
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java
  23. 2
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java
  24. 20
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java
  25. 6
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java
  26. 26
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java
  27. 2
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java
  28. 50
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java
  29. 5
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializerTest.java

@ -19,7 +19,9 @@
- New `EXECUTION_HALTED` error returned if there is an error executing or simulating a transaction, with the reason for execution being halted. Replaces the generic `INTERNAL_ERROR` return code in certain cases which some applications may be checking for [#6343](https://github.com/hyperledger/besu/pull/6343)
- The Besu Docker images with `openjdk-latest` tags since 23.10.3 were incorrectly using UID 1001 instead of 1000 for the container's `besu` user. The user now uses 1000 again. Containers created from or migrated to images using UID 1001 will need to chown their persistent database files to UID 1000 [#6360](https://github.com/hyperledger/besu/pull/6360)
- The deprecated `--privacy-onchain-groups-enabled` option has now been removed. Use the `--privacy-flexible-groups-enabled` option instead. [#6411](https://github.com/hyperledger/besu/pull/6411)
- The time that can be spent selecting transactions during block creation is not capped at 5 seconds for PoS and PoW networks, and for PoA networks, at 75% of the block period specified in the genesis, this to prevent possible DoS in case a single transaction is taking too long to execute, and to have a stable block production rate, but it could be a breaking change if an existing network used to have transactions that takes more time to executed that the newly introduced limit, if it is mandatory for these network to keep processing these long processing transaction, then the default value of `block-txs-selection-max-time` or `poa-block-txs-selection-max-time` needs to be tuned accordingly.
- Requesting the Ethereum Node Record (ENR) to acquire the fork id from bonded peers is now enabled by default, so the following change has been made [#5628](https://github.com/hyperledger/besu/pull/5628):
- `--Xfilter-on-enr-fork-id` has been removed. To disable the feature use `--filter-on-enr-fork-id=false`.
- The time that can be spent selecting transactions during block creation is not capped at 5 seconds for PoS and PoW networks, and for PoA networks, at 75% of the block period specified in the genesis, this to prevent possible DoS in case a single transaction is taking too long to execute, and to have a stable block production rate, but it could be a breaking change if an existing network used to have transactions that takes more time to executed that the newly introduced limit, if it is mandatory for these network to keep processing these long processing transaction, then the default value of `block-txs-selection-max-time` or `poa-block-txs-selection-max-time` needs to be tuned accordingly.
### Deprecations

@ -37,7 +37,7 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
private final String DNS_DISCOVERY_SERVER_OVERRIDE_FLAG = "--Xp2p-dns-discovery-server";
private final String DISCOVERY_PROTOCOL_V5_ENABLED = "--Xv5-discovery-enabled";
/** The constant FILTER_ON_ENR_FORK_ID. */
public static final String FILTER_ON_ENR_FORK_ID = "--Xfilter-on-enr-fork-id";
public static final String FILTER_ON_ENR_FORK_ID = "--filter-on-enr-fork-id";
@CommandLine.Option(
names = INITIATE_CONNECTIONS_FREQUENCY_FLAG,
@ -76,9 +76,9 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
@CommandLine.Option(
names = FILTER_ON_ENR_FORK_ID,
hidden = true,
defaultValue = "false",
defaultValue = "true",
description = "Whether to enable filtering of peers based on the ENR field ForkId)")
private final Boolean filterOnEnrForkId = false;
private final Boolean filterOnEnrForkId = NetworkingConfiguration.DEFAULT_FILTER_ON_ENR_FORK_ID;
@CommandLine.Option(
hidden = true,

@ -134,7 +134,7 @@ public class NetworkingOptionsTest
final NetworkingOptions options = cmd.getNetworkingOptions();
final NetworkingConfiguration networkingConfig = options.toDomainObject();
assertThat(networkingConfig.getDiscovery().isFilterOnEnrForkIdEnabled()).isEqualTo(false);
assertThat(networkingConfig.getDiscovery().isFilterOnEnrForkIdEnabled()).isEqualTo(true);
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
assertThat(commandOutput.toString(UTF_8)).isEmpty();

@ -139,6 +139,7 @@ public class EthPeers {
"peer_limit",
"The maximum number of peers this node allows to connect",
() -> peerUpperBound);
connectedPeersCounter =
metricsSystem.createCounter(
BesuMetricCategory.PEERS, "connected_total", "Total number of peers connected");

@ -110,7 +110,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
this.blockBroadcaster = new BlockBroadcaster(ethContext);
supportedCapabilities =
this.supportedCapabilities =
calculateCapabilities(synchronizerConfiguration, ethereumWireProtocolConfiguration);
// Run validators
@ -252,11 +252,14 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.info("Stopping {} Subprotocol.", getSupportedProtocol());
LOG.atInfo().setMessage("Stopping {} Subprotocol.").addArgument(getSupportedProtocol()).log();
scheduler.stop();
shutdown.countDown();
} else {
LOG.error("Attempted to stop already stopped {} Subprotocol.", getSupportedProtocol());
LOG.atInfo()
.setMessage("Attempted to stop already stopped {} Subprotocol.")
.addArgument(this::getSupportedProtocol)
.log();
}
}
@ -264,7 +267,10 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
public void awaitStop() throws InterruptedException {
shutdown.await();
scheduler.awaitStop();
LOG.info("{} Subprotocol stopped.", getSupportedProtocol());
LOG.atInfo()
.setMessage("{} Subprotocol stopped.")
.addArgument(this::getSupportedProtocol)
.log();
}
@Override
@ -277,8 +283,10 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
EthProtocolLogger.logProcessMessage(cap, code);
final EthPeer ethPeer = ethPeers.peer(message.getConnection());
if (ethPeer == null) {
LOG.debug(
"Ignoring message received from unknown peer connection: {}", message.getConnection());
LOG.atDebug()
.setMessage("Ignoring message received from unknown peer connection: {}")
.addArgument(message::getConnection)
.log();
return;
}
@ -288,19 +296,24 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
return;
} else if (!ethPeer.statusHasBeenReceived()) {
// Peers are required to send status messages before any other message type
LOG.debug(
"{} requires a Status ({}) message to be sent first. Instead, received message {} (BREACH_OF_PROTOCOL). Disconnecting from {}.",
this.getClass().getSimpleName(),
EthPV62.STATUS,
code,
ethPeer);
LOG.atDebug()
.setMessage(
"{} requires a Status ({}) message to be sent first. Instead, received message {} (BREACH_OF_PROTOCOL). Disconnecting from {}.")
.addArgument(() -> this.getClass().getSimpleName())
.addArgument(EthPV62.STATUS)
.addArgument(code)
.addArgument(ethPeer::toString)
.log();
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
return;
}
if (this.mergePeerFilter.isPresent()) {
if (this.mergePeerFilter.get().disconnectIfGossipingBlocks(message, ethPeer)) {
LOG.debug("Post-merge disconnect: peer still gossiping blocks {}", ethPeer);
LOG.atDebug()
.setMessage("Post-merge disconnect: peer still gossiping blocks {}")
.addArgument(ethPeer::toString)
.log();
handleDisconnect(ethPeer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false);
return;
}
@ -333,11 +346,12 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
maybeResponseData = ethMessages.dispatch(ethMessage);
}
} catch (final RLPException e) {
LOG.debug(
"Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}",
messageData.getData(),
ethPeer,
e);
LOG.atDebug()
.setMessage("Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}, {}")
.addArgument(messageData::getData)
.addArgument(ethPeer::toString)
.addArgument(e::toString)
.log();
ethPeer.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
@ -368,23 +382,31 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
genesisHash,
latestForkId);
try {
LOG.trace("Sending status message to {} for connection {}.", peer.getId(), connection);
LOG.atTrace()
.setMessage("Sending status message to {} for connection {}.")
.addArgument(peer::getId)
.addArgument(connection::toString)
.log();
peer.send(status, getSupportedProtocol(), connection);
peer.registerStatusSent(connection);
} catch (final PeerNotConnected peerNotConnected) {
// Nothing to do.
}
LOG.trace("{}", ethPeers);
LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log();
}
@Override
public boolean shouldConnect(final Peer peer, final boolean incoming) {
if (peer.getForkId().map(forkId -> forkIdManager.peerCheck(forkId)).orElse(true)) {
LOG.trace("ForkId OK or not available");
if (peer.getForkId().map(forkIdManager::peerCheck).orElse(true)) {
LOG.atDebug()
.setMessage("ForkId OK or not available for peer {}")
.addArgument(peer::getId)
.log();
if (ethPeers.shouldConnect(peer, incoming)) {
return true;
}
}
LOG.atDebug().setMessage("ForkId check failed for peer {}").addArgument(peer::getId).log();
return false;
}
@ -397,11 +419,11 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
LOG.atDebug()
.setMessage("Disconnect - {} - {} - {}... - {} peers left")
.addArgument(initiatedByPeer ? "Inbound" : "Outbound")
.addArgument(reason)
.addArgument(connection.getPeer().getId().slice(0, 8))
.addArgument(ethPeers.peerCount())
.addArgument(reason::toString)
.addArgument(() -> connection.getPeer().getId().slice(0, 8))
.addArgument(ethPeers::peerCount)
.log();
LOG.trace("{}", ethPeers);
LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log();
}
}
@ -412,43 +434,41 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
try {
if (!status.networkId().equals(networkId)) {
LOG.atDebug()
.setMessage("Mismatched network id: {}, EthPeer {}...")
.addArgument(status.networkId())
.addArgument(peer.getShortNodeId())
.log();
LOG.atTrace()
.setMessage("Mismatched network id: {}, EthPeer {}")
.addArgument(status.networkId())
.addArgument(peer)
.setMessage("Mismatched network id: {}, peer {}")
.addArgument(status::networkId)
.addArgument(() -> getPeerOrPeerId(peer))
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (!forkIdManager.peerCheck(forkId) && status.protocolVersion() > 63) {
LOG.debug(
"{} has matching network id ({}), but non-matching fork id: {}",
peer,
networkId,
forkId);
LOG.atDebug()
.setMessage("{} has matching network id ({}), but non-matching fork id: {}")
.addArgument(() -> getPeerOrPeerId(peer))
.addArgument(networkId::toString)
.addArgument(forkId)
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (forkIdManager.peerCheck(status.genesisHash())) {
LOG.debug(
"{} has matching network id ({}), but non-matching genesis hash: {}",
peer,
networkId,
status.genesisHash());
LOG.atDebug()
.setMessage("{} has matching network id ({}), but non-matching genesis hash: {}")
.addArgument(() -> getPeerOrPeerId(peer))
.addArgument(networkId::toString)
.addArgument(status::genesisHash)
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (mergePeerFilter.isPresent()
&& mergePeerFilter.get().disconnectIfPoW(status, peer)) {
LOG.atDebug()
.setMessage("Post-merge disconnect: peer still PoW {}")
.addArgument(peer.getShortNodeId())
.addArgument(() -> getPeerOrPeerId(peer))
.log();
handleDisconnect(peer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false);
} else {
LOG.debug(
"Received status message from {}: {} with connection {}",
peer,
status,
message.getConnection());
LOG.atDebug()
.setMessage("Received status message from {}: {} with connection {}")
.addArgument(peer::toString)
.addArgument(status::toString)
.addArgument(message::getConnection)
.log();
peer.registerStatusReceived(
status.bestHash(),
status.totalDifficulty(),
@ -467,6 +487,10 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
}
}
private Object getPeerOrPeerId(final EthPeer peer) {
return LOG.isTraceEnabled() ? peer : peer.getShortNodeId();
}
@Override
public void blockMined(final Block block) {
// This assumes the block has already been included in the chain

@ -32,7 +32,7 @@ public class DiscoveryConfiguration {
private List<EnodeURL> bootnodes = new ArrayList<>();
private String dnsDiscoveryURL;
private boolean discoveryV5Enabled = false;
private boolean filterOnEnrForkId = false;
private boolean filterOnEnrForkId = NetworkingConfiguration.DEFAULT_FILTER_ON_ENR_FORK_ID;
public static DiscoveryConfiguration create() {
return new DiscoveryConfiguration();

@ -23,6 +23,7 @@ public class NetworkingConfiguration {
public static final int DEFAULT_INITIATE_CONNECTIONS_FREQUENCY_SEC = 30;
public static final int DEFAULT_CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_SEC = 60;
public static final int DEFAULT_PEER_LOWER_BOUND = 25;
public static final boolean DEFAULT_FILTER_ON_ENR_FORK_ID = true;
private DiscoveryConfiguration discovery = new DiscoveryConfiguration();
private RlpxConfiguration rlpx = new RlpxConfiguration();

@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerRequirement;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PingPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
@ -81,6 +82,7 @@ public abstract class PeerDiscoveryAgent {
private final MetricsSystem metricsSystem;
private final RlpxAgent rlpxAgent;
private final ForkIdManager forkIdManager;
private final PeerTable peerTable;
/* The peer controller, which takes care of the state machine of peers. */
protected Optional<PeerDiscoveryController> controller = Optional.empty();
@ -109,7 +111,8 @@ public abstract class PeerDiscoveryAgent {
final MetricsSystem metricsSystem,
final StorageProvider storageProvider,
final ForkIdManager forkIdManager,
final RlpxAgent rlpxAgent) {
final RlpxAgent rlpxAgent,
final PeerTable peerTable) {
this.metricsSystem = metricsSystem;
checkArgument(nodeKey != null, "nodeKey cannot be null");
checkArgument(config != null, "provided configuration cannot be null");
@ -130,6 +133,7 @@ public abstract class PeerDiscoveryAgent {
this.forkIdManager = forkIdManager;
this.forkIdSupplier = () -> forkIdManager.getForkIdForChainHead().getForkIdAsBytesList();
this.rlpxAgent = rlpxAgent;
this.peerTable = peerTable;
}
protected abstract TimerUtil createTimer();
@ -263,9 +267,9 @@ public abstract class PeerDiscoveryAgent {
.peerRequirement(PeerRequirement.combine(peerRequirements))
.peerPermissions(peerPermissions)
.metricsSystem(metricsSystem)
.forkIdManager(forkIdManager)
.filterOnEnrForkId((config.isFilterOnEnrForkIdEnabled()))
.rlpxAgent(rlpxAgent)
.peerTable(peerTable)
.build();
}

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.VertxTimerUtil;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
@ -73,7 +74,8 @@ public class VertxPeerDiscoveryAgent extends PeerDiscoveryAgent {
final MetricsSystem metricsSystem,
final StorageProvider storageProvider,
final ForkIdManager forkIdManager,
final RlpxAgent rlpxAgent) {
final RlpxAgent rlpxAgent,
final PeerTable peerTable) {
super(
nodeKey,
config,
@ -82,7 +84,8 @@ public class VertxPeerDiscoveryAgent extends PeerDiscoveryAgent {
metricsSystem,
storageProvider,
forkIdManager,
rlpxAgent);
rlpxAgent,
peerTable);
checkArgument(vertx != null, "vertx instance cannot be null");
this.vertx = vertx;

@ -21,8 +21,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
@ -129,7 +127,6 @@ public class PeerDiscoveryController {
private final DiscoveryProtocolLogger discoveryProtocolLogger;
private final LabelledMetric<Counter> interactionCounter;
private final LabelledMetric<Counter> interactionRetryCounter;
private final ForkIdManager forkIdManager;
private final boolean filterOnEnrForkId;
private final RlpxAgent rlpxAgent;
@ -161,7 +158,6 @@ public class PeerDiscoveryController {
final PeerPermissions peerPermissions,
final MetricsSystem metricsSystem,
final Optional<Cache<Bytes, Packet>> maybeCacheForEnrRequests,
final ForkIdManager forkIdManager,
final boolean filterOnEnrForkId,
final RlpxAgent rlpxAgent) {
this.timerUtil = timerUtil;
@ -197,11 +193,11 @@ public class PeerDiscoveryController {
"discovery_interaction_retry_count",
"Total number of interaction retries performed",
"type");
this.cachedEnrRequests =
maybeCacheForEnrRequests.orElse(
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, SECONDS).build());
this.forkIdManager = forkIdManager;
this.filterOnEnrForkId = filterOnEnrForkId;
}
@ -314,6 +310,7 @@ public class PeerDiscoveryController {
}
final DiscoveryPeer peer = resolvePeer(sender);
final Bytes peerId = peer.getId();
switch (packet.getType()) {
case PING:
if (peerPermissions.allowInboundBonding(peer)) {
@ -333,10 +330,10 @@ public class PeerDiscoveryController {
if (filterOnEnrForkId) {
requestENR(peer);
}
bondingPeers.invalidate(peer.getId());
bondingPeers.invalidate(peerId);
addToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
Optional.ofNullable(cachedEnrRequests.getIfPresent(peer.getId()))
Optional.ofNullable(cachedEnrRequests.getIfPresent(peerId))
.ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest));
});
break;
@ -360,12 +357,12 @@ public class PeerDiscoveryController {
if (PeerDiscoveryStatus.BONDED.equals(peer.getStatus())) {
processEnrRequest(peer, packet);
} else if (PeerDiscoveryStatus.BONDING.equals(peer.getStatus())) {
LOG.trace("ENR_REQUEST cached for bonding peer Id: {}", peer.getId());
LOG.trace("ENR_REQUEST cached for bonding peer Id: {}", peerId);
// Due to UDP, it may happen that we receive the ENR_REQUEST just before the PONG.
// Because peers want to send the ENR_REQUEST directly after the pong.
// If this happens we don't want to ignore the request but process when bonded.
// this cache allows to keep the request and to respond after having processed the PONG
cachedEnrRequests.put(peer.getId(), packet);
cachedEnrRequests.put(peerId, packet);
}
break;
case ENR_RESPONSE:
@ -376,26 +373,6 @@ public class PeerDiscoveryController {
packet.getPacketData(ENRResponsePacketData.class);
final NodeRecord enr = packetData.get().getEnr();
peer.setNodeRecord(enr);
final Optional<ForkId> maybeForkId = peer.getForkId();
if (maybeForkId.isPresent()) {
if (forkIdManager.peerCheck(maybeForkId.get())) {
connectOnRlpxLayer(peer);
LOG.debug(
"Peer {} PASSED fork id check. ForkId received: {}",
sender.getId(),
maybeForkId.get());
} else {
LOG.debug(
"Peer {} FAILED fork id check. ForkId received: {}",
sender.getId(),
maybeForkId.get());
}
} else {
// if the peer hasn't sent the ForkId try to connect to it anyways
connectOnRlpxLayer(peer);
LOG.debug("No fork id sent by peer: {}", peer.getId());
}
});
break;
}
@ -431,9 +408,7 @@ public class PeerDiscoveryController {
if (peer.getStatus() != PeerDiscoveryStatus.BONDED) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
if (!filterOnEnrForkId) {
connectOnRlpxLayer(peer);
}
connectOnRlpxLayer(peer);
}
final PeerTable.AddResult result = peerTable.tryAdd(peer);
@ -560,8 +535,6 @@ public class PeerDiscoveryController {
*/
@VisibleForTesting
void requestENR(final DiscoveryPeer peer) {
peer.setStatus(PeerDiscoveryStatus.ENR_REQUESTED);
final Consumer<PeerInteractionState> action =
interaction -> {
final ENRRequestPacketData data = ENRRequestPacketData.create();
@ -838,7 +811,6 @@ public class PeerDiscoveryController {
private Cache<Bytes, Packet> cachedEnrRequests =
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, SECONDS).build();
private ForkIdManager forkIdManager;
private RlpxAgent rlpxAgent;
private Builder() {}
@ -846,10 +818,6 @@ public class PeerDiscoveryController {
public PeerDiscoveryController build() {
validate();
if (peerTable == null) {
peerTable = new PeerTable(this.nodeKey.getPublicKey().getEncodedBytes(), 16);
}
return new PeerDiscoveryController(
nodeKey,
localPeer,
@ -864,7 +832,6 @@ public class PeerDiscoveryController {
peerPermissions,
metricsSystem,
Optional.of(cachedEnrRequests),
forkIdManager,
filterOnEnrForkId,
rlpxAgent);
}
@ -875,8 +842,8 @@ public class PeerDiscoveryController {
validateRequiredDependency(timerUtil, "TimerUtil");
validateRequiredDependency(workerExecutor, "AsyncExecutor");
validateRequiredDependency(metricsSystem, "MetricsSystem");
validateRequiredDependency(forkIdManager, "ForkIdManager");
validateRequiredDependency(rlpxAgent, "RlpxAgent");
validateRequiredDependency(peerTable, "PeerTable");
}
private void validateRequiredDependency(final Object object, final String name) {
@ -970,11 +937,5 @@ public class PeerDiscoveryController {
this.rlpxAgent = rlpxAgent;
return this;
}
public Builder forkIdManager(final ForkIdManager forkIdManager) {
checkNotNull(forkIdManager);
this.forkIdManager = forkIdManager;
return this;
}
}
}

@ -56,26 +56,21 @@ public class PeerTable {
* Builds a new peer table, where distance is calculated using the provided nodeId as a baseline.
*
* @param nodeId The ID of the node where this peer table is stored.
* @param bucketSize The maximum length of each k-bucket.
*/
public PeerTable(final Bytes nodeId, final int bucketSize) {
public PeerTable(final Bytes nodeId) {
this.keccak256 = Hash.keccak256(nodeId);
this.table =
Stream.generate(() -> new Bucket(DEFAULT_BUCKET_SIZE))
.limit(N_BUCKETS + 1)
.toArray(Bucket[]::new);
this.distanceCache = new ConcurrentHashMap<>();
this.maxEntriesCnt = N_BUCKETS * bucketSize;
this.maxEntriesCnt = N_BUCKETS * DEFAULT_BUCKET_SIZE;
// A bloom filter with 4096 expected insertions of 64-byte keys with a 0.1% false positive
// probability yields a memory footprint of ~7.5kb.
buildBloomFilter();
}
public PeerTable(final Bytes nodeId) {
this(nodeId, DEFAULT_BUCKET_SIZE);
}
/**
* Returns the table's representation of a peer, if it exists.
*
@ -83,11 +78,12 @@ public class PeerTable {
* @return The stored representation.
*/
public Optional<DiscoveryPeer> get(final PeerId peer) {
if (!idBloom.mightContain(peer.getId())) {
final Bytes peerId = peer.getId();
if (!idBloom.mightContain(peerId)) {
return Optional.empty();
}
final int distance = distanceFrom(peer);
return table[distance].getAndTouch(peer.getId());
return table[distance].getAndTouch(peerId);
}
/**

@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryAgent;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
import org.hyperledger.besu.ethereum.p2p.discovery.VertxPeerDiscoveryAgent;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeerPrivileges;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
@ -383,11 +384,12 @@ public class DefaultP2PNetwork implements P2PNetwork {
@VisibleForTesting
void attemptPeerConnections() {
LOG.trace("Initiating connections to discovered peers.");
rlpxAgent.connect(
final Stream<DiscoveryPeer> toTry =
streamDiscoveredPeers()
.filter(peer -> peer.getStatus() == PeerDiscoveryStatus.BONDED)
.filter(peerDiscoveryAgent::checkForkId)
.sorted(Comparator.comparing(DiscoveryPeer::getLastAttemptedConnection)));
.sorted(Comparator.comparing(DiscoveryPeer::getLastAttemptedConnection));
toTry.forEach(rlpxAgent::connect);
}
@Override
@ -511,6 +513,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
private Supplier<Stream<PeerConnection>> allConnectionsSupplier;
private Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier;
private int peersLowerBound;
private PeerTable peerTable;
public P2PNetwork build() {
validate();
@ -528,6 +531,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
final MutableLocalNode localNode =
MutableLocalNode.create(config.getRlpx().getClientId(), 5, supportedCapabilities);
final PeerPrivileges peerPrivileges = new DefaultPeerPrivileges(maintainedPeers);
peerTable = new PeerTable(nodeKey.getPublicKey().getEncodedBytes());
rlpxAgent = rlpxAgent == null ? createRlpxAgent(localNode, peerPrivileges) : rlpxAgent;
peerDiscoveryAgent = peerDiscoveryAgent == null ? createDiscoveryAgent() : peerDiscoveryAgent;
@ -572,7 +576,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
metricsSystem,
storageProvider,
forkIdManager,
rlpxAgent);
rlpxAgent,
peerTable);
}
private RlpxAgent createRlpxAgent(
@ -589,6 +594,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
.allConnectionsSupplier(allConnectionsSupplier)
.allActiveConnectionsSupplier(allActiveConnectionsSupplier)
.peersLowerBound(peersLowerBound)
.peerTable(peerTable)
.build();
}

@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerPrivileges;
@ -162,13 +163,6 @@ public class RlpxAgent {
}
}
public void connect(final Stream<? extends Peer> peerStream) {
if (!localNode.isReady()) {
return;
}
peerStream.forEach(this::connect);
}
public void disconnect(final Bytes peerId, final DisconnectReason reason) {
try {
allActiveConnectionsSupplier
@ -206,6 +200,7 @@ public class RlpxAgent {
+ this.getClass().getSimpleName()
+ " has finished starting"));
}
// Check peer is valid
final EnodeURL enode = peer.getEnodeURL();
if (!enode.isListening()) {
@ -380,6 +375,7 @@ public class RlpxAgent {
private Supplier<Stream<PeerConnection>> allConnectionsSupplier;
private Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier;
private int peersLowerBound;
private PeerTable peerTable;
private Builder() {}
@ -399,12 +395,13 @@ public class RlpxAgent {
localNode,
connectionEvents,
metricsSystem,
p2pTLSConfiguration.get());
p2pTLSConfiguration.get(),
peerTable);
} else {
LOG.debug("Using default NettyConnectionInitializer");
connectionInitializer =
new NettyConnectionInitializer(
nodeKey, config, localNode, connectionEvents, metricsSystem);
nodeKey, config, localNode, connectionEvents, metricsSystem, peerTable);
}
}
@ -499,5 +496,10 @@ public class RlpxAgent {
this.peersLowerBound = peersLowerBound;
return this;
}
public Builder peerTable(final PeerTable peerTable) {
this.peerTable = peerTable;
return this;
}
}
}

@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
@ -60,6 +61,7 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
private final FramerProvider framerProvider;
private final boolean inboundInitiated;
private final PeerTable peerTable;
AbstractHandshakeHandler(
final List<SubProtocol> subProtocols,
@ -70,7 +72,8 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
final MetricsSystem metricsSystem,
final HandshakerProvider handshakerProvider,
final FramerProvider framerProvider,
final boolean inboundInitiated) {
final boolean inboundInitiated,
final PeerTable peerTable) {
this.subProtocols = subProtocols;
this.localNode = localNode;
this.expectedPeer = expectedPeer;
@ -80,6 +83,7 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
this.handshaker = handshakerProvider.buildInstance();
this.framerProvider = framerProvider;
this.inboundInitiated = inboundInitiated;
this.peerTable = peerTable;
}
/**
@ -97,47 +101,48 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
ctx.writeAndFlush(nextMsg.get());
} else if (handshaker.getStatus() != Handshaker.HandshakeStatus.SUCCESS) {
LOG.debug("waiting for more bytes");
return;
}
} else {
final Bytes nodeId = handshaker.partyPubKey().getEncodedBytes();
if (!localNode.isReady()) {
// If we're handling a connection before the node is fully up, just disconnect
LOG.debug("Rejecting connection because local node is not ready {}", nodeId);
disconnect(ctx, DisconnectMessage.DisconnectReason.UNKNOWN);
return;
}
final Bytes nodeId = handshaker.partyPubKey().getEncodedBytes();
if (!localNode.isReady()) {
// If we're handling a connection before the node is fully up, just disconnect
LOG.debug("Rejecting connection because local node is not ready {}", nodeId);
disconnect(ctx, DisconnectMessage.DisconnectReason.UNKNOWN);
return;
LOG.trace("Sending framed hello");
// Exchange keys done
final Framer framer = this.framerProvider.buildFramer(handshaker.secrets());
final ByteToMessageDecoder deFramer =
new DeFramer(
framer,
subProtocols,
localNode,
expectedPeer,
connectionEventDispatcher,
connectionFuture,
metricsSystem,
inboundInitiated,
peerTable);
ctx.channel()
.pipeline()
.replace(this, "DeFramer", deFramer)
.addBefore("DeFramer", "validate", new ValidateFirstOutboundMessage(framer));
ctx.writeAndFlush(new OutboundMessage(null, HelloMessage.create(localNode.getPeerInfo())))
.addListener(
ff -> {
if (ff.isSuccess()) {
LOG.trace("Successfully wrote hello message");
}
});
msg.retain();
ctx.fireChannelRead(msg);
}
LOG.trace("Sending framed hello");
// Exchange keys done
final Framer framer = this.framerProvider.buildFramer(handshaker.secrets());
final ByteToMessageDecoder deFramer =
new DeFramer(
framer,
subProtocols,
localNode,
expectedPeer,
connectionEventDispatcher,
connectionFuture,
metricsSystem,
inboundInitiated);
ctx.channel()
.pipeline()
.replace(this, "DeFramer", deFramer)
.addBefore("DeFramer", "validate", new ValidateFirstOutboundMessage(framer));
ctx.writeAndFlush(new OutboundMessage(null, HelloMessage.create(localNode.getPeerInfo())))
.addListener(
ff -> {
if (ff.isSuccess()) {
LOG.trace("Successfully wrote hello message");
}
});
msg.retain();
ctx.fireChannelRead(msg);
}
private void disconnect(

@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.BreachOfProtocolException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.IncompatiblePeerException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.PeerChannelClosedException;
@ -70,6 +72,7 @@ final class DeFramer extends ByteToMessageDecoder {
private final Optional<Peer> expectedPeer;
private final List<SubProtocol> subProtocols;
private final boolean inboundInitiated;
private final PeerTable peerTable;
private boolean hellosExchanged;
private final LabelledMetric<Counter> outboundMessagesCounter;
@ -81,7 +84,8 @@ final class DeFramer extends ByteToMessageDecoder {
final PeerConnectionEventDispatcher connectionEventDispatcher,
final CompletableFuture<PeerConnection> connectFuture,
final MetricsSystem metricsSystem,
final boolean inboundInitiated) {
final boolean inboundInitiated,
final PeerTable peerTable) {
this.framer = framer;
this.subProtocols = subProtocols;
this.localNode = localNode;
@ -89,6 +93,7 @@ final class DeFramer extends ByteToMessageDecoder {
this.connectFuture = connectFuture;
this.connectionEventDispatcher = connectionEventDispatcher;
this.inboundInitiated = inboundInitiated;
this.peerTable = peerTable;
this.outboundMessagesCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
@ -105,8 +110,11 @@ final class DeFramer extends ByteToMessageDecoder {
while ((message = framer.deframe(in)) != null) {
if (hellosExchanged) {
out.add(message);
} else if (message.getCode() == WireMessageCodes.HELLO) {
hellosExchanged = true;
// Decode first hello and use the payload to modify pipeline
final PeerInfo peerInfo;
@ -129,13 +137,27 @@ final class DeFramer extends ByteToMessageDecoder {
subProtocols,
localNode.getPeerInfo().getCapabilities(),
peerInfo.getCapabilities());
final Optional<Peer> peer = expectedPeer.or(() -> createPeer(peerInfo, ctx));
if (peer.isEmpty()) {
LOG.debug("Failed to create connection for peer {}", peerInfo);
connectFuture.completeExceptionally(new PeerChannelClosedException(peerInfo));
ctx.close();
return;
Optional<Peer> peer;
if (expectedPeer.isPresent()) {
peer = expectedPeer;
} else {
// This is an inbound "Hello" message. Create peer from information from the Hello message
peer = createPeer(peerInfo, ctx);
if (peer.isEmpty()) {
LOG.debug("Failed to create connection for peer {}", peerInfo);
connectFuture.completeExceptionally(new PeerChannelClosedException(peerInfo));
ctx.close();
return;
}
// If we can find the DiscoveryPeer for the peer in the PeerTable we use it, because
// it could contains additional information, like the fork id.
final Optional<DiscoveryPeer> discoveryPeer = peerTable.get(peer.get());
if (discoveryPeer.isPresent()) {
peer = Optional.of(discoveryPeer.get());
}
}
final PeerConnection connection =
new NettyPeerConnection(
ctx,
@ -176,7 +198,9 @@ final class DeFramer extends ByteToMessageDecoder {
capabilityMultiplexer, connection, connectionEventDispatcher, waitingForPong),
new MessageFramer(capabilityMultiplexer, framer));
connectFuture.complete(connection);
} else if (message.getCode() == WireMessageCodes.DISCONNECT) {
final DisconnectMessage disconnectMessage = DisconnectMessage.readFrom(message);
LOG.debug(
"Peer {} disconnected before sending HELLO. Reason: {}",
@ -185,8 +209,10 @@ final class DeFramer extends ByteToMessageDecoder {
ctx.close();
connectFuture.completeExceptionally(
new PeerDisconnectedException(disconnectMessage.getReason()));
} else {
// Unexpected message - disconnect
LOG.debug(
"Message received before HELLO's exchanged (BREACH_OF_PROTOCOL), disconnecting. Peer: {}, Code: {}, Data: {}",
expectedPeer.map(Peer::getEnodeURLString).orElse("unknown"),

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEventDispatcher;
@ -40,7 +41,8 @@ final class HandshakeHandlerInbound extends AbstractHandshakeHandler {
final PeerConnectionEventDispatcher connectionEventDispatcher,
final MetricsSystem metricsSystem,
final HandshakerProvider handshakerProvider,
final FramerProvider framerProvider) {
final FramerProvider framerProvider,
final PeerTable peerTable) {
super(
subProtocols,
localNode,
@ -50,7 +52,8 @@ final class HandshakeHandlerInbound extends AbstractHandshakeHandler {
metricsSystem,
handshakerProvider,
framerProvider,
true);
true,
peerTable);
handshaker.prepareResponder(nodeKey);
}

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import org.hyperledger.besu.crypto.SignatureAlgorithmFactory;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
@ -50,7 +51,8 @@ final class HandshakeHandlerOutbound extends AbstractHandshakeHandler {
final PeerConnectionEventDispatcher connectionEventDispatcher,
final MetricsSystem metricsSystem,
final HandshakerProvider handshakerProvider,
final FramerProvider framerProvider) {
final FramerProvider framerProvider,
final PeerTable peerTable) {
super(
subProtocols,
localNode,
@ -60,7 +62,8 @@ final class HandshakeHandlerOutbound extends AbstractHandshakeHandler {
metricsSystem,
handshakerProvider,
framerProvider,
false);
false,
peerTable);
handshaker.prepareInitiator(
nodeKey, SignatureAlgorithmFactory.getInstance().createPublicKey(peer.getId()));
this.first = handshaker.firstMessage();

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
@ -68,6 +69,7 @@ public class NettyConnectionInitializer
private final PeerConnectionEventDispatcher eventDispatcher;
private final MetricsSystem metricsSystem;
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create();
private final PeerTable peerTable;
private ChannelFuture server;
private final EventLoopGroup boss = new NioEventLoopGroup(1);
@ -80,12 +82,14 @@ public class NettyConnectionInitializer
final RlpxConfiguration config,
final LocalNode localNode,
final PeerConnectionEventDispatcher eventDispatcher,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final PeerTable peerTable) {
this.nodeKey = nodeKey;
this.config = config;
this.localNode = localNode;
this.eventDispatcher = eventDispatcher;
this.metricsSystem = metricsSystem;
this.peerTable = peerTable;
metricsSystem.createIntegerGauge(
BesuMetricCategory.NETWORK,
@ -244,7 +248,8 @@ public class NettyConnectionInitializer
eventDispatcher,
metricsSystem,
this,
this);
this,
peerTable);
}
@Nonnull
@ -259,7 +264,8 @@ public class NettyConnectionInitializer
eventDispatcher,
metricsSystem,
this,
this);
this,
peerTable);
}
@Nonnull

@ -19,6 +19,7 @@ import static org.hyperledger.besu.ethereum.p2p.rlpx.RlpxFrameConstants.LENGTH_M
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.plain.PlainFramer;
@ -55,7 +56,8 @@ public class NettyTLSConnectionInitializer extends NettyConnectionInitializer {
final LocalNode localNode,
final PeerConnectionEventDispatcher eventDispatcher,
final MetricsSystem metricsSystem,
final TLSConfiguration p2pTLSConfiguration) {
final TLSConfiguration p2pTLSConfiguration,
final PeerTable peerTable) {
this(
nodeKey,
config,
@ -63,7 +65,8 @@ public class NettyTLSConnectionInitializer extends NettyConnectionInitializer {
eventDispatcher,
metricsSystem,
defaultTlsContextFactorySupplier(p2pTLSConfiguration),
p2pTLSConfiguration.getClientHelloSniHeaderEnabled());
p2pTLSConfiguration.getClientHelloSniHeaderEnabled(),
peerTable);
}
@VisibleForTesting
@ -74,8 +77,9 @@ public class NettyTLSConnectionInitializer extends NettyConnectionInitializer {
final PeerConnectionEventDispatcher eventDispatcher,
final MetricsSystem metricsSystem,
final Supplier<TLSContextFactory> tlsContextFactorySupplier,
final Boolean clientHelloSniHeaderEnabled) {
super(nodeKey, config, localNode, eventDispatcher, metricsSystem);
final Boolean clientHelloSniHeaderEnabled,
final PeerTable peerTable) {
super(nodeKey, config, localNode, eventDispatcher, metricsSystem, peerTable);
if (tlsContextFactorySupplier != null) {
this.tlsContextFactorySupplier =
Optional.of(Suppliers.memoize(tlsContextFactorySupplier::get));

@ -295,6 +295,7 @@ public class PeerDiscoveryTestHelper {
config.setAdvertisedHost(advertisedHost);
config.setBindPort(port);
config.setActive(active);
config.setFilterOnEnrForkId(false);
final ForkIdManager mockForkIdManager = mock(ForkIdManager.class);
final ForkId forkId = new ForkId(Bytes.EMPTY, Bytes.EMPTY);

@ -63,7 +63,8 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
new NoOpMetricsSystem(),
new InMemoryKeyValueStorageProvider(),
forkIdManager,
rlpxAgent);
rlpxAgent,
new PeerTable(nodeKey.getPublicKey().getEncodedBytes()));
this.agentNetwork = agentNetwork;
}

@ -35,8 +35,6 @@ import org.hyperledger.besu.crypto.Hash;
import org.hyperledger.besu.crypto.SignatureAlgorithm;
import org.hyperledger.besu.crypto.SignatureAlgorithmFactory;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.Endpoint;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
@ -1480,14 +1478,12 @@ public class PeerDiscoveryControllerTest {
}
@Test
public void shouldFiltersOnForkIdSuccess() {
public void forkIdShouldBeAvailableIfEnrPacketContainsForkId() {
final List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(nodeKeys);
final ForkIdManager forkIdManager = mock(ForkIdManager.class);
final DiscoveryPeer sender = peers.get(0);
final Packet enrPacket = prepareForForkIdCheck(forkIdManager, nodeKeys, sender, true);
final Packet enrPacket = prepareForForkIdCheck(nodeKeys, sender, true);
when(forkIdManager.peerCheck(any(ForkId.class))).thenReturn(true);
controller.onMessage(enrPacket, sender);
final Optional<DiscoveryPeer> maybePeer =
@ -1501,35 +1497,12 @@ public class PeerDiscoveryControllerTest {
verify(controller, times(1)).connectOnRlpxLayer(eq(maybePeer.get()));
}
@Test
public void shouldFiltersOnForkIdFailure() {
final List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(nodeKeys);
final ForkIdManager forkIdManager = mock(ForkIdManager.class);
final DiscoveryPeer sender = peers.get(0);
final Packet enrPacket = prepareForForkIdCheck(forkIdManager, nodeKeys, sender, true);
when(forkIdManager.peerCheck(any(ForkId.class))).thenReturn(false);
controller.onMessage(enrPacket, sender);
final Optional<DiscoveryPeer> maybePeer =
controller
.streamDiscoveredPeers()
.filter(p -> p.getId().equals(sender.getId()))
.findFirst();
assertThat(maybePeer.isPresent()).isTrue();
assertThat(maybePeer.get().getForkId().isPresent()).isTrue();
verify(controller, never()).connectOnRlpxLayer(eq(maybePeer.get()));
}
@Test
public void shouldStillCallConnectIfNoForkIdSent() {
final List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(nodeKeys);
final DiscoveryPeer sender = peers.get(0);
final Packet enrPacket =
prepareForForkIdCheck(mock(ForkIdManager.class), nodeKeys, sender, false);
final Packet enrPacket = prepareForForkIdCheck(nodeKeys, sender, false);
controller.onMessage(enrPacket, sender);
@ -1546,10 +1519,7 @@ public class PeerDiscoveryControllerTest {
@NotNull
private Packet prepareForForkIdCheck(
final ForkIdManager forkIdManager,
final List<NodeKey> nodeKeys,
final DiscoveryPeer sender,
final boolean sendForkId) {
final List<NodeKey> nodeKeys, final DiscoveryPeer sender, final boolean sendForkId) {
final HashMap<PacketType, Bytes> packetTypeBytesHashMap = new HashMap<>();
final OutboundMessageHandler outboundMessageHandler =
(dp, pa) -> packetTypeBytesHashMap.put(pa.getType(), pa.getHash());
@ -1573,7 +1543,6 @@ public class PeerDiscoveryControllerTest {
.outboundMessageHandler(outboundMessageHandler)
.enrCache(enrs)
.filterOnForkId(true)
.forkIdManager(forkIdManager)
.build();
// Mock the creation of the PING packet, so that we can control the hash, which gets validated
@ -1720,7 +1689,6 @@ public class PeerDiscoveryControllerTest {
private Cache<Bytes, Packet> enrs =
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.SECONDS).build();
private boolean filterOnForkId = false;
private ForkIdManager forkIdManager;
public static ControllerBuilder create() {
return new ControllerBuilder();
@ -1776,11 +1744,6 @@ public class PeerDiscoveryControllerTest {
return this;
}
public ControllerBuilder forkIdManager(final ForkIdManager forkIdManager) {
this.forkIdManager = forkIdManager;
return this;
}
PeerDiscoveryController build() {
checkNotNull(nodeKey);
if (localPeer == null) {
@ -1803,7 +1766,6 @@ public class PeerDiscoveryControllerTest {
.peerPermissions(peerPermissions)
.metricsSystem(new NoOpMetricsSystem())
.cacheForEnrRequests(enrs)
.forkIdManager(forkIdManager == null ? mock(ForkIdManager.class) : forkIdManager)
.filterOnEnrForkId(filterOnForkId)
.rlpxAgent(mock(RlpxAgent.class))
.build());

@ -24,7 +24,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper;
@ -72,7 +71,6 @@ public class PeerDiscoveryTableRefreshTest {
.tableRefreshIntervalMs(0)
.metricsSystem(new NoOpMetricsSystem())
.rlpxAgent(mock(RlpxAgent.class))
.forkIdManager(mock(ForkIdManager.class))
.build());
controller.start();

@ -43,7 +43,7 @@ public class PeerTableTest {
@Test
public void addPeer() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(5);
for (final DiscoveryPeer peer : peers) {
@ -63,7 +63,7 @@ public class PeerTableTest {
.ipAddress("127.0.0.1")
.discoveryAndListeningPorts(12345)
.build());
final PeerTable table = new PeerTable(localPeer.getId(), 16);
final PeerTable table = new PeerTable(localPeer.getId());
final PeerTable.AddResult result = table.tryAdd(localPeer);
assertThat(result.getOutcome()).isEqualTo(AddOutcome.SELF);
@ -72,7 +72,7 @@ public class PeerTableTest {
@Test
public void peerExists() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final DiscoveryPeer peer = helper.createDiscoveryPeer();
assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);
@ -87,7 +87,7 @@ public class PeerTableTest {
@Test
public void peerExists_withDifferentIp() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final Bytes peerId =
SIGNATURE_ALGORITHM.get().generateKeyPair().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
@ -107,7 +107,7 @@ public class PeerTableTest {
@Test
public void peerExists_withDifferentUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final Bytes peerId =
SIGNATURE_ALGORITHM.get().generateKeyPair().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
@ -127,7 +127,7 @@ public class PeerTableTest {
@Test
public void peerExists_withDifferentIdAndUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final Bytes peerId =
SIGNATURE_ALGORITHM.get().generateKeyPair().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
@ -147,7 +147,7 @@ public class PeerTableTest {
@Test
public void evictExistingPeerShouldEvict() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final DiscoveryPeer peer = helper.createDiscoveryPeer();
table.tryAdd(peer);
@ -158,7 +158,7 @@ public class PeerTableTest {
@Test
public void evictPeerFromEmptyTableShouldNotEvict() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final DiscoveryPeer peer = helper.createDiscoveryPeer();
final EvictResult evictResult = table.tryEvict(peer);
@ -167,7 +167,7 @@ public class PeerTableTest {
@Test
public void evictAbsentPeerShouldNotEvict() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final PeerTable table = new PeerTable(Peer.randomId());
final DiscoveryPeer peer = helper.createDiscoveryPeer();
final List<DiscoveryPeer> otherPeers = helper.createDiscoveryPeers(5);
otherPeers.forEach(table::tryAdd);
@ -179,7 +179,7 @@ public class PeerTableTest {
@Test
public void evictSelfPeerShouldReturnSelfOutcome() {
final DiscoveryPeer peer = helper.createDiscoveryPeer();
final PeerTable table = new PeerTable(peer.getId(), 16);
final PeerTable table = new PeerTable(peer.getId());
final EvictResult evictResult = table.tryEvict(peer);
assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.SELF);

@ -57,7 +57,7 @@ public class RecursivePeerRefreshStateTest {
neighborFinder,
timerUtil,
localPeer,
new PeerTable(createId(999), 16),
new PeerTable(createId(999)),
peerPermissions,
5,
100);
@ -180,7 +180,7 @@ public class RecursivePeerRefreshStateTest {
neighborFinder,
timerUtil,
localPeer,
new PeerTable(createId(999), 16),
new PeerTable(createId(999)),
peerPermissions,
5,
1);
@ -466,7 +466,7 @@ public class RecursivePeerRefreshStateTest {
neighborFinder,
timerUtil,
localPeer,
new PeerTable(createId(999), 16),
new PeerTable(createId(999)),
peerPermissions,
5,
100);

@ -55,7 +55,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.vertx.core.Context;
@ -82,7 +81,7 @@ public final class DefaultP2PNetworkTest {
@Mock PeerDiscoveryAgent discoveryAgent;
@Mock RlpxAgent rlpxAgent;
@Captor private ArgumentCaptor<Stream<? extends Peer>> peerStreamCaptor;
@Captor private ArgumentCaptor<DiscoveryPeer> peerCaptor;
private final NetworkingConfiguration config =
NetworkingConfiguration.create()
@ -276,12 +275,9 @@ public final class DefaultP2PNetworkTest {
final DefaultP2PNetwork network = network();
network.attemptPeerConnections();
verify(rlpxAgent, times(1)).connect(peerStreamCaptor.capture());
verify(rlpxAgent, times(1)).connect(peerCaptor.capture());
final List<? extends Peer> capturedPeers =
peerStreamCaptor.getValue().collect(Collectors.toList());
assertThat(capturedPeers.contains(discoPeer)).isTrue();
assertThat(capturedPeers.size()).isEqualTo(1);
assertThat(peerCaptor.getValue()).isEqualTo(discoPeer);
}
@Test
@ -293,12 +289,7 @@ public final class DefaultP2PNetworkTest {
final DefaultP2PNetwork network = network();
network.attemptPeerConnections();
verify(rlpxAgent, times(1)).connect(peerStreamCaptor.capture());
final List<? extends Peer> capturedPeers =
peerStreamCaptor.getValue().collect(Collectors.toList());
assertThat(capturedPeers.contains(discoPeer)).isFalse();
assertThat(capturedPeers.size()).isEqualTo(0);
verify(rlpxAgent, times(0)).connect(any());
}
@Test
@ -314,14 +305,7 @@ public final class DefaultP2PNetworkTest {
final DefaultP2PNetwork network = network();
network.attemptPeerConnections();
verify(rlpxAgent, times(1)).connect(peerStreamCaptor.capture());
final List<? extends Peer> capturedPeers =
peerStreamCaptor.getValue().collect(Collectors.toList());
assertThat(capturedPeers.size()).isEqualTo(3);
assertThat(capturedPeers.get(0)).isEqualTo(discoPeers.get(1));
assertThat(capturedPeers.get(1)).isEqualTo(discoPeers.get(0));
assertThat(capturedPeers.get(2)).isEqualTo(discoPeers.get(2));
verify(rlpxAgent, times(3)).connect(any());
}
@Test

@ -296,7 +296,7 @@ public class RlpxAgentTest {
Stream.generate(PeerTestHelper::createPeer).limit(peerNo);
agent = spy(agent);
agent.connect(peerStream);
peerStream.forEach(agent::connect);
assertThat(agent.getMapOfCompletableFutures().size()).isEqualTo(peerNo);
verify(agent, times(peerNo)).connect(any(Peer.class));

@ -24,6 +24,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.BreachOfProtocolException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.IncompatiblePeerException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.PeerChannelClosedException;
@ -104,7 +107,7 @@ public class DeFramerTest {
private final LocalNode localNode =
LocalNode.create(clientId, p2pVersion, capabilities, localEnode);
private final DeFramer deFramer = createDeFramer(null);
private final DeFramer deFramer = createDeFramer(null, Optional.empty());
@BeforeEach
@SuppressWarnings("unchecked")
@ -219,7 +222,7 @@ public class DeFramerTest {
final Peer peer = createRemotePeer();
final PeerInfo remotePeerInfo =
new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId());
final DeFramer deFramer = createDeFramer(null);
final DeFramer deFramer = createDeFramer(null, Optional.empty());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
@ -260,6 +263,39 @@ public class DeFramerTest {
assertThat(out.size()).isEqualTo(1);
}
@Test
public void decode_duringHandshakeFindsPeerInPeerTable()
throws ExecutionException, InterruptedException {
final ChannelFuture future = NettyMocks.channelFuture(false);
when(channel.closeFuture()).thenReturn(future);
final Peer peer = createRemotePeer();
final PeerInfo remotePeerInfo =
new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final Bytes nodeId = helloMessage.getPeerInfo().getNodeId();
final String enodeURLString =
"enode://" + nodeId.toString().substring(2) + "@" + "12.13.14.15:30303?discport=30301";
final Optional<DiscoveryPeer> discoveryPeer =
DiscoveryPeer.from(DefaultPeer.fromURI(enodeURLString));
final ForkId forkId = new ForkId(Bytes.fromHexString("0x190a55ad"), 4L);
discoveryPeer.orElseThrow().setForkId(forkId);
final DeFramer deFramer = createDeFramer(null, discoveryPeer);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
assertThat(connectFuture).isNotCompletedExceptionally();
final PeerConnection peerConnection = connectFuture.get();
assertThat(peerConnection.getPeerInfo()).isEqualTo(remotePeerInfo);
assertThat(peerConnection.getPeer().getForkId().orElseThrow()).isEqualTo(forkId);
}
@Test
public void decode_handlesUnexpectedPeerId() {
final ChannelFuture future = NettyMocks.channelFuture(false);
@ -274,7 +310,7 @@ public class DeFramerTest {
capabilities,
peer.getEnodeURL().getListeningPortOrZero(),
mismatchedId);
final DeFramer deFramer = createDeFramer(peer);
final DeFramer deFramer = createDeFramer(peer, Optional.empty());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
@ -414,7 +450,10 @@ public class DeFramerTest {
forPeer.getId());
}
private DeFramer createDeFramer(final Peer expectedPeer) {
private DeFramer createDeFramer(
final Peer expectedPeer, final Optional<DiscoveryPeer> peerInPeerTable) {
final PeerTable peerTable = new PeerTable(localNode.getPeerInfo().getNodeId());
peerInPeerTable.ifPresent(peerTable::tryAdd);
return new DeFramer(
framer,
Arrays.asList(MockSubProtocol.create("eth")),
@ -423,6 +462,7 @@ public class DeFramerTest {
connectionEventDispatcher,
connectFuture,
new NoOpMetricsSystem(),
true);
true,
peerTable);
}
}

@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEventDispatcher;
@ -44,6 +45,7 @@ import io.netty.handler.codec.compression.SnappyFrameDecoder;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -95,7 +97,8 @@ public class NettyTLSConnectionInitializerTest {
eventDispatcher,
new NoOpMetricsSystem(),
() -> tlsContextFactory,
clientHelloSniHeaderEnabled);
clientHelloSniHeaderEnabled,
new PeerTable(Bytes.random(64)));
}
@Test

Loading…
Cancel
Save