Create peer discovery packets on a worker thread. (#955)

The signing process is potentially slow causing the Vertx queue to back up.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 7926bbc4b8
commit 9d06ec2fcf
  1. 4
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgent.java
  2. 32
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java
  3. 61
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  4. 2
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java
  5. 26
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/BlockingAsyncExecutor.java
  6. 6
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java
  7. 91
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java
  8. 1
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java

@ -26,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBonde
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerDroppedEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PingPacketData;
@ -115,6 +116,8 @@ public abstract class PeerDiscoveryAgent implements DisconnectCallback {
protected abstract TimerUtil createTimer();
protected abstract AsyncExecutor createWorkerExecutor();
protected abstract CompletableFuture<InetSocketAddress> listenForConnections();
protected abstract CompletableFuture<Void> sendOutgoingPacket(
@ -162,6 +165,7 @@ public abstract class PeerDiscoveryAgent implements DisconnectCallback {
bootstrapPeers,
this::handleOutgoingPacket,
createTimer(),
createWorkerExecutor(),
PEER_REFRESH_INTERVAL_MS,
peerRequirement,
peerBlacklist,

@ -18,6 +18,7 @@ import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.TimerUtil;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.VertxTimerUtil;
@ -34,6 +35,7 @@ import java.net.SocketException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
@ -67,6 +69,11 @@ public class VertxPeerDiscoveryAgent extends PeerDiscoveryAgent {
return new VertxTimerUtil(vertx);
}
@Override
protected AsyncExecutor createWorkerExecutor() {
return new VertxAsyncExecutor();
}
@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
CompletableFuture<InetSocketAddress> future = new CompletableFuture<>();
@ -194,4 +201,29 @@ public class VertxPeerDiscoveryAgent extends PeerDiscoveryAgent {
LOG.error("Encountered error while handling packet", t);
}
}
private class VertxAsyncExecutor implements AsyncExecutor {
@Override
public <T> CompletableFuture<T> execute(final Supplier<T> action) {
final CompletableFuture<T> result = new CompletableFuture<>();
vertx.<T>executeBlocking(
future -> {
try {
future.complete(action.get());
} catch (final Throwable t) {
future.fail(t);
}
},
false,
event -> {
if (event.succeeded()) {
result.complete(event.result());
} else {
result.completeExceptionally(event.cause());
}
});
return result;
}
}
}

@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@ -119,6 +120,7 @@ public class PeerDiscoveryController {
private RetryDelayFunction retryDelayFunction = RetryDelayFunction.linear(1.5, 2000, 60000);
private final AsyncExecutor workerExecutor;
private final long tableRefreshIntervalMs;
private final PeerRequirement peerRequirement;
@ -140,6 +142,7 @@ public class PeerDiscoveryController {
final Collection<DiscoveryPeer> bootstrapNodes,
final OutboundMessageHandler outboundMessageHandler,
final TimerUtil timerUtil,
final AsyncExecutor workerExecutor,
final long tableRefreshIntervalMs,
final PeerRequirement peerRequirement,
final PeerBlacklist peerBlacklist,
@ -151,6 +154,7 @@ public class PeerDiscoveryController {
this.localPeer = localPeer;
this.bootstrapNodes = bootstrapNodes;
this.peerTable = peerTable;
this.workerExecutor = workerExecutor;
this.tableRefreshIntervalMs = tableRefreshIntervalMs;
this.peerRequirement = peerRequirement;
this.peerBlacklist = peerBlacklist;
@ -391,19 +395,23 @@ public class PeerDiscoveryController {
interaction -> {
final PingPacketData data =
PingPacketData.create(localPeer.getEndpoint(), peer.getEndpoint());
final Packet pingPacket = createPacket(PacketType.PING, data);
final BytesValue pingHash = pingPacket.getHash();
// Update the matching filter to only accept the PONG if it echoes the hash of our PING.
final Predicate<Packet> newFilter =
packet ->
packet
.getPacketData(PongPacketData.class)
.map(pong -> pong.getPingHash().equals(pingHash))
.orElse(false);
interaction.updateFilter(newFilter);
sendPacket(peer, pingPacket);
createPacket(
PacketType.PING,
data,
pingPacket -> {
final BytesValue pingHash = pingPacket.getHash();
// Update the matching filter to only accept the PONG if it echoes the hash of our
// PING.
final Predicate<Packet> newFilter =
packet ->
packet
.getPacketData(PongPacketData.class)
.map(pong -> pong.getPingHash().equals(pingHash))
.orElse(false);
interaction.updateFilter(newFilter);
sendPacket(peer, pingPacket);
});
};
// The filter condition will be updated as soon as the action is performed.
@ -413,9 +421,13 @@ public class PeerDiscoveryController {
}
private void sendPacket(final DiscoveryPeer peer, final PacketType type, final PacketData data) {
Packet packet = createPacket(type, data);
logSendingPacket(peer, packet);
outboundMessageHandler.send(peer, packet);
createPacket(
type,
data,
packet -> {
logSendingPacket(peer, packet);
outboundMessageHandler.send(peer, packet);
});
}
private void sendPacket(final DiscoveryPeer peer, final Packet packet) {
@ -424,8 +436,17 @@ public class PeerDiscoveryController {
}
@VisibleForTesting
Packet createPacket(final PacketType type, final PacketData data) {
return Packet.create(type, data, keypair);
void createPacket(final PacketType type, final PacketData data, final Consumer<Packet> handler) {
// Creating packets is quite expensive because they have to be cryptographically signed
// So ensure the work is done on a worker thread to avoid blocking the vertx event thread.
workerExecutor
.execute(() -> Packet.create(type, data, keypair))
.thenAccept(handler)
.exceptionally(
error -> {
LOG.error("Error while creating packet", error);
return null;
});
}
/**
@ -563,4 +584,8 @@ public class PeerDiscoveryController {
timerId.ifPresent(timerUtil::cancelTimer);
}
}
public interface AsyncExecutor {
<T> CompletableFuture<T> execute(Supplier<T> action);
}
}

@ -17,6 +17,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.BlockingAsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockTimerUtil;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.OutboundMessageHandler;
@ -58,6 +59,7 @@ public class PeerDiscoveryTimestampsTest {
Collections.emptyList(),
OutboundMessageHandler.NOOP,
new MockTimerUtil(),
new BlockingAsyncExecutor(),
TimeUnit.HOURS.toMillis(1),
() -> true,
new PeerBlacklist(),

@ -0,0 +1,26 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class BlockingAsyncExecutor implements AsyncExecutor {
@Override
public <T> CompletableFuture<T> execute(final Supplier<T> action) {
return CompletableFuture.completedFuture(action.get());
}
}

@ -16,6 +16,7 @@ import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration;
import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryAgent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -93,6 +94,11 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
return new MockTimerUtil();
}
@Override
protected AsyncExecutor createWorkerExecutor() {
return new BlockingAsyncExecutor();
}
@Override
public CompletableFuture<?> stop() {
return CompletableFuture.completedFuture(null);

@ -19,6 +19,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -123,7 +124,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(mockPacket);
controller.start();
@ -152,6 +153,27 @@ public class PeerDiscoveryControllerTest {
.forEach(p -> assertThat(p.getStatus()).isEqualTo(PeerDiscoveryStatus.BONDING));
}
private void mockPingPacketCreation(final Packet mockPacket) {
mockPacketCreation(PacketType.PING, Optional.empty(), mockPacket);
}
private void mockPacketCreation(
final PacketType type, final DiscoveryPeer peer, final Packet mockPacket) {
mockPacketCreation(type, Optional.of(peer), mockPacket);
}
private void mockPacketCreation(
final PacketType type, final Optional<DiscoveryPeer> peer, final Packet mockPacket) {
doAnswer(
invocation -> {
final Consumer<Packet> handler = invocation.getArgument(2);
handler.accept(mockPacket);
return null;
})
.when(controller)
.createPacket(eq(type), peer.isPresent() ? matchPingDataForPeer(peer.get()) : any(), any());
}
@Test
public void bootstrapPeersRetriesStoppedUponResponse() {
// Create peers.
@ -172,7 +194,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(mockPacket);
controller.start();
@ -226,7 +248,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(mockPacket);
controller.start();
@ -261,7 +283,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(mockPacket);
controller.start();
@ -317,7 +339,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(mockPacket);
controller.start();
@ -364,7 +386,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(mockPacket);
controller.setRetryDelayFunction((prev) -> 999999999L);
controller.start();
@ -429,7 +451,7 @@ public class PeerDiscoveryControllerTest {
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(pingPacket);
controller.setRetryDelayFunction((prev) -> 999999999L);
controller.start();
@ -589,9 +611,7 @@ public class PeerDiscoveryControllerTest {
List<SECP256K1.KeyPair> keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(discoPeerPing)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer));
mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1))
@ -608,17 +628,13 @@ public class PeerDiscoveryControllerTest {
keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint());
final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer));
mockPacketCreation(PacketType.PING, otherPeer, pingPacket);
// Setup ping to be sent to otherPeer2 after neighbors packet is received
keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint());
final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket2)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer2));
mockPacketCreation(PacketType.PING, otherPeer2, pingPacket2);
final Packet neighborsPacket =
MockPacketDataFactory.mockNeighborsPacket(discoPeer, otherPeer, otherPeer2);
@ -673,9 +689,7 @@ public class PeerDiscoveryControllerTest {
List<SECP256K1.KeyPair> keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(discoPeerPing)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer));
mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -691,17 +705,13 @@ public class PeerDiscoveryControllerTest {
keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint());
final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer));
mockPacketCreation(PacketType.PING, otherPeer, pingPacket);
// Setup ping to be sent to otherPeer2 after neighbors packet is received
keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint());
final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket2)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer2));
mockPacketCreation(PacketType.PING, otherPeer2, pingPacket2);
// Blacklist peer
blacklist.add(otherPeer);
@ -737,9 +747,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(discoPeerPing)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer));
mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -782,9 +790,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(discoPeerPing)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer));
mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -825,9 +831,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(discoPeerPing)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer));
mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -863,7 +867,7 @@ public class PeerDiscoveryControllerTest {
.peers(peers.get(0))
.outboundMessageHandler(outboundMessageHandler)
.build();
doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(pingPacket);
controller.setRetryDelayFunction((prev) -> 999999999L);
controller.start();
@ -895,7 +899,7 @@ public class PeerDiscoveryControllerTest {
final PingPacketData pingPacketData =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(pingPacket);
controller.start();
@ -961,7 +965,7 @@ public class PeerDiscoveryControllerTest {
.peers(peers.get(0))
.outboundMessageHandler(outboundMessageHandler)
.build();
doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any());
mockPingPacketCreation(pingPacket);
controller.start();
verify(outboundMessageHandler, times(1))
@ -1009,9 +1013,7 @@ public class PeerDiscoveryControllerTest {
List<SECP256K1.KeyPair> keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint());
final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(discoPeerPing)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer));
mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing);
controller.start();
verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING));
@ -1027,17 +1029,13 @@ public class PeerDiscoveryControllerTest {
keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint());
final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer));
mockPacketCreation(PacketType.PING, otherPeer, pingPacket);
// Setup ping to be sent to otherPeer2 after neighbors packet is received
keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1);
pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint());
final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0));
doReturn(pingPacket2)
.when(controller)
.createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer2));
mockPacketCreation(PacketType.PING, otherPeer2, pingPacket2);
final Packet neighborsPacket =
MockPacketDataFactory.mockNeighborsPacket(discoPeer, otherPeer, otherPeer2);
@ -1308,6 +1306,7 @@ public class PeerDiscoveryControllerTest {
discoPeers,
outboundMessageHandler,
timerUtil,
new BlockingAsyncExecutor(),
TABLE_REFRESH_INTERVAL_MS,
PEER_REQUIREMENT,
blacklist,

@ -60,6 +60,7 @@ public class PeerDiscoveryTableRefreshTest {
emptyList(),
outboundMessageHandler,
timer,
new BlockingAsyncExecutor(),
0,
() -> true,
new PeerBlacklist(),

Loading…
Cancel
Save