From 0a2f33acad64ab717f66b90f744837559d2f6f5d Mon Sep 17 00:00:00 2001 From: David Mechler Date: Tue, 30 Jun 2020 12:56:14 -0400 Subject: [PATCH] Use Cache for bonding peers (#1175) Updated to use a Cache for the bonding peers collection. Signed-off-by: David Mechler --- .../p2p/discovery/PeerDiscoveryAgent.java | 9 ++- .../internal/PeerDiscoveryController.java | 62 ++++++++++++------- .../p2p/discovery/PeerDiscoveryAgentTest.java | 11 +--- .../discovery/PeerDiscoveryBondingTest.java | 13 ++-- .../PeerDiscoveryTimestampsTest.java | 1 - .../internal/PeerDiscoveryControllerTest.java | 32 +++++----- .../PeerDiscoveryTableRefreshTest.java | 7 ++- 7 files changed, 72 insertions(+), 63 deletions(-) diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java index df08ff3c43..0eab68727a 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java @@ -21,7 +21,6 @@ import static org.apache.tuweni.bytes.Bytes.wrapBuffer; import org.hyperledger.besu.crypto.NodeKey; import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration; import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet; -import org.hyperledger.besu.ethereum.p2p.discovery.internal.PacketType; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerRequirement; @@ -191,10 +190,6 @@ public abstract class PeerDiscoveryAgent { } } - if (PacketType.PONG.equals(packet.getType())) { - tcpPort = sourceEndpoint.getTcpPort(); - } - // Notify the peer controller. String host = sourceEndpoint.getHost(); int port = sourceEndpoint.getUdpPort(); @@ -313,4 +308,8 @@ public abstract class PeerDiscoveryAgent { public boolean isActive() { return isActive; } + + public void bond(final DiscoveryPeer peer) { + controller.ifPresent(c -> c.handleBondingRequest(peer)); + } } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java index 4cf3860150..37a535c81e 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java @@ -38,7 +38,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -54,6 +53,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; @@ -107,7 +108,8 @@ public class PeerDiscoveryController { private static final int PEER_REFRESH_ROUND_TIMEOUT_IN_SECONDS = 5; protected final TimerUtil timerUtil; private final PeerTable peerTable; - private final Map bondingPeers; + private final Cache bondingPeers = + CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.MINUTES).build(); private final Collection bootstrapNodes; @@ -169,7 +171,6 @@ public class PeerDiscoveryController { this.outboundMessageHandler = outboundMessageHandler; this.peerBondedObservers = peerBondedObservers; this.discoveryProtocolLogger = new DiscoveryProtocolLogger(metricsSystem); - this.bondingPeers = new HashMap<>(); this.peerPermissions = new PeerDiscoveryPermissions(localPeer, peerPermissions); @@ -295,35 +296,26 @@ public class PeerDiscoveryController { return; } - // Load the peer from the table, or use the instance that comes in. - final Optional maybeKnownPeer = - peerTable.get(sender).filter(known -> known.discoveryEndpointMatches(sender)); - DiscoveryPeer peer = maybeKnownPeer.orElse(sender); - final boolean peerKnown = maybeKnownPeer.isPresent(); - if (!peerKnown && bondingPeers.containsKey(sender.getId())) { - peer = bondingPeers.get(sender.getId()); - } - - final DiscoveryPeer finalPeer = peer; + final DiscoveryPeer peer = resolvePeer(sender); switch (packet.getType()) { case PING: if (peerPermissions.allowInboundBonding(peer)) { peer.setLastSeen(System.currentTimeMillis()); final PingPacketData ping = packet.getPacketData(PingPacketData.class).get(); if (!PeerDiscoveryStatus.BONDED.equals(peer.getStatus()) - && !bondingPeers.containsKey(peer.getId())) { + && (bondingPeers.getIfPresent(sender.getId()) == null)) { bond(peer); } respondToPing(ping, packet.getHash(), peer); } break; case PONG: - bondingPeers.remove(peer.getId()); matchInteraction(packet) .ifPresent( interaction -> { - addToPeerTable(finalPeer); - recursivePeerRefreshState.onBondingComplete(finalPeer); + bondingPeers.invalidate(peer.getId()); + addToPeerTable(peer); + recursivePeerRefreshState.onBondingComplete(peer); }); break; case NEIGHBORS: @@ -331,16 +323,16 @@ public class PeerDiscoveryController { .ifPresent( interaction -> recursivePeerRefreshState.onNeighboursReceived( - finalPeer, getPeersFromNeighborsPacket(packet))); + peer, getPeersFromNeighborsPacket(packet))); break; case FIND_NEIGHBORS: - if (!peerKnown || !peerPermissions.allowInboundNeighborsRequest(peer)) { - break; + if (PeerDiscoveryStatus.BONDED.equals(peer.getStatus()) + && peerPermissions.allowInboundNeighborsRequest(peer)) { + final FindNeighborsPacketData fn = + packet.getPacketData(FindNeighborsPacketData.class).get(); + respondToFindNeighbors(fn, peer); } - final FindNeighborsPacketData fn = - packet.getPacketData(FindNeighborsPacketData.class).get(); - respondToFindNeighbors(fn, peer); break; } } @@ -586,6 +578,30 @@ public class PeerDiscoveryController { this.retryDelayFunction = retryDelayFunction; } + public void handleBondingRequest(final DiscoveryPeer peer) { + final DiscoveryPeer peerToBond = resolvePeer(peer); + + if (peerPermissions.allowOutboundBonding(peerToBond) + && PeerDiscoveryStatus.KNOWN.equals(peerToBond.getStatus())) { + bond(peerToBond); + } + } + + // Load the peer first from the table, then from bonding cache or use the instance that comes in. + private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) { + final Optional maybeKnownPeer = + peerTable.get(peer).filter(known -> known.discoveryEndpointMatches(peer)); + DiscoveryPeer resolvedPeer = maybeKnownPeer.orElse(peer); + if (maybeKnownPeer.isEmpty()) { + DiscoveryPeer bondingPeer = bondingPeers.getIfPresent(peer.getId()); + if (bondingPeer != null) { + resolvedPeer = bondingPeer; + } + } + + return resolvedPeer; + } + /** Holds the state machine data for a peer interaction. */ private class PeerInteractionState implements Predicate { diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java index 4e26445f97..0e7f916d59 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.when; import org.hyperledger.besu.crypto.NodeKey; import org.hyperledger.besu.crypto.NodeKeyUtils; -import org.hyperledger.besu.ethereum.core.Hash; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder; import org.hyperledger.besu.ethereum.p2p.discovery.internal.FindNeighborsPacketData; import org.hyperledger.besu.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent; @@ -218,7 +217,7 @@ public class PeerDiscoveryAgentTest { final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(Collections.emptyList(), peerPermissions); assertThat(agent.getAdvertisedPeer().isPresent()).isTrue(); - final Peer localNode = agent.getAdvertisedPeer().get(); + final DiscoveryPeer localNode = agent.getAdvertisedPeer().get(); // Setup peer and permissions final MockPeerDiscoveryAgent otherNode = helper.startDiscoveryAgent(); @@ -233,10 +232,10 @@ public class PeerDiscoveryAgentTest { .thenReturn(true); // Bond - bondViaIncomingPing(agent, otherNode); + otherNode.bond(localNode); List remoteIncomingPackets = otherNode.getIncomingPackets(); - assertThat(remoteIncomingPackets).hasSize(3); + assertThat(remoteIncomingPackets).hasSize(2); final IncomingPacket firstMsg = remoteIncomingPackets.get(0); assertThat(firstMsg.packet.getType()).isEqualTo(PacketType.PING); assertThat(firstMsg.fromAgent).isEqualTo(agent); @@ -373,10 +372,6 @@ public class PeerDiscoveryAgentTest { assertThat(remoteAgent.getAdvertisedPeer().isPresent()).isTrue(); final DiscoveryPeer remotePeer = remoteAgent.getAdvertisedPeer().get(); - // Send PONG so that peers will be bonded - final Packet pongPacket = helper.createPongPacket(agent, Hash.EMPTY); - helper.sendMessageBetweenAgents(remoteAgent, agent, pongPacket); - // Remote agent should have bonded with agent assertThat(agent.streamDiscoveredPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).contains(remotePeer); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java index 095324aa9e..662123e7dc 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java @@ -41,14 +41,14 @@ public class PeerDiscoveryBondingTest { // Start a test peer and send a PING packet to the agent under test. final MockPeerDiscoveryAgent otherAgent = helper.startDiscoveryAgent(); - final Packet ping = helper.createPingPacket(otherAgent, agent); - helper.sendMessageBetweenAgents(otherAgent, agent, ping); + assertThat(agent.getAdvertisedPeer().isPresent()).isTrue(); + otherAgent.bond(agent.getAdvertisedPeer().get()); final List otherAgentIncomingPongs = otherAgent.getIncomingPackets().stream() .filter(p -> p.packet.getType().equals(PacketType.PONG)) .collect(Collectors.toList()); - assertThat(otherAgentIncomingPongs.size()).isEqualTo(2); + assertThat(otherAgentIncomingPongs.size()).isEqualTo(1); assertThat( otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).isPresent()) @@ -81,16 +81,15 @@ public class PeerDiscoveryBondingTest { final List incoming = otherNode.getIncomingPackets(); assertThat(incoming.size()).isEqualTo(0); - // Create and dispatch a PING packet. - final Packet ping = helper.createPingPacket(otherNode, agent); - helper.sendMessageBetweenAgents(otherNode, agent, ping); + assertThat(agent.getAdvertisedPeer().isPresent()).isTrue(); + otherNode.bond(agent.getAdvertisedPeer().get()); // Now we received a PONG. final List incomingPongs = otherNode.getIncomingPackets().stream() .filter(p -> p.packet.getType().equals(PacketType.PONG)) .collect(Collectors.toList()); - assertThat(incomingPongs.size()).isEqualTo(2); + assertThat(incomingPongs.size()).isEqualTo(1); final Optional maybePongData = incomingPongs.get(0).packet.getPacketData(PongPacketData.class); assertThat(maybePongData).isPresent(); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java index fa74203da7..eaa152fe05 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java @@ -95,7 +95,6 @@ public class PeerDiscoveryTimestampsTest { firstDiscovered.set(p.getFirstDiscovered()); controller.onMessage(pingPacket, peers.get(1)); - controller.onMessage(pongPacket, peers.get(1)); assertThat(controller.streamDiscoveredPeers()).hasSize(1); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java index 30200d3e8c..8cde8b1afc 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java @@ -153,14 +153,14 @@ public class PeerDiscoveryControllerTest { } private void mockPingPacketCreation(final Packet mockPacket) { - mockPacketCreation(Optional.empty(), mockPacket); + mockPingPacketCreation(Optional.empty(), mockPacket); } - private void mockPacketCreation(final DiscoveryPeer peer, final Packet mockPacket) { - mockPacketCreation(Optional.of(peer), mockPacket); + private void mockPingPacketCreation(final DiscoveryPeer peer, final Packet mockPacket) { + mockPingPacketCreation(Optional.of(peer), mockPacket); } - private void mockPacketCreation(final Optional peer, final Packet mockPacket) { + private void mockPingPacketCreation(final Optional peer, final Packet mockPacket) { doAnswer( invocation -> { final Consumer handler = invocation.getArgument(2); @@ -283,7 +283,7 @@ public class PeerDiscoveryControllerTest { final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.onMessage(discoPeerPing, discoPeer); @@ -314,7 +314,7 @@ public class PeerDiscoveryControllerTest { discoPeer.getEndpoint(), Instant.now().getEpochSecond() - PacketData.DEFAULT_EXPIRATION_PERIOD_SEC); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.onMessage(discoPeerPing, discoPeer); @@ -659,7 +659,7 @@ public class PeerDiscoveryControllerTest { List nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)) @@ -676,13 +676,13 @@ public class PeerDiscoveryControllerTest { nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint()); final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(otherPeer, pingPacket); + mockPingPacketCreation(otherPeer, pingPacket); // Setup ping to be sent to otherPeer2 after neighbors packet is received nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint()); final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(otherPeer2, pingPacket2); + mockPingPacketCreation(otherPeer2, pingPacket2); final Packet neighborsPacket = MockPacketDataFactory.mockNeighborsPacket(discoPeer, otherPeer, otherPeer2); @@ -737,7 +737,7 @@ public class PeerDiscoveryControllerTest { List nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -753,13 +753,13 @@ public class PeerDiscoveryControllerTest { nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint()); final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(otherPeer, pingPacket); + mockPingPacketCreation(otherPeer, pingPacket); // Setup ping to be sent to otherPeer2 after neighbors packet is received nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint()); final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(otherPeer2, pingPacket2); + mockPingPacketCreation(otherPeer2, pingPacket2); // Blacklist peer blacklist.add(otherPeer); @@ -792,7 +792,7 @@ public class PeerDiscoveryControllerTest { final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -832,7 +832,7 @@ public class PeerDiscoveryControllerTest { final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -871,7 +871,7 @@ public class PeerDiscoveryControllerTest { final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -914,7 +914,7 @@ public class PeerDiscoveryControllerTest { final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, nodeKeys.get(0)); - mockPacketCreation(discoPeer, discoPeerPing); + mockPingPacketCreation(discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java index 86830d2fb1..2355defcb0 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java @@ -88,9 +88,6 @@ public class PeerDiscoveryTableRefreshTest { final ArgumentCaptor captor = ArgumentCaptor.forClass(Packet.class); for (int i = 0; i < 5; i++) { - controller.getRecursivePeerRefreshState().cancel(); - timer.runPeriodicHandlers(); - controller.streamDiscoveredPeers().forEach(p -> p.setStatus(PeerDiscoveryStatus.KNOWN)); controller.onMessage(pingPacket, peers.get(1)); final PingPacketData refreshData = @@ -102,6 +99,10 @@ public class PeerDiscoveryTableRefreshTest { final Packet refreshPongPacket = Packet.create(PacketType.PONG, refreshPong, nodeKeys.get(1)); controller.onMessage(refreshPongPacket, peers.get(1)); + + controller.getRecursivePeerRefreshState().cancel(); + timer.runPeriodicHandlers(); + controller.streamDiscoveredPeers().forEach(p -> p.setStatus(PeerDiscoveryStatus.KNOWN)); } verify(outboundMessageHandler, atLeast(5)).send(eq(peers.get(1)), captor.capture());