fix enr request order (#4179)

* fix enr request
* add tests
* removed errant debug
* adds pr feedback and test to ensure ENR responses never go out to incomplete ping/pong handshakes
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
Co-authored-by: Justin Florentine <justin+github@florentine.us>
pull/4210/head
matkt 2 years ago committed by GitHub
parent 42cfff0acb
commit b1bff9ce1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  2. 107
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java

@ -111,6 +111,7 @@ public class PeerDiscoveryController {
private final PeerTable peerTable; private final PeerTable peerTable;
private final Cache<Bytes, DiscoveryPeer> bondingPeers = private final Cache<Bytes, DiscoveryPeer> bondingPeers =
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.MINUTES).build(); CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.MINUTES).build();
private final Cache<Bytes, Packet> cachedEnrRequests;
private final Collection<DiscoveryPeer> bootstrapNodes; private final Collection<DiscoveryPeer> bootstrapNodes;
@ -159,7 +160,8 @@ public class PeerDiscoveryController {
final PeerRequirement peerRequirement, final PeerRequirement peerRequirement,
final PeerPermissions peerPermissions, final PeerPermissions peerPermissions,
final Subscribers<PeerBondedObserver> peerBondedObservers, final Subscribers<PeerBondedObserver> peerBondedObservers,
final MetricsSystem metricsSystem) { final MetricsSystem metricsSystem,
final Optional<Cache<Bytes, Packet>> maybeCacheForEnrRequests) {
this.timerUtil = timerUtil; this.timerUtil = timerUtil;
this.nodeKey = nodeKey; this.nodeKey = nodeKey;
this.localPeer = localPeer; this.localPeer = localPeer;
@ -194,6 +196,9 @@ public class PeerDiscoveryController {
"discovery_interaction_retry_count", "discovery_interaction_retry_count",
"Total number of interaction retries performed", "Total number of interaction retries performed",
"type"); "type");
this.cachedEnrRequests =
maybeCacheForEnrRequests.orElse(
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, SECONDS).build());
} }
public static Builder builder() { public static Builder builder() {
@ -317,6 +322,8 @@ public class PeerDiscoveryController {
bondingPeers.invalidate(peer.getId()); bondingPeers.invalidate(peer.getId());
addBondedPeerToPeerTable(peer); addBondedPeerToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer); recursivePeerRefreshState.onBondingComplete(peer);
Optional.ofNullable(cachedEnrRequests.getIfPresent(peer.getId()))
.ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest));
}); });
break; break;
case NEIGHBORS: case NEIGHBORS:
@ -337,12 +344,15 @@ public class PeerDiscoveryController {
break; break;
case ENR_REQUEST: case ENR_REQUEST:
if (PeerDiscoveryStatus.BONDED.equals(peer.getStatus())) { if (PeerDiscoveryStatus.BONDED.equals(peer.getStatus())) {
LOG.trace("ENR_REQUEST received from bonded peer Id: {}", peer.getId()); processEnrRequest(peer, packet);
packet } else if (PeerDiscoveryStatus.BONDING.equals(peer.getStatus())) {
.getPacketData(ENRRequestPacketData.class) LOG.trace("ENR_REQUEST cached for bonding peer Id: {}", peer.getId());
.ifPresent(p -> respondToENRRequest(p, packet.getHash(), peer)); // Due to UDP, it may happen that we receive the ENR_REQUEST just before the PONG.
// Because peers want to send the ENR_REQUEST directly after the pong.
// If this happens we don't want to ignore the request but process when bonded.
// this cache allows to keep the request and to respond after having processed the PONG
cachedEnrRequests.put(peer.getId(), packet);
} }
break; break;
case ENR_RESPONSE: case ENR_RESPONSE:
// Currently there is no use case where an ENRResponse will be sent otherwise // Currently there is no use case where an ENRResponse will be sent otherwise
@ -356,6 +366,13 @@ public class PeerDiscoveryController {
} }
} }
private void processEnrRequest(final DiscoveryPeer peer, final Packet packet) {
LOG.trace("ENR_REQUEST received from bonded peer Id: {}", peer.getId());
packet
.getPacketData(ENRRequestPacketData.class)
.ifPresent(p -> respondToENRRequest(p, packet.getHash(), peer));
}
private List<DiscoveryPeer> getPeersFromNeighborsPacket(final Packet packet) { private List<DiscoveryPeer> getPeersFromNeighborsPacket(final Packet packet) {
final Optional<NeighborsPacketData> maybeNeighborsData = final Optional<NeighborsPacketData> maybeNeighborsData =
packet.getPacketData(NeighborsPacketData.class); packet.getPacketData(NeighborsPacketData.class);
@ -748,6 +765,9 @@ public class PeerDiscoveryController {
private AsyncExecutor workerExecutor; private AsyncExecutor workerExecutor;
private MetricsSystem metricsSystem; private MetricsSystem metricsSystem;
private Cache<Bytes, Packet> cachedEnrRequests =
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, SECONDS).build();
private Builder() {} private Builder() {}
public PeerDiscoveryController build() { public PeerDiscoveryController build() {
@ -770,7 +790,8 @@ public class PeerDiscoveryController {
peerRequirement, peerRequirement,
peerPermissions, peerPermissions,
peerBondedObservers, peerBondedObservers,
metricsSystem); metricsSystem,
Optional.of(cachedEnrRequests));
} }
private void validate() { private void validate() {
@ -862,5 +883,11 @@ public class PeerDiscoveryController {
this.metricsSystem = metricsSystem; this.metricsSystem = metricsSystem;
return this; return this;
} }
public Builder cacheForEnrRequests(final Cache<Bytes, Packet> cacheToUse) {
checkNotNull(cacheToUse);
this.cachedEnrRequests = cacheToUse;
return this;
}
} }
} }

@ -57,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.bytes.MutableBytes; import org.apache.tuweni.bytes.MutableBytes;
@ -1371,6 +1374,101 @@ public class PeerDiscoveryControllerTest {
.send(eq(peers.get(0)), matchPacketOfType(PacketType.ENR_REQUEST)); .send(eq(peers.get(0)), matchPacketOfType(PacketType.ENR_REQUEST));
} }
@Test
public void shouldRespondToRecentENRRequestAfterBonding() {
final List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(nodeKeys);
final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class);
controller =
getControllerBuilder()
.peers(peers.get(0))
.outboundMessageHandler(outboundMessageHandler)
.build();
// Mock the creation of the PING packet, so that we can control the hash, which gets validated
// when receiving the PONG.
final PingPacketData mockPing =
PingPacketData.create(
Optional.ofNullable(localPeer.getEndpoint()), peers.get(0).getEndpoint(), UInt64.ONE);
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, nodeKeys.get(0));
mockPingPacketCreation(mockPacket);
controller.start();
final PongPacketData pongRequestPacketData =
PongPacketData.create(localPeer.getEndpoint(), mockPacket.getHash(), UInt64.ONE);
final ENRRequestPacketData enrRequestPacketData = ENRRequestPacketData.create();
final Packet enrPacket =
Packet.create(PacketType.ENR_REQUEST, enrRequestPacketData, nodeKeys.get(0));
final Packet pongPacket =
Packet.create(PacketType.PONG, pongRequestPacketData, nodeKeys.get(0));
controller.onMessage(enrPacket, peers.get(0));
verify(outboundMessageHandler, never()).send(any(), matchPacketOfType(PacketType.ENR_RESPONSE));
controller.onMessage(pongPacket, peers.get(0));
verify(outboundMessageHandler, times(1))
.send(any(), matchPacketOfType(PacketType.ENR_RESPONSE));
}
@Test
public void shouldNotRespondENRPriorToPong() {
final List<NodeKey> nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(nodeKeys);
final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class);
final Cache<Bytes, Packet> enrs =
CacheBuilder.newBuilder()
.maximumSize(50)
.expireAfterWrite(1, TimeUnit.NANOSECONDS)
.ticker(
new Ticker() {
int tickCount = 1;
@Override
public long read() {
return tickCount += 10;
}
})
.build();
controller =
getControllerBuilder()
.peers(peers.get(0))
.outboundMessageHandler(outboundMessageHandler)
.enrCache(enrs)
.build();
// Mock the creation of the PING packet, so that we can control the hash, which gets validated
// when receiving the PONG.
final PingPacketData mockPing =
PingPacketData.create(
Optional.ofNullable(localPeer.getEndpoint()), peers.get(0).getEndpoint(), UInt64.ONE);
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, nodeKeys.get(0));
mockPingPacketCreation(mockPacket);
controller.start();
final PongPacketData pongRequestPacketData =
PongPacketData.create(localPeer.getEndpoint(), mockPacket.getHash(), UInt64.ONE);
final ENRRequestPacketData enrRequestPacketData = ENRRequestPacketData.create();
final Packet enrPacket =
Packet.create(PacketType.ENR_REQUEST, enrRequestPacketData, nodeKeys.get(0));
final Packet pongPacket =
Packet.create(PacketType.PONG, pongRequestPacketData, nodeKeys.get(0));
controller.onMessage(enrPacket, peers.get(0));
enrs.cleanUp();
controller.onMessage(pongPacket, peers.get(0));
verify(outboundMessageHandler, never())
.send(
argThat((DiscoveryPeer peer) -> peer.equals(peers.get(0))),
matchPacketOfType(PacketType.ENR_RESPONSE));
}
private static Packet mockPingPacket(final DiscoveryPeer from, final DiscoveryPeer to) { private static Packet mockPingPacket(final DiscoveryPeer from, final DiscoveryPeer to) {
final Packet packet = mock(Packet.class); final Packet packet = mock(Packet.class);
@ -1441,10 +1539,18 @@ public class PeerDiscoveryControllerTest {
private final Subscribers<PeerBondedObserver> peerBondedObservers = Subscribers.create(); private final Subscribers<PeerBondedObserver> peerBondedObservers = Subscribers.create();
private PeerPermissions peerPermissions = PeerPermissions.noop(); private PeerPermissions peerPermissions = PeerPermissions.noop();
private Cache<Bytes, Packet> enrs =
CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.SECONDS).build();
public static ControllerBuilder create() { public static ControllerBuilder create() {
return new ControllerBuilder(); return new ControllerBuilder();
} }
ControllerBuilder enrCache(final Cache<Bytes, Packet> cacheToUse) {
this.enrs = cacheToUse;
return this;
}
ControllerBuilder peers(final Collection<DiscoveryPeer> discoPeers) { ControllerBuilder peers(final Collection<DiscoveryPeer> discoPeers) {
this.discoPeers = discoPeers; this.discoPeers = discoPeers;
return this; return this;
@ -1507,6 +1613,7 @@ public class PeerDiscoveryControllerTest {
.peerPermissions(peerPermissions) .peerPermissions(peerPermissions)
.peerBondedObservers(peerBondedObservers) .peerBondedObservers(peerBondedObservers)
.metricsSystem(new NoOpMetricsSystem()) .metricsSystem(new NoOpMetricsSystem())
.cacheForEnrRequests(enrs)
.build()); .build());
} }
} }

Loading…
Cancel
Save