Use Cache for bonding peers (#1175)

Updated to use a Cache for the bonding peers collection.

Signed-off-by: David Mechler <david.mechler@consensys.net>
pull/1183/head
David Mechler 4 years ago committed by GitHub
parent 50736b2bd9
commit 0a2f33acad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java
  2. 62
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  3. 11
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java
  4. 13
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java
  5. 1
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java
  6. 32
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java
  7. 7
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java

@ -21,7 +21,6 @@ import static org.apache.tuweni.bytes.Bytes.wrapBuffer;
import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PacketType;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerRequirement;
@ -191,10 +190,6 @@ public abstract class PeerDiscoveryAgent {
}
}
if (PacketType.PONG.equals(packet.getType())) {
tcpPort = sourceEndpoint.getTcpPort();
}
// Notify the peer controller.
String host = sourceEndpoint.getHost();
int port = sourceEndpoint.getUdpPort();
@ -313,4 +308,8 @@ public abstract class PeerDiscoveryAgent {
public boolean isActive() {
return isActive;
}
public void bond(final DiscoveryPeer peer) {
controller.ifPresent(c -> c.handleBondingRequest(peer));
}
}

@ -38,7 +38,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -54,6 +53,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
@ -107,7 +108,8 @@ public class PeerDiscoveryController {
private static final int PEER_REFRESH_ROUND_TIMEOUT_IN_SECONDS = 5;
protected final TimerUtil timerUtil;
private final PeerTable peerTable;
private final Map<Bytes, DiscoveryPeer> bondingPeers;
private final Cache<Bytes, DiscoveryPeer> bondingPeers =
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.MINUTES).build();
private final Collection<DiscoveryPeer> bootstrapNodes;
@ -169,7 +171,6 @@ public class PeerDiscoveryController {
this.outboundMessageHandler = outboundMessageHandler;
this.peerBondedObservers = peerBondedObservers;
this.discoveryProtocolLogger = new DiscoveryProtocolLogger(metricsSystem);
this.bondingPeers = new HashMap<>();
this.peerPermissions = new PeerDiscoveryPermissions(localPeer, peerPermissions);
@ -295,35 +296,26 @@ public class PeerDiscoveryController {
return;
}
// Load the peer from the table, or use the instance that comes in.
final Optional<DiscoveryPeer> maybeKnownPeer =
peerTable.get(sender).filter(known -> known.discoveryEndpointMatches(sender));
DiscoveryPeer peer = maybeKnownPeer.orElse(sender);
final boolean peerKnown = maybeKnownPeer.isPresent();
if (!peerKnown && bondingPeers.containsKey(sender.getId())) {
peer = bondingPeers.get(sender.getId());
}
final DiscoveryPeer finalPeer = peer;
final DiscoveryPeer peer = resolvePeer(sender);
switch (packet.getType()) {
case PING:
if (peerPermissions.allowInboundBonding(peer)) {
peer.setLastSeen(System.currentTimeMillis());
final PingPacketData ping = packet.getPacketData(PingPacketData.class).get();
if (!PeerDiscoveryStatus.BONDED.equals(peer.getStatus())
&& !bondingPeers.containsKey(peer.getId())) {
&& (bondingPeers.getIfPresent(sender.getId()) == null)) {
bond(peer);
}
respondToPing(ping, packet.getHash(), peer);
}
break;
case PONG:
bondingPeers.remove(peer.getId());
matchInteraction(packet)
.ifPresent(
interaction -> {
addToPeerTable(finalPeer);
recursivePeerRefreshState.onBondingComplete(finalPeer);
bondingPeers.invalidate(peer.getId());
addToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
});
break;
case NEIGHBORS:
@ -331,16 +323,16 @@ public class PeerDiscoveryController {
.ifPresent(
interaction ->
recursivePeerRefreshState.onNeighboursReceived(
finalPeer, getPeersFromNeighborsPacket(packet)));
peer, getPeersFromNeighborsPacket(packet)));
break;
case FIND_NEIGHBORS:
if (!peerKnown || !peerPermissions.allowInboundNeighborsRequest(peer)) {
break;
if (PeerDiscoveryStatus.BONDED.equals(peer.getStatus())
&& peerPermissions.allowInboundNeighborsRequest(peer)) {
final FindNeighborsPacketData fn =
packet.getPacketData(FindNeighborsPacketData.class).get();
respondToFindNeighbors(fn, peer);
}
final FindNeighborsPacketData fn =
packet.getPacketData(FindNeighborsPacketData.class).get();
respondToFindNeighbors(fn, peer);
break;
}
}
@ -586,6 +578,30 @@ public class PeerDiscoveryController {
this.retryDelayFunction = retryDelayFunction;
}
public void handleBondingRequest(final DiscoveryPeer peer) {
final DiscoveryPeer peerToBond = resolvePeer(peer);
if (peerPermissions.allowOutboundBonding(peerToBond)
&& PeerDiscoveryStatus.KNOWN.equals(peerToBond.getStatus())) {
bond(peerToBond);
}
}
// Load the peer first from the table, then from bonding cache or use the instance that comes in.
private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) {
final Optional<DiscoveryPeer> maybeKnownPeer =
peerTable.get(peer).filter(known -> known.discoveryEndpointMatches(peer));
DiscoveryPeer resolvedPeer = maybeKnownPeer.orElse(peer);
if (maybeKnownPeer.isEmpty()) {
DiscoveryPeer bondingPeer = bondingPeers.getIfPresent(peer.getId());
if (bondingPeer != null) {
resolvedPeer = bondingPeer;
}
}
return resolvedPeer;
}
/** Holds the state machine data for a peer interaction. */
private class PeerInteractionState implements Predicate<Packet> {

@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.crypto.NodeKeyUtils;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.FindNeighborsPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent;
@ -218,7 +217,7 @@ public class PeerDiscoveryAgentTest {
final MockPeerDiscoveryAgent agent =
helper.startDiscoveryAgent(Collections.emptyList(), peerPermissions);
assertThat(agent.getAdvertisedPeer().isPresent()).isTrue();
final Peer localNode = agent.getAdvertisedPeer().get();
final DiscoveryPeer localNode = agent.getAdvertisedPeer().get();
// Setup peer and permissions
final MockPeerDiscoveryAgent otherNode = helper.startDiscoveryAgent();
@ -233,10 +232,10 @@ public class PeerDiscoveryAgentTest {
.thenReturn(true);
// Bond
bondViaIncomingPing(agent, otherNode);
otherNode.bond(localNode);
List<IncomingPacket> remoteIncomingPackets = otherNode.getIncomingPackets();
assertThat(remoteIncomingPackets).hasSize(3);
assertThat(remoteIncomingPackets).hasSize(2);
final IncomingPacket firstMsg = remoteIncomingPackets.get(0);
assertThat(firstMsg.packet.getType()).isEqualTo(PacketType.PING);
assertThat(firstMsg.fromAgent).isEqualTo(agent);
@ -373,10 +372,6 @@ public class PeerDiscoveryAgentTest {
assertThat(remoteAgent.getAdvertisedPeer().isPresent()).isTrue();
final DiscoveryPeer remotePeer = remoteAgent.getAdvertisedPeer().get();
// Send PONG so that peers will be bonded
final Packet pongPacket = helper.createPongPacket(agent, Hash.EMPTY);
helper.sendMessageBetweenAgents(remoteAgent, agent, pongPacket);
// Remote agent should have bonded with agent
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
assertThat(agent.streamDiscoveredPeers()).contains(remotePeer);

@ -41,14 +41,14 @@ public class PeerDiscoveryBondingTest {
// Start a test peer and send a PING packet to the agent under test.
final MockPeerDiscoveryAgent otherAgent = helper.startDiscoveryAgent();
final Packet ping = helper.createPingPacket(otherAgent, agent);
helper.sendMessageBetweenAgents(otherAgent, agent, ping);
assertThat(agent.getAdvertisedPeer().isPresent()).isTrue();
otherAgent.bond(agent.getAdvertisedPeer().get());
final List<IncomingPacket> otherAgentIncomingPongs =
otherAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.PONG))
.collect(Collectors.toList());
assertThat(otherAgentIncomingPongs.size()).isEqualTo(2);
assertThat(otherAgentIncomingPongs.size()).isEqualTo(1);
assertThat(
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).isPresent())
@ -81,16 +81,15 @@ public class PeerDiscoveryBondingTest {
final List<IncomingPacket> incoming = otherNode.getIncomingPackets();
assertThat(incoming.size()).isEqualTo(0);
// Create and dispatch a PING packet.
final Packet ping = helper.createPingPacket(otherNode, agent);
helper.sendMessageBetweenAgents(otherNode, agent, ping);
assertThat(agent.getAdvertisedPeer().isPresent()).isTrue();
otherNode.bond(agent.getAdvertisedPeer().get());
// Now we received a PONG.
final List<IncomingPacket> incomingPongs =
otherNode.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.PONG))
.collect(Collectors.toList());
assertThat(incomingPongs.size()).isEqualTo(2);
assertThat(incomingPongs.size()).isEqualTo(1);
final Optional<PongPacketData> maybePongData =
incomingPongs.get(0).packet.getPacketData(PongPacketData.class);
assertThat(maybePongData).isPresent();

@ -95,7 +95,6 @@ public class PeerDiscoveryTimestampsTest {
firstDiscovered.set(p.getFirstDiscovered());
controller.onMessage(pingPacket, peers.get(1));
controller.onMessage(pongPacket, peers.get(1));
assertThat(controller.streamDiscoveredPeers()).hasSize(1);

@ -153,14 +153,14 @@ public class PeerDiscoveryControllerTest {
}
private void mockPingPacketCreation(final Packet mockPacket) {
mockPacketCreation(Optional.empty(), mockPacket);
mockPingPacketCreation(Optional.empty(), mockPacket);
}
private void mockPacketCreation(final DiscoveryPeer peer, final Packet mockPacket) {
mockPacketCreation(Optional.of(peer), mockPacket);
private void mockPingPacketCreation(final DiscoveryPeer peer, final Packet mockPacket) {
mockPingPacketCreation(Optional.of(peer), mockPacket);
}
private void mockPacketCreation(final Optional<DiscoveryPeer> peer, final Packet mockPacket) {
private void mockPingPacketCreation(final Optional<DiscoveryPeer> peer, final Packet mockPacket) {
doAnswer(
invocation -> {
final Consumer<Packet> handler = invocation.getArgument(2);
@ -283,7 +283,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.onMessage(discoPeerPing, discoPeer);
@ -314,7 +314,7 @@ public class PeerDiscoveryControllerTest {
discoPeer.getEndpoint(),
Instant.now().getEpochSecond() - PacketData.DEFAULT_EXPIRATION_PERIOD_SEC);
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.onMessage(discoPeerPing, discoPeer);
@ -659,7 +659,7 @@ public class PeerDiscoveryControllerTest {
List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1))
@ -676,13 +676,13 @@ public class PeerDiscoveryControllerTest {
nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint());
final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(otherPeer, pingPacket);
mockPingPacketCreation(otherPeer, pingPacket);
// Setup ping to be sent to otherPeer2 after neighbors packet is received
nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint());
final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(otherPeer2, pingPacket2);
mockPingPacketCreation(otherPeer2, pingPacket2);
final Packet neighborsPacket =
MockPacketDataFactory.mockNeighborsPacket(discoPeer, otherPeer, otherPeer2);
@ -737,7 +737,7 @@ public class PeerDiscoveryControllerTest {
List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -753,13 +753,13 @@ public class PeerDiscoveryControllerTest {
nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint());
final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(otherPeer, pingPacket);
mockPingPacketCreation(otherPeer, pingPacket);
// Setup ping to be sent to otherPeer2 after neighbors packet is received
nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint());
final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(otherPeer2, pingPacket2);
mockPingPacketCreation(otherPeer2, pingPacket2);
// Blacklist peer
blacklist.add(otherPeer);
@ -792,7 +792,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -832,7 +832,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -871,7 +871,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -914,7 +914,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0));
mockPacketCreation(discoPeer, discoPeerPing);
mockPingPacketCreation(discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));

@ -88,9 +88,6 @@ public class PeerDiscoveryTableRefreshTest {
final ArgumentCaptor<Packet> captor = ArgumentCaptor.forClass(Packet.class);
for (int i = 0; i < 5; i++) {
controller.getRecursivePeerRefreshState().cancel();
timer.runPeriodicHandlers();
controller.streamDiscoveredPeers().forEach(p -> p.setStatus(PeerDiscoveryStatus.KNOWN));
controller.onMessage(pingPacket, peers.get(1));
final PingPacketData refreshData =
@ -102,6 +99,10 @@ public class PeerDiscoveryTableRefreshTest {
final Packet refreshPongPacket = Packet.create(PacketType.PONG, refreshPong, nodeKeys.get(1));
controller.onMessage(refreshPongPacket, peers.get(1));
controller.getRecursivePeerRefreshState().cancel();
timer.runPeriodicHandlers();
controller.streamDiscoveredPeers().forEach(p -> p.setStatus(PeerDiscoveryStatus.KNOWN));
}
verify(outboundMessageHandler, atLeast(5)).send(eq(peers.get(1)), captor.capture());

Loading…
Cancel
Save