mirror of https://github.com/hyperledger/besu
[NC-2056] Remove vertx from discovery tests (#539)
parent
cf7a739fc2
commit
78bb7949f3
@ -0,0 +1,196 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery; |
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument; |
||||
|
||||
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.TimerUtil; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.VertxTimerUtil; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.Endpoint; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; |
||||
import tech.pegasys.pantheon.ethereum.p2p.permissioning.NodeWhitelistController; |
||||
import tech.pegasys.pantheon.util.NetworkUtility; |
||||
import tech.pegasys.pantheon.util.Preconditions; |
||||
|
||||
import java.io.IOException; |
||||
import java.net.BindException; |
||||
import java.net.InetSocketAddress; |
||||
import java.net.SocketException; |
||||
import java.util.OptionalInt; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import io.vertx.core.AsyncResult; |
||||
import io.vertx.core.Vertx; |
||||
import io.vertx.core.datagram.DatagramPacket; |
||||
import io.vertx.core.datagram.DatagramSocket; |
||||
import io.vertx.core.datagram.DatagramSocketOptions; |
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class VertxPeerDiscoveryAgent extends PeerDiscoveryAgent { |
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
|
||||
private final Vertx vertx; |
||||
/* The vert.x UDP socket. */ |
||||
private DatagramSocket socket; |
||||
|
||||
public VertxPeerDiscoveryAgent( |
||||
final Vertx vertx, |
||||
final KeyPair keyPair, |
||||
final DiscoveryConfiguration config, |
||||
final PeerRequirement peerRequirement, |
||||
final PeerBlacklist peerBlacklist, |
||||
final NodeWhitelistController nodeWhitelistController) { |
||||
super(keyPair, config, peerRequirement, peerBlacklist, nodeWhitelistController); |
||||
checkArgument(vertx != null, "vertx instance cannot be null"); |
||||
this.vertx = vertx; |
||||
} |
||||
|
||||
@Override |
||||
protected TimerUtil createTimer() { |
||||
return new VertxTimerUtil(vertx); |
||||
} |
||||
|
||||
@Override |
||||
protected CompletableFuture<InetSocketAddress> listenForConnections() { |
||||
CompletableFuture<InetSocketAddress> future = new CompletableFuture<>(); |
||||
vertx |
||||
.createDatagramSocket(new DatagramSocketOptions().setIpV6(NetworkUtility.isIPv6Available())) |
||||
.listen( |
||||
config.getBindPort(), config.getBindHost(), res -> handleListenerSetup(res, future)); |
||||
return future; |
||||
} |
||||
|
||||
protected void handleListenerSetup( |
||||
final AsyncResult<DatagramSocket> listenResult, |
||||
final CompletableFuture<InetSocketAddress> addressFuture) { |
||||
if (listenResult.failed()) { |
||||
Throwable cause = listenResult.cause(); |
||||
LOG.error("An exception occurred when starting the peer discovery agent", cause); |
||||
|
||||
if (cause instanceof BindException || cause instanceof SocketException) { |
||||
cause = |
||||
new PeerDiscoveryServiceException( |
||||
String.format( |
||||
"Failed to bind Ethereum UDP discovery listener to %s:%d: %s", |
||||
config.getBindHost(), config.getBindPort(), cause.getMessage())); |
||||
} |
||||
addressFuture.completeExceptionally(cause); |
||||
return; |
||||
} |
||||
|
||||
this.socket = listenResult.result(); |
||||
|
||||
// TODO: when using wildcard hosts (0.0.0.0), we need to handle multiple addresses by
|
||||
// selecting
|
||||
// the correct 'announce' address.
|
||||
final String effectiveHost = socket.localAddress().host(); |
||||
final int effectivePort = socket.localAddress().port(); |
||||
|
||||
LOG.info( |
||||
"Started peer discovery agent successfully, on effective host={} and port={}", |
||||
effectiveHost, |
||||
effectivePort); |
||||
|
||||
socket.exceptionHandler(this::handleException); |
||||
socket.handler(this::handlePacket); |
||||
|
||||
InetSocketAddress address = |
||||
new InetSocketAddress(socket.localAddress().host(), socket.localAddress().port()); |
||||
addressFuture.complete(address); |
||||
} |
||||
|
||||
@Override |
||||
protected CompletableFuture<Void> sendOutgoingPacket( |
||||
final DiscoveryPeer peer, final Packet packet) { |
||||
CompletableFuture<Void> result = new CompletableFuture<>(); |
||||
socket.send( |
||||
packet.encode(), |
||||
peer.getEndpoint().getUdpPort(), |
||||
peer.getEndpoint().getHost(), |
||||
ar -> { |
||||
if (ar.failed()) { |
||||
result.completeExceptionally(ar.cause()); |
||||
} else { |
||||
result.complete(null); |
||||
} |
||||
}); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<?> stop() { |
||||
if (socket == null) { |
||||
return CompletableFuture.completedFuture(null); |
||||
} |
||||
|
||||
final CompletableFuture<?> completion = new CompletableFuture<>(); |
||||
socket.close( |
||||
ar -> { |
||||
if (ar.succeeded()) { |
||||
controller.ifPresent(PeerDiscoveryController::stop); |
||||
socket = null; |
||||
completion.complete(null); |
||||
} else { |
||||
completion.completeExceptionally(ar.cause()); |
||||
} |
||||
}); |
||||
return completion; |
||||
} |
||||
|
||||
/** |
||||
* For uncontrolled exceptions occurring in the packet handlers. |
||||
* |
||||
* @param exception the exception that was raised |
||||
*/ |
||||
private void handleException(final Throwable exception) { |
||||
if (exception instanceof IOException) { |
||||
LOG.debug("Packet handler exception", exception); |
||||
} else { |
||||
LOG.error("Packet handler exception", exception); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* The UDP packet handler. This is the entrypoint for all received datagrams. |
||||
* |
||||
* @param datagram the received datagram. |
||||
*/ |
||||
private void handlePacket(final DatagramPacket datagram) { |
||||
try { |
||||
final int length = datagram.data().length(); |
||||
Preconditions.checkGuard( |
||||
validatePacketSize(length), |
||||
PeerDiscoveryPacketDecodingException::new, |
||||
"Packet too large. Actual size (bytes): %s", |
||||
length); |
||||
|
||||
// We allow exceptions to bubble up, as they'll be picked up by the exception handler.
|
||||
final Packet packet = Packet.decode(datagram.data()); |
||||
// Acquire the senders coordinates to build a Peer representation from them.
|
||||
final String host = datagram.sender().host(); |
||||
final int port = datagram.sender().port(); |
||||
final Endpoint endpoint = new Endpoint(host, port, OptionalInt.empty()); |
||||
handleIncomingPacket(endpoint, packet); |
||||
} catch (final PeerDiscoveryPacketDecodingException e) { |
||||
LOG.debug("Discarding invalid peer discovery packet", e); |
||||
} catch (final Throwable t) { |
||||
LOG.error("Encountered error while handling packet", t); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,22 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer; |
||||
|
||||
@FunctionalInterface |
||||
public interface OutboundMessageHandler { |
||||
public static OutboundMessageHandler NOOP = (peer, packet) -> {}; |
||||
|
||||
void send(final DiscoveryPeer toPeer, final Packet packet); |
||||
} |
@ -0,0 +1,26 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal; |
||||
|
||||
public interface TimerUtil { |
||||
long setPeriodic(long delay, TimerHandler handler); |
||||
|
||||
long setTimer(long delay, TimerHandler handler); |
||||
|
||||
void cancelTimer(long timerId); |
||||
|
||||
@FunctionalInterface |
||||
interface TimerHandler { |
||||
void handle(); |
||||
} |
||||
} |
@ -0,0 +1,39 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal; |
||||
|
||||
import io.vertx.core.Vertx; |
||||
|
||||
public class VertxTimerUtil implements TimerUtil { |
||||
|
||||
private final Vertx vertx; |
||||
|
||||
public VertxTimerUtil(final Vertx vertx) { |
||||
this.vertx = vertx; |
||||
} |
||||
|
||||
@Override |
||||
public long setPeriodic(final long delay, final TimerHandler handler) { |
||||
return vertx.setPeriodic(delay, (l) -> handler.handle()); |
||||
} |
||||
|
||||
@Override |
||||
public long setTimer(final long delay, final TimerHandler handler) { |
||||
return vertx.setTimer(delay, (l) -> handler.handle()); |
||||
} |
||||
|
||||
@Override |
||||
public void cancelTimer(final long timerId) { |
||||
vertx.cancelTimer(timerId); |
||||
} |
||||
} |
@ -1,293 +0,0 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery; |
||||
|
||||
import static io.vertx.core.Vertx.vertx; |
||||
|
||||
import tech.pegasys.pantheon.crypto.SECP256K1; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PacketType; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PingPacketData; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.Endpoint; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; |
||||
import tech.pegasys.pantheon.ethereum.p2p.permissioning.NodeWhitelistController; |
||||
import tech.pegasys.pantheon.ethereum.permissioning.PermissioningConfiguration; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.util.List; |
||||
import java.util.concurrent.ArrayBlockingQueue; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.CopyOnWriteArrayList; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.stream.Collectors; |
||||
import java.util.stream.Stream; |
||||
|
||||
import io.vertx.core.Vertx; |
||||
import io.vertx.core.datagram.DatagramSocket; |
||||
import junit.framework.AssertionFailedError; |
||||
import org.junit.After; |
||||
|
||||
/** |
||||
* A test class you can extend to acquire the ability to easily start discovery agents with a |
||||
* generated Peer and keypair, as well as test sockets to communicate with those discovery agents. |
||||
* |
||||
* <p>Call {@link #startDiscoveryAgent(List)} and variants to start one or more discovery agents, or |
||||
* {@link #startTestSocket()} or variants to start one or more test sockets. The lifecycle of those |
||||
* objects is managed automatically for you via @Before and @After hooks, so you don't need to worry |
||||
* about starting and stopping. |
||||
*/ |
||||
public abstract class AbstractPeerDiscoveryTest { |
||||
private static final String LOOPBACK_IP_ADDR = "127.0.0.1"; |
||||
private static final int TEST_SOCKET_START_TIMEOUT_SECS = 5; |
||||
private static final int BROADCAST_TCP_PORT = 12356; |
||||
private final Vertx vertx = vertx(); |
||||
|
||||
List<DiscoveryTestSocket> discoveryTestSockets = new CopyOnWriteArrayList<>(); |
||||
List<PeerDiscoveryAgent> agents = new CopyOnWriteArrayList<>(); |
||||
|
||||
@After |
||||
public void stopServices() { |
||||
// Close all sockets, will bubble up exceptions.
|
||||
final CompletableFuture<?>[] completions = |
||||
discoveryTestSockets |
||||
.stream() |
||||
.filter(p -> p.getSocket() != null) |
||||
.map( |
||||
p -> { |
||||
final CompletableFuture<?> completion = new CompletableFuture<>(); |
||||
p.getSocket() |
||||
.close( |
||||
ar -> { |
||||
if (ar.succeeded()) { |
||||
completion.complete(null); |
||||
} else { |
||||
completion.completeExceptionally(ar.cause()); |
||||
} |
||||
}); |
||||
return completion; |
||||
}) |
||||
.toArray(CompletableFuture<?>[]::new); |
||||
try { |
||||
CompletableFuture.allOf(completions).join(); |
||||
} finally { |
||||
agents.forEach(PeerDiscoveryAgent::stop); |
||||
vertx.close(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Starts multiple discovery agents with the provided boostrap peers. |
||||
* |
||||
* @param count the number of agents to start |
||||
* @param bootstrapPeers the list of bootstrap peers |
||||
* @return a list of discovery agents. |
||||
*/ |
||||
protected List<PeerDiscoveryAgent> startDiscoveryAgents( |
||||
final int count, final List<DiscoveryPeer> bootstrapPeers) { |
||||
return Stream.generate(() -> startDiscoveryAgent(bootstrapPeers)) |
||||
.limit(count) |
||||
.collect(Collectors.toList()); |
||||
} |
||||
|
||||
/** |
||||
* Start a single discovery agent with the provided bootstrap peers. |
||||
* |
||||
* @param bootstrapPeers the list of bootstrap peers |
||||
* @return a list of discovery agents. |
||||
*/ |
||||
protected PeerDiscoveryAgent startDiscoveryAgent(final List<DiscoveryPeer> bootstrapPeers) { |
||||
return startDiscoveryAgent(bootstrapPeers, new PeerBlacklist()); |
||||
} |
||||
|
||||
/** |
||||
* Start a single discovery agent with the provided bootstrap peers. |
||||
* |
||||
* @param bootstrapPeers the list of bootstrap peers |
||||
* @param blacklist the peer blacklist |
||||
* @return a list of discovery agents. |
||||
*/ |
||||
protected PeerDiscoveryAgent startDiscoveryAgent( |
||||
final List<DiscoveryPeer> bootstrapPeers, final PeerBlacklist blacklist) { |
||||
final DiscoveryConfiguration config = new DiscoveryConfiguration(); |
||||
config.setBootstrapPeers(bootstrapPeers); |
||||
config.setBindPort(0); |
||||
|
||||
return startDiscoveryAgent(config, blacklist); |
||||
} |
||||
|
||||
protected PeerDiscoveryAgent startDiscoveryAgent( |
||||
final DiscoveryConfiguration config, final PeerBlacklist blacklist) { |
||||
final PeerDiscoveryAgent agent = |
||||
new PeerDiscoveryAgent( |
||||
vertx, |
||||
SECP256K1.KeyPair.generate(), |
||||
config, |
||||
() -> true, |
||||
blacklist, |
||||
new NodeWhitelistController(PermissioningConfiguration.createDefault())); |
||||
try { |
||||
agent.start(BROADCAST_TCP_PORT).get(5, TimeUnit.SECONDS); |
||||
} catch (final Exception ex) { |
||||
throw new AssertionError("Could not initialize discovery agent", ex); |
||||
} |
||||
agents.add(agent); |
||||
return agent; |
||||
} |
||||
|
||||
/** |
||||
* Start multiple test sockets. |
||||
* |
||||
* <p>A test socket allows you to send messages to a discovery agent, as well as to react to |
||||
* received messages. A test socket encapsulates: (1) a {@link DiscoveryPeer} and its {@link |
||||
* tech.pegasys.pantheon.crypto.SECP256K1.KeyPair}, (2) an {@link ArrayBlockingQueue} where |
||||
* received messages are placed automatically, and (3) the socket itself. |
||||
* |
||||
* @param count the number of test sockets to start. |
||||
* @return the test sockets. |
||||
*/ |
||||
protected List<DiscoveryTestSocket> startTestSockets(final int count) { |
||||
return Stream.generate(this::startTestSocket).limit(count).collect(Collectors.toList()); |
||||
} |
||||
|
||||
/** |
||||
* Starts a single test socket. |
||||
* |
||||
* @return the test socket |
||||
*/ |
||||
protected DiscoveryTestSocket startTestSocket() { |
||||
final ArrayBlockingQueue<Packet> queue = new ArrayBlockingQueue<>(100); |
||||
|
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
final BytesValue peerId = keyPair.getPublicKey().getEncodedBytes(); |
||||
|
||||
final CompletableFuture<DiscoveryTestSocket> result = new CompletableFuture<>(); |
||||
// Test packet handler which feeds the received packet into a Future we later consume from.
|
||||
vertx |
||||
.createDatagramSocket() |
||||
.listen( |
||||
0, |
||||
LOOPBACK_IP_ADDR, |
||||
ar -> { |
||||
if (!ar.succeeded()) { |
||||
result.completeExceptionally(ar.cause()); |
||||
return; |
||||
} |
||||
|
||||
final DatagramSocket socket = ar.result(); |
||||
socket.handler(p -> queue.add(Packet.decode(p.data()))); |
||||
final DiscoveryPeer peer = |
||||
new DiscoveryPeer( |
||||
peerId, |
||||
LOOPBACK_IP_ADDR, |
||||
socket.localAddress().port(), |
||||
socket.localAddress().port()); |
||||
final DiscoveryTestSocket discoveryTestSocket = |
||||
new DiscoveryTestSocket(peer, keyPair, queue, socket); |
||||
result.complete(discoveryTestSocket); |
||||
}); |
||||
|
||||
final DiscoveryTestSocket discoveryTestSocket; |
||||
try { |
||||
discoveryTestSocket = result.get(TEST_SOCKET_START_TIMEOUT_SECS, TimeUnit.SECONDS); |
||||
} catch (final Exception ex) { |
||||
throw new AssertionError("Could not initialize test peer", ex); |
||||
} |
||||
discoveryTestSockets.add(discoveryTestSocket); |
||||
return discoveryTestSocket; |
||||
} |
||||
|
||||
protected void bondViaIncomingPing( |
||||
final PeerDiscoveryAgent agent, final DiscoveryTestSocket peerSocket) |
||||
throws InterruptedException { |
||||
final DiscoveryPeer peer = peerSocket.getPeer(); |
||||
|
||||
final PingPacketData ping = |
||||
PingPacketData.create(peer.getEndpoint(), agent.getAdvertisedPeer().getEndpoint()); |
||||
final Packet pingPacket = Packet.create(PacketType.PING, ping, peerSocket.getKeyPair()); |
||||
peerSocket.sendToAgent(agent, pingPacket); |
||||
|
||||
// Wait for returned pong packet to finish bonding
|
||||
peerSocket.getIncomingPackets().poll(10, TimeUnit.SECONDS); |
||||
} |
||||
|
||||
/** |
||||
* Encapsulates a test socket representing a Peer, with an associated queue where all incoming |
||||
* packets are placed. |
||||
*/ |
||||
protected static class DiscoveryTestSocket { |
||||
private final DiscoveryPeer peer; |
||||
private final SECP256K1.KeyPair keyPair; |
||||
private final ArrayBlockingQueue<Packet> queue; |
||||
private final DatagramSocket socket; |
||||
|
||||
public DiscoveryTestSocket( |
||||
final DiscoveryPeer peer, |
||||
final SECP256K1.KeyPair keyPair, |
||||
final ArrayBlockingQueue<Packet> queue, |
||||
final DatagramSocket socket) { |
||||
this.peer = peer; |
||||
this.keyPair = keyPair; |
||||
this.queue = queue; |
||||
this.socket = socket; |
||||
} |
||||
|
||||
public DiscoveryPeer getPeer() { |
||||
return peer; |
||||
} |
||||
|
||||
public ArrayBlockingQueue<Packet> getIncomingPackets() { |
||||
return queue; |
||||
} |
||||
|
||||
public DatagramSocket getSocket() { |
||||
return socket; |
||||
} |
||||
|
||||
public SECP256K1.KeyPair getKeyPair() { |
||||
return keyPair; |
||||
} |
||||
|
||||
/** |
||||
* Sends a message to an agent. |
||||
* |
||||
* @param agent the recipient |
||||
* @param packet the packet to send |
||||
*/ |
||||
public void sendToAgent(final PeerDiscoveryAgent agent, final Packet packet) { |
||||
final Endpoint endpoint = agent.getAdvertisedPeer().getEndpoint(); |
||||
socket.send(packet.encode(), endpoint.getUdpPort(), endpoint.getHost(), ar -> {}); |
||||
} |
||||
|
||||
/** |
||||
* Retrieves the head of the queue, compulsorily. If no message exists, or no message appears in |
||||
* 5 seconds, it throws an assertion error. |
||||
* |
||||
* @return the head of the queue |
||||
*/ |
||||
public Packet compulsoryPoll() { |
||||
final Packet packet; |
||||
try { |
||||
packet = queue.poll(5, TimeUnit.SECONDS); |
||||
} catch (final Exception e) { |
||||
throw new RuntimeException(e); |
||||
} |
||||
|
||||
if (packet == null) { |
||||
throw new AssertionFailedError( |
||||
"Expected a message in the test peer queue, but found none."); |
||||
} |
||||
return packet; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,113 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal; |
||||
|
||||
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryAgent; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; |
||||
import tech.pegasys.pantheon.ethereum.p2p.permissioning.NodeWhitelistController; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
import java.util.ArrayDeque; |
||||
import java.util.Arrays; |
||||
import java.util.Deque; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent { |
||||
// The set of known agents operating on the network
|
||||
private final Map<BytesValue, MockPeerDiscoveryAgent> agentNetwork; |
||||
private final Deque<IncomingPacket> incomingPackets = new ArrayDeque<>(); |
||||
|
||||
public MockPeerDiscoveryAgent( |
||||
final KeyPair keyPair, |
||||
final DiscoveryConfiguration config, |
||||
final PeerRequirement peerRequirement, |
||||
final PeerBlacklist peerBlacklist, |
||||
final NodeWhitelistController nodeWhitelistController, |
||||
final Map<BytesValue, MockPeerDiscoveryAgent> agentNetwork) { |
||||
super(keyPair, config, peerRequirement, peerBlacklist, nodeWhitelistController); |
||||
this.agentNetwork = agentNetwork; |
||||
} |
||||
|
||||
public void processIncomingPacket(final MockPeerDiscoveryAgent fromAgent, final Packet packet) { |
||||
// Cycle packet through encode / decode to make clone of any data
|
||||
// This ensures that any data passed between agents is not shared
|
||||
final Packet packetClone = Packet.decode(packet.encode()); |
||||
incomingPackets.add(new IncomingPacket(fromAgent, packetClone)); |
||||
handleIncomingPacket(fromAgent.getAdvertisedPeer().getEndpoint(), packetClone); |
||||
} |
||||
|
||||
/** |
||||
* Get and clear the list of any incoming packets to this agent. |
||||
* |
||||
* @return A list of packets received by this agent |
||||
*/ |
||||
public List<IncomingPacket> getIncomingPackets() { |
||||
List<IncomingPacket> packets = Arrays.asList(incomingPackets.toArray(new IncomingPacket[0])); |
||||
incomingPackets.clear(); |
||||
return packets; |
||||
} |
||||
|
||||
@Override |
||||
protected CompletableFuture<InetSocketAddress> listenForConnections() { |
||||
// Skip network setup for tests
|
||||
InetSocketAddress address = |
||||
new InetSocketAddress(config.getAdvertisedHost(), config.getBindPort()); |
||||
return CompletableFuture.completedFuture(address); |
||||
} |
||||
|
||||
@Override |
||||
protected CompletableFuture<Void> sendOutgoingPacket( |
||||
final DiscoveryPeer toPeer, final Packet packet) { |
||||
CompletableFuture<Void> result = new CompletableFuture<>(); |
||||
MockPeerDiscoveryAgent toAgent = agentNetwork.get(toPeer.getId()); |
||||
if (toAgent == null) { |
||||
result.completeExceptionally( |
||||
new Exception( |
||||
"Attempt to send to unknown peer. Agents must be constructed through PeerDiscoveryTestHelper.")); |
||||
} else { |
||||
toAgent.processIncomingPacket(this, packet); |
||||
result.complete(null); |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
protected TimerUtil createTimer() { |
||||
return new MockTimerUtil(); |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<?> stop() { |
||||
return CompletableFuture.completedFuture(null); |
||||
} |
||||
|
||||
public KeyPair getKeyPair() { |
||||
return keyPair; |
||||
} |
||||
|
||||
public static class IncomingPacket { |
||||
public final MockPeerDiscoveryAgent fromAgent; |
||||
public final Packet packet; |
||||
|
||||
public IncomingPacket(final MockPeerDiscoveryAgent fromAgent, final Packet packet) { |
||||
this.fromAgent = fromAgent; |
||||
this.packet = packet; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,67 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
|
||||
public class MockTimerUtil implements TimerUtil { |
||||
private final AtomicLong nextId = new AtomicLong(0); |
||||
private final Map<Long, TimerHandler> timerHandlers = new HashMap<>(); |
||||
private final Map<Long, TimerHandler> periodicHandlers = new HashMap<>(); |
||||
|
||||
@Override |
||||
public long setPeriodic(final long delay, final TimerHandler handler) { |
||||
long id = nextId.incrementAndGet(); |
||||
periodicHandlers.put(id, handler); |
||||
return id; |
||||
} |
||||
|
||||
@Override |
||||
public long setTimer(final long delay, final TimerHandler handler) { |
||||
long id = nextId.incrementAndGet(); |
||||
timerHandlers.put(id, handler); |
||||
return id; |
||||
} |
||||
|
||||
@Override |
||||
public void cancelTimer(final long timerId) { |
||||
timerHandlers.remove(timerId); |
||||
periodicHandlers.remove(timerId); |
||||
} |
||||
|
||||
public void runHandlers() { |
||||
runTimerHandlers(); |
||||
runPeriodicHandlers(); |
||||
} |
||||
|
||||
public void runTimerHandlers() { |
||||
// Create a copy of the handlers to avoid concurrent modification as handlers run
|
||||
List<TimerHandler> handlers = new ArrayList<>(); |
||||
timerHandlers.forEach((id, handler) -> handlers.add(handler)); |
||||
timerHandlers.clear(); |
||||
|
||||
handlers.forEach(TimerHandler::handle); |
||||
} |
||||
|
||||
public void runPeriodicHandlers() { |
||||
// Create a copy of the handlers to avoid concurrent modification as handlers run
|
||||
List<TimerHandler> handlers = new ArrayList<>(); |
||||
periodicHandlers.forEach((id, handler) -> handlers.add(handler)); |
||||
|
||||
handlers.forEach(TimerHandler::handle); |
||||
} |
||||
} |
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue