[Pan 3238] Fix rlpx startup (#114)

Signed-off-by: Meredith Baxter <meredith.baxter@consensys.net>
pull/122/head
mbaxter 5 years ago committed by GitHub
parent e9b2dd24c9
commit 3c82daf149
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java
  2. 5
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/ConnectionInitializer.java
  3. 29
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java
  4. 7
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/MockConnectionInitializer.java

@ -125,7 +125,19 @@ public class RlpxAgent {
} }
setupListeners(); setupListeners();
return connectionInitializer.start(); return connectionInitializer
.start()
.thenApply(
(socketAddress) -> {
LOG.info("P2P RLPx agent started and listening on {}.", socketAddress);
return socketAddress.getPort();
})
.whenComplete(
(res, err) -> {
if (err != null) {
LOG.error("Failed to start P2P RLPx agent.", err);
}
});
} }
public CompletableFuture<Void> stop() { public CompletableFuture<Void> stop() {

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.p2p.rlpx.connections;
import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback; import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public interface ConnectionInitializer { public interface ConnectionInitializer {
@ -25,9 +26,9 @@ public interface ConnectionInitializer {
* Start the connection initializer. Begins listening for incoming connections. Start allowing * Start the connection initializer. Begins listening for incoming connections. Start allowing
* outbound connections. * outbound connections.
* *
* @return The port on which we're listening for incoming connections. * @return The address on which we're listening for incoming connections.
*/ */
CompletableFuture<Integer> start(); CompletableFuture<InetSocketAddress> start();
/** /**
* Shutdown the connection initializer. Stop listening for incoming connections and stop * Shutdown the connection initializer. Stop listening for incoming connections and stop

@ -14,8 +14,6 @@
*/ */
package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty; package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import static com.google.common.base.Preconditions.checkState;
import org.hyperledger.besu.crypto.SECP256K1.KeyPair; import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration; import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
@ -48,12 +46,9 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class NettyConnectionInitializer implements ConnectionInitializer { public class NettyConnectionInitializer implements ConnectionInitializer {
private static final Logger LOG = LogManager.getLogger();
private static final int TIMEOUT_SECONDS = 10; private static final int TIMEOUT_SECONDS = 10;
private final KeyPair keyPair; private final KeyPair keyPair;
@ -95,8 +90,8 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
} }
@Override @Override
public CompletableFuture<Integer> start() { public CompletableFuture<InetSocketAddress> start() {
final CompletableFuture<Integer> listeningPortFuture = new CompletableFuture<>(); final CompletableFuture<InetSocketAddress> listeningPortFuture = new CompletableFuture<>();
if (!started.compareAndSet(false, true)) { if (!started.compareAndSet(false, true)) {
listeningPortFuture.completeExceptionally( listeningPortFuture.completeExceptionally(
new IllegalStateException( new IllegalStateException(
@ -114,19 +109,17 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
future -> { future -> {
final InetSocketAddress socketAddress = final InetSocketAddress socketAddress =
(InetSocketAddress) server.channel().localAddress(); (InetSocketAddress) server.channel().localAddress();
final String message = if (!future.isSuccess() || socketAddress == null) {
String.format( final String message =
"Unable start up P2P network on %s:%s. Check for port conflicts.", String.format(
config.getBindHost(), config.getBindPort()); "Unable start listening on %s:%s. Check for port conflicts.",
config.getBindHost(), config.getBindPort());
if (!future.isSuccess()) { listeningPortFuture.completeExceptionally(
LOG.error(message, future.cause()); new IllegalStateException(message, future.cause()));
return;
} }
checkState(socketAddress != null, message);
LOG.info("P2P network started and listening on {}", socketAddress); listeningPortFuture.complete(socketAddress);
final int listeningPort = socketAddress.getPort();
listeningPortFuture.complete(listeningPort);
}); });
return listeningPortFuture; return listeningPortFuture;

@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback; import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.util.Subscribers; import org.hyperledger.besu.util.Subscribers;
import java.net.InetSocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -52,8 +53,10 @@ public class MockConnectionInitializer implements ConnectionInitializer {
} }
@Override @Override
public CompletableFuture<Integer> start() { public CompletableFuture<InetSocketAddress> start() {
return CompletableFuture.completedFuture(NEXT_PORT.incrementAndGet()); InetSocketAddress socketAddress =
new InetSocketAddress("127.0.0.1", NEXT_PORT.incrementAndGet());
return CompletableFuture.completedFuture(socketAddress);
} }
@Override @Override

Loading…
Cancel
Save