Improve finding peers (#7626)

* add check before adding peer to peer table

Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
pull/7701/head
Stefan Pingel 2 months ago committed by GitHub
parent c3aa3f4ecc
commit 9c80c9bf42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 31
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/DiscoveryPeer.java
  2. 2
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java
  3. 5
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryStatus.java
  4. 49
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  5. 12
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java
  6. 2
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java
  7. 5
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java
  8. 7
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java
  9. 8
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java
  10. 13
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java
  11. 49
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java
  12. 79
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java
  13. 10
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java
  14. 4
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java

@ -17,12 +17,12 @@ package org.hyperledger.besu.ethereum.p2p.discovery;
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.plugin.data.EnodeURL;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.schema.NodeRecord;
@ -37,9 +37,7 @@ public class DiscoveryPeer extends DefaultPeer {
private final Endpoint endpoint;
// Timestamps.
private long firstDiscovered = 0;
private long lastContacted = 0;
private long lastSeen = 0;
private final AtomicLong firstDiscovered = new AtomicLong(0L);
private long lastAttemptedConnection = 0;
private NodeRecord nodeRecord;
@ -96,20 +94,11 @@ public class DiscoveryPeer extends DefaultPeer {
}
public long getFirstDiscovered() {
return firstDiscovered;
return firstDiscovered.get();
}
public PeerId setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered = firstDiscovered;
return this;
}
public long getLastContacted() {
return lastContacted;
}
public void setLastContacted(final long lastContacted) {
this.lastContacted = lastContacted;
public void setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered.compareAndExchange(0L, firstDiscovered);
}
public long getLastAttemptedConnection() {
@ -120,14 +109,6 @@ public class DiscoveryPeer extends DefaultPeer {
this.lastAttemptedConnection = lastAttemptedConnection;
}
public long getLastSeen() {
return lastSeen;
}
public void setLastSeen(final long lastSeen) {
this.lastSeen = lastSeen;
}
public Endpoint getEndpoint() {
return endpoint;
}
@ -163,8 +144,6 @@ public class DiscoveryPeer extends DefaultPeer {
sb.append("status=").append(status);
sb.append(", enode=").append(this.getEnodeURL());
sb.append(", firstDiscovered=").append(firstDiscovered);
sb.append(", lastContacted=").append(lastContacted);
sb.append(", lastSeen=").append(lastSeen);
sb.append('}');
return sb.toString();
}

@ -365,9 +365,7 @@ public abstract class PeerDiscoveryAgent {
(res, err) -> {
if (err != null) {
handleOutgoingPacketError(err, peer, packet);
return;
}
peer.setLastContacted(System.currentTimeMillis());
});
}

@ -35,10 +35,7 @@ public enum PeerDiscoveryStatus {
* We have successfully bonded with this {@link DiscoveryPeer}, and we are able to exchange
* messages with them.
*/
BONDED,
/** We have requested the ENR record from this {@link DiscoveryPeer} */
ENR_REQUESTED;
BONDED;
@Override
public String toString() {

@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
@ -43,6 +44,7 @@ import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -321,7 +323,6 @@ public class PeerDiscoveryController {
switch (packet.getType()) {
case PING:
if (peerPermissions.allowInboundBonding(peer)) {
peer.setLastSeen(System.currentTimeMillis());
final PingPacketData ping = packet.getPacketData(PingPacketData.class).get();
if (!PeerDiscoveryStatus.BONDED.equals(peer.getStatus())
&& (bondingPeers.getIfPresent(sender.getId()) == null)) {
@ -338,7 +339,7 @@ public class PeerDiscoveryController {
requestENR(peer);
}
bondingPeers.invalidate(peerId);
addToPeerTable(peer);
checkBeforeAddingToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
Optional.ofNullable(cachedEnrRequests.getIfPresent(peerId))
.ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest));
@ -405,22 +406,33 @@ public class PeerDiscoveryController {
.collect(Collectors.toList());
}
private boolean addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() != PeerTable.AddResult.AddOutcome.INVALID) {
// Reset the last seen timestamp.
final long now = System.currentTimeMillis();
if (peer.getFirstDiscovered() == 0) {
peer.setFirstDiscovered(now);
private void checkBeforeAddingToPeerTable(final DiscoveryPeer peer) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return;
}
peer.setLastSeen(now);
if (peer.getStatus() != PeerDiscoveryStatus.BONDED) {
if (peer.getFirstDiscovered() == 0L) {
connectOnRlpxLayer(peer)
.whenComplete(
(pc, th) -> {
if (th == null || !(th.getCause() instanceof TimeoutException)) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
connectOnRlpxLayer(peer);
peer.setFirstDiscovered(System.currentTimeMillis());
addToPeerTable(peer);
} else {
LOG.debug("Handshake timed out with peer {}", peer.getLoggableId(), th);
peerTable.invalidateIP(peer.getEndpoint());
}
});
} else {
peer.setStatus(PeerDiscoveryStatus.BONDED);
addToPeerTable(peer);
}
}
public void addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
@ -429,14 +441,10 @@ public class PeerDiscoveryController {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
return true;
}
return false;
}
void connectOnRlpxLayer(final DiscoveryPeer peer) {
rlpxAgent.connect(peer);
CompletableFuture<PeerConnection> connectOnRlpxLayer(final DiscoveryPeer peer) {
return rlpxAgent.connect(peer);
}
private Optional<PeerInteractionState> matchInteraction(final Packet packet) {
@ -512,7 +520,6 @@ public class PeerDiscoveryController {
return;
}
peer.setFirstDiscovered(System.currentTimeMillis());
peer.setStatus(PeerDiscoveryStatus.BONDING);
bondingPeers.put(peer.getId(), peer);
@ -719,7 +726,7 @@ public class PeerDiscoveryController {
// Load the peer first from the table, then from bonding cache or use the instance that comes in.
private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) {
if (peerTable.ipAddressIsInvalid(peer.getEndpoint())) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return null;
}
final Optional<DiscoveryPeer> maybeKnownPeer =

@ -112,7 +112,7 @@ public class PeerTable {
* @see AddOutcome
*/
public AddResult tryAdd(final DiscoveryPeer peer) {
if (ipAddressIsInvalid(peer.getEndpoint())) {
if (isIpAddressInvalid(peer.getEndpoint())) {
return AddResult.invalid();
}
final Bytes id = peer.getId();
@ -212,7 +212,7 @@ public class PeerTable {
return Arrays.stream(table).flatMap(e -> e.getPeers().stream());
}
boolean ipAddressIsInvalid(final Endpoint endpoint) {
public boolean isIpAddressInvalid(final Endpoint endpoint) {
final String key = getKey(endpoint);
if (invalidIPs.contains(key)) {
return true;
@ -223,7 +223,7 @@ public class PeerTable {
for (final Bucket bucket : table) {
bucket.getPeers().stream()
.filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost()))
.forEach(p -> evictAndStore(p, bucket, key));
.forEach(bucket::evict);
}
return true;
} else {
@ -231,13 +231,13 @@ public class PeerTable {
}
}
private void evictAndStore(final DiscoveryPeer peer, final Bucket bucket, final String key) {
bucket.evict(peer);
public void invalidateIP(final Endpoint endpoint) {
final String key = getKey(endpoint);
invalidIPs.add(key);
}
private static String getKey(final Endpoint endpoint) {
return endpoint.getHost() + endpoint.getFunctionalTcpPort();
return endpoint.getHost() + ":" + endpoint.getFunctionalTcpPort();
}
/**

@ -202,7 +202,7 @@ public class RecursivePeerRefreshState {
return !oneTrueMap.containsKey(discoPeer.getId())
&& (initialPeers.contains(discoPeer) || !peerTable.get(discoPeer).isPresent())
&& !discoPeer.getId().equals(localPeer.getId())
&& !peerTable.ipAddressIsInvalid(discoPeer.getEndpoint());
&& !peerTable.isIpAddressInvalid(discoPeer.getEndpoint());
}
void onNeighboursReceived(final DiscoveryPeer peer, final List<DiscoveryPeer> peers) {

@ -16,7 +16,6 @@ package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
@ -174,10 +173,6 @@ public class NettyConnectionInitializer
public CompletableFuture<PeerConnection> connect(final Peer peer) {
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();
if (peer instanceof DiscoveryPeer) {
((DiscoveryPeer) peer).setLastAttemptedConnection(System.currentTimeMillis());
}
final EnodeURL enode = peer.getEnodeURL();
new Bootstrap()
.group(workers)

@ -182,7 +182,7 @@ public class PeerDiscoveryAgentTest {
}
@Test
public void neighborsPacketLimited() {
public void neighborsPacketLimited() throws InterruptedException {
// Start 20 agents with no bootstrap peers.
final List<MockPeerDiscoveryAgent> otherAgents =
helper.startDiscoveryAgents(20, Collections.emptyList());
@ -192,8 +192,9 @@ public class PeerDiscoveryAgentTest {
.map(Optional::get)
.collect(Collectors.toList());
// Start another peer pointing to those 20 agents.
// Start another peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(otherPeers);
// We used to do a hasSize match but we had issues with duplicate peers getting added to the
// list. By moving to a contains we make sure that all the peers are loaded with tolerance for
// duplicates. If we fix the duplication problem we should use containsExactlyInAnyOrder to
@ -222,7 +223,7 @@ public class PeerDiscoveryAgentTest {
final List<IncomingPacket> incomingPackets =
testAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.NEIGHBORS))
.collect(toList());
.toList();
assertThat(incomingPackets.size()).isEqualTo(1);
final IncomingPacket neighborsPacket = incomingPackets.get(0);
assertThat(neighborsPacket.fromAgent).isEqualTo(agent);

@ -47,11 +47,15 @@ public class PeerDiscoveryBondingTest {
final List<IncomingPacket> otherAgentIncomingPongs =
otherAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.PONG))
.collect(Collectors.toList());
.toList();
assertThat(otherAgentIncomingPongs.size()).isEqualTo(1);
assertThat(
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).isPresent())
otherAgentIncomingPongs
.getFirst()
.packet
.getPacketData(PongPacketData.class)
.isPresent())
.isTrue();
final PongPacketData pong =
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).get();

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.p2p.discovery;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -44,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -301,15 +303,12 @@ public class PeerDiscoveryTestHelper {
final ForkId forkId = new ForkId(Bytes.EMPTY, Bytes.EMPTY);
when(mockForkIdManager.getForkIdForChainHead()).thenReturn(forkId);
when(mockForkIdManager.peerCheck(forkId)).thenReturn(true);
final RlpxAgent rlpxAgent = mock(RlpxAgent.class);
when(rlpxAgent.connect(any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
final MockPeerDiscoveryAgent mockPeerDiscoveryAgent =
new MockPeerDiscoveryAgent(
nodeKey,
config,
peerPermissions,
agents,
natService,
mockForkIdManager,
mock(RlpxAgent.class));
nodeKey, config, peerPermissions, agents, natService, mockForkIdManager, rlpxAgent);
mockPeerDiscoveryAgent.getAdvertisedPeer().ifPresent(peer -> peer.setNodeRecord(nodeRecord));
return mockPeerDiscoveryAgent;

@ -40,16 +40,13 @@ public class PeerDiscoveryTimestampsTest {
final Packet pong = helper.createPongPacket(agent, Hash.hash(agentPing.getHash()));
helper.sendMessageBetweenAgents(testAgent, agent, pong);
long lastSeen;
long firstDiscovered;
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
DiscoveryPeer p = agent.streamDiscoveredPeers().iterator().next();
assertThat(p.getLastSeen()).isGreaterThan(0);
assertThat(p.getFirstDiscovered()).isGreaterThan(0);
lastSeen = p.getLastSeen();
firstDiscovered = p.getFirstDiscovered();
helper.sendMessageBetweenAgents(testAgent, agent, testAgentPing);
@ -57,52 +54,6 @@ public class PeerDiscoveryTimestampsTest {
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
p = agent.streamDiscoveredPeers().iterator().next();
assertThat(p.getLastSeen()).isGreaterThan(lastSeen);
assertThat(p.getFirstDiscovered()).isEqualTo(firstDiscovered);
}
@Test
public void lastContactedTimestampUpdatedOnOutboundMessage() {
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(Collections.emptyList());
assertThat(agent.streamDiscoveredPeers()).hasSize(0);
// Start a test peer and send a PING packet to the agent under test.
final MockPeerDiscoveryAgent testAgent = helper.startDiscoveryAgent();
final Packet ping = helper.createPingPacket(testAgent, agent);
helper.sendMessageBetweenAgents(testAgent, agent, ping);
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
final long lastContacted;
final long lastSeen;
final long firstDiscovered;
DiscoveryPeer peer = agent.streamDiscoveredPeers().iterator().next();
final long lc = peer.getLastContacted();
final long ls = peer.getLastSeen();
final long fd = peer.getFirstDiscovered();
assertThat(lc).isGreaterThan(0);
assertThat(ls).isGreaterThan(0);
assertThat(fd).isGreaterThan(0);
lastContacted = lc;
lastSeen = ls;
firstDiscovered = fd;
// Send another packet and ensure that timestamps are updated accordingly.
// Sleep beforehand to make sure timestamps will be different.
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// Swallow exception because we only want to pause the test.
}
helper.sendMessageBetweenAgents(testAgent, agent, ping);
peer = agent.streamDiscoveredPeers().iterator().next();
assertThat(peer.getLastContacted()).isGreaterThan(lastContacted);
assertThat(peer.getLastSeen()).isGreaterThan(lastSeen);
assertThat(peer.getFirstDiscovered()).isEqualTo(firstDiscovered);
}
}

@ -55,7 +55,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -468,14 +470,13 @@ public class PeerDiscoveryControllerTest {
.build();
// Mock the creation of the PING packet, so that we can control the hash, which gets validated
// when
// processing the PONG.
// when processing the PONG.
final PingPacketData mockPing =
PingPacketData.create(
Optional.ofNullable(localPeer.getEndpoint()), peers.get(0).getEndpoint(), UInt64.ONE);
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, nodeKeys.get(0));
mockPingPacketCreation(mockPacket);
controller.setRetryDelayFunction((prev) -> 999999999L);
controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction);
controller.start();
// Verify that the PING was sent.
@ -506,11 +507,68 @@ public class PeerDiscoveryControllerTest {
.isEqualTo(PeerDiscoveryStatus.BONDED);
}
@Test
public void addedToInvalidIpsWhenConnectTimedOut() {
// Create a peer
final List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
final NodeKey nodeKey = nodeKeys.getFirst();
final DiscoveryPeer peerThatTimesOut = helper.createDiscoveryPeers(nodeKeys).getFirst();
// Initialize the peer controller, using a rlpx agent that times out when asked to connect.
// Set a high controller refresh interval and a high timeout threshold, to avoid retries
// getting in the way of this test.
final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class);
RlpxAgent rlpxAgentMock = mock(RlpxAgent.class);
when(rlpxAgentMock.connect(any()))
.thenReturn(CompletableFuture.failedFuture(new Exception(new TimeoutException())));
controller =
getControllerBuilder()
.outboundMessageHandler(outboundMessageHandler)
.rlpxAgent(rlpxAgentMock)
.build();
// Mock the creation of the PING packet, so that we can control the hash, which gets validated
// when processing the PONG.
final PingPacketData mockPing =
PingPacketData.create(
Optional.ofNullable(localPeer.getEndpoint()),
peerThatTimesOut.getEndpoint(),
UInt64.ONE);
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, nodeKey);
mockPingPacketCreation(mockPacket);
controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction);
controller.start();
controller.handleBondingRequest(peerThatTimesOut);
// Verify that the PING was sent.
verify(outboundMessageHandler, times(1))
.send(eq(peerThatTimesOut), matchPacketOfType(PacketType.PING));
// Simulate a PONG message from the peer.
respondWithPong(peerThatTimesOut, nodeKey, mockPacket.getHash());
final List<DiscoveryPeer> peersInTable = controller.streamDiscoveredPeers().toList();
assertThat(peersInTable).hasSize(0);
assertThat(peersInTable).doesNotContain(peerThatTimesOut);
// Try bonding again, and check that the peer is not sent the PING packet again
controller.handleBondingRequest(peerThatTimesOut);
// verify that the ping was not sent, no additional interaction
verify(outboundMessageHandler, times(1))
.send(eq(peerThatTimesOut), matchPacketOfType(PacketType.PING));
}
private ControllerBuilder getControllerBuilder() {
final RlpxAgent rlpxAgent = mock(RlpxAgent.class);
when(rlpxAgent.connect(any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
return ControllerBuilder.create()
.nodeKey(localNodeKey)
.localPeer(localPeer)
.peerTable(peerTable);
.peerTable(peerTable)
.rlpxAgent(rlpxAgent);
}
private void respondWithPong(
@ -544,7 +602,7 @@ public class PeerDiscoveryControllerTest {
mockPingPacketCreation(pingPacket);
controller.setRetryDelayFunction((prev) -> 999999999L);
controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction);
controller.start();
verify(outboundMessageHandler, times(1))
@ -994,7 +1052,7 @@ public class PeerDiscoveryControllerTest {
.build();
mockPingPacketCreation(pingPacket);
controller.setRetryDelayFunction((prev) -> 999999999L);
controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -1689,6 +1747,7 @@ public class PeerDiscoveryControllerTest {
private Cache<Bytes, Packet> enrs =
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.SECONDS).build();
private boolean filterOnForkId = false;
private RlpxAgent rlpxAgent;
public static ControllerBuilder create() {
return new ControllerBuilder();
@ -1744,6 +1803,11 @@ public class PeerDiscoveryControllerTest {
return this;
}
public ControllerBuilder rlpxAgent(final RlpxAgent rlpxAgent) {
this.rlpxAgent = rlpxAgent;
return this;
}
PeerDiscoveryController build() {
checkNotNull(nodeKey);
if (localPeer == null) {
@ -1752,6 +1816,7 @@ public class PeerDiscoveryControllerTest {
if (peerTable == null) {
peerTable = new PeerTable(localPeer.getId());
}
return spy(
PeerDiscoveryController.builder()
.nodeKey(nodeKey)
@ -1767,7 +1832,7 @@ public class PeerDiscoveryControllerTest {
.metricsSystem(new NoOpMetricsSystem())
.cacheForEnrRequests(enrs)
.filterOnEnrForkId(filterOnForkId)
.rlpxAgent(mock(RlpxAgent.class))
.rlpxAgent(rlpxAgent)
.build());
}
}

@ -22,6 +22,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
@ -34,8 +35,8 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.units.bigints.UInt64;
@ -59,6 +60,9 @@ public class PeerDiscoveryTableRefreshTest {
final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class);
final MockTimerUtil timer = new MockTimerUtil();
final RlpxAgent rlpxAgent = mock(RlpxAgent.class);
when(rlpxAgent.connect(any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
final PeerDiscoveryController controller =
spy(
PeerDiscoveryController.builder()
@ -70,7 +74,7 @@ public class PeerDiscoveryTableRefreshTest {
.workerExecutor(new BlockingAsyncExecutor())
.tableRefreshIntervalMs(0)
.metricsSystem(new NoOpMetricsSystem())
.rlpxAgent(mock(RlpxAgent.class))
.rlpxAgent(rlpxAgent)
.build());
controller.start();
@ -117,7 +121,7 @@ public class PeerDiscoveryTableRefreshTest {
final List<Packet> capturedFindNeighborsPackets =
captor.getAllValues().stream()
.filter(p -> p.getType().equals(PacketType.FIND_NEIGHBORS))
.collect(Collectors.toList());
.toList();
assertThat(capturedFindNeighborsPackets.size()).isEqualTo(5);
// Collect targets from find neighbors packets

@ -196,7 +196,7 @@ public class PeerTableTest {
final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());
assertThat(table.ipAddressIsInvalid(peer2.getEndpoint())).isEqualTo(true);
assertThat(table.isIpAddressInvalid(peer2.getEndpoint())).isEqualTo(true);
}
@Test
@ -210,7 +210,7 @@ public class PeerTableTest {
final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());
assertThat(table.ipAddressIsInvalid(peer2.getEndpoint())).isEqualTo(false);
assertThat(table.isIpAddressInvalid(peer2.getEndpoint())).isEqualTo(false);
}
@Test

Loading…
Cancel
Save