|
|
|
@ -19,8 +19,6 @@ import tech.pegasys.pantheon.crypto.SECP256K1; |
|
|
|
|
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.chain.Blockchain; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.ConnectingToLocalNodeException; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.PeerNotPermittedException; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.api.DisconnectCallback; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.api.Message; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; |
|
|
|
@ -71,6 +69,7 @@ import java.util.concurrent.Executors; |
|
|
|
|
import java.util.concurrent.ScheduledExecutorService; |
|
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
|
import java.util.function.Consumer; |
|
|
|
|
import java.util.function.Supplier; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
@ -182,12 +181,14 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
|
|
|
|
|
private final String advertisedHost; |
|
|
|
|
|
|
|
|
|
private volatile EnodeURL ourEnodeURL; |
|
|
|
|
private volatile Optional<EnodeURL> localEnode = Optional.empty(); |
|
|
|
|
|
|
|
|
|
private final Optional<NodePermissioningController> nodePermissioningController; |
|
|
|
|
private final Optional<Blockchain> blockchain; |
|
|
|
|
private OptionalLong blockAddedObserverId = OptionalLong.empty(); |
|
|
|
|
|
|
|
|
|
private final AtomicBoolean started = new AtomicBoolean(false); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Creates a peer networking service for production purposes. |
|
|
|
|
* |
|
|
|
@ -346,7 +347,7 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!isPeerConnectionAllowed(connection)) { |
|
|
|
|
if (!isPeerAllowed(connection)) { |
|
|
|
|
connection.disconnect(DisconnectReason.UNKNOWN); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -362,21 +363,13 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public boolean addMaintainConnectionPeer(final Peer peer) { |
|
|
|
|
if (!isPeerAllowed(peer)) { |
|
|
|
|
throw new PeerNotPermittedException(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (peer.getId().equals(ourPeerInfo.getNodeId())) { |
|
|
|
|
throw new ConnectingToLocalNodeException(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
final boolean added = peerMaintainConnectionList.add(peer); |
|
|
|
|
if (added) { |
|
|
|
|
if (isPeerAllowed(peer) && !isConnectingOrConnected(peer)) { |
|
|
|
|
// Connect immediately if appropriate
|
|
|
|
|
connect(peer); |
|
|
|
|
return true; |
|
|
|
|
} else { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return added; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@ -394,12 +387,11 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
return removed; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void checkMaintainedConnectionPeers() { |
|
|
|
|
for (final Peer peer : peerMaintainConnectionList) { |
|
|
|
|
if (!(isConnecting(peer) || isConnected(peer))) { |
|
|
|
|
connect(peer); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
void checkMaintainedConnectionPeers() { |
|
|
|
|
peerMaintainConnectionList.stream() |
|
|
|
|
.filter(p -> !isConnectingOrConnected(p)) |
|
|
|
|
.filter(this::isPeerAllowed) |
|
|
|
|
.forEach(this::connect); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@VisibleForTesting |
|
|
|
@ -529,6 +521,10 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void start() { |
|
|
|
|
if (!started.compareAndSet(false, true)) { |
|
|
|
|
LOG.warn("Attempted to start an already started " + getClass().getSimpleName()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join(); |
|
|
|
|
peerBondedObserverId = |
|
|
|
|
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent())); |
|
|
|
@ -549,11 +545,10 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.ourEnodeURL = buildSelfEnodeURL(); |
|
|
|
|
LOG.info("Enode URL {}", ourEnodeURL.toString()); |
|
|
|
|
createLocalEnode(); |
|
|
|
|
|
|
|
|
|
peerConnectionScheduler.scheduleWithFixedDelay( |
|
|
|
|
this::checkMaintainedConnectionPeers, 60, 60, TimeUnit.SECONDS); |
|
|
|
|
this::checkMaintainedConnectionPeers, 2, 60, TimeUnit.SECONDS); |
|
|
|
|
peerConnectionScheduler.scheduleWithFixedDelay( |
|
|
|
|
this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS); |
|
|
|
|
} |
|
|
|
@ -584,7 +579,7 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
.getPeerConnections() |
|
|
|
|
.forEach( |
|
|
|
|
peerConnection -> { |
|
|
|
|
if (!isPeerConnectionAllowed(peerConnection)) { |
|
|
|
|
if (!isPeerAllowed(peerConnection)) { |
|
|
|
|
peerConnection.disconnect(DisconnectReason.REQUESTED); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
@ -595,38 +590,36 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
.getPeerConnections() |
|
|
|
|
.forEach( |
|
|
|
|
peerConnection -> { |
|
|
|
|
if (!isPeerConnectionAllowed(peerConnection)) { |
|
|
|
|
if (!isPeerAllowed(peerConnection)) { |
|
|
|
|
peerConnection.disconnect(DisconnectReason.REQUESTED); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean isPeerConnectionAllowed(final PeerConnection peerConnection) { |
|
|
|
|
if (peerBlacklist.contains(peerConnection)) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
LOG.trace( |
|
|
|
|
"Checking if connection with peer {} is permitted", |
|
|
|
|
peerConnection.getPeerInfo().getNodeId()); |
|
|
|
|
|
|
|
|
|
return nodePermissioningController |
|
|
|
|
.map( |
|
|
|
|
c -> { |
|
|
|
|
final EnodeURL localPeerEnodeURL = getLocalEnode().orElse(buildSelfEnodeURL()); |
|
|
|
|
final EnodeURL remotePeerEnodeURL = peerConnection.getRemoteEnode(); |
|
|
|
|
return c.isPermitted(localPeerEnodeURL, remotePeerEnodeURL); |
|
|
|
|
}) |
|
|
|
|
.orElse(true); |
|
|
|
|
private boolean isPeerAllowed(final PeerConnection conn) { |
|
|
|
|
return isPeerAllowed(conn.getRemoteEnode()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean isPeerAllowed(final Peer peer) { |
|
|
|
|
if (peerBlacklist.contains(peer)) { |
|
|
|
|
return isPeerAllowed(peer.getEnodeURL()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean isPeerAllowed(final EnodeURL enode) { |
|
|
|
|
if (peerBlacklist.contains(enode.getNodeId())) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (enode.getNodeId().equals(ourPeerInfo.getNodeId())) { |
|
|
|
|
// Peer matches our node id
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Optional<EnodeURL> maybeEnode = getLocalEnode(); |
|
|
|
|
if (!maybeEnode.isPresent()) { |
|
|
|
|
// If local enode isn't yet available we can't evaluate permissions
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
return nodePermissioningController |
|
|
|
|
.map(c -> c.isPermitted(ourEnodeURL, peer.getEnodeURL())) |
|
|
|
|
.map(c -> c.isPermitted(maybeEnode.get(), enode)) |
|
|
|
|
.orElse(true); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -640,6 +633,10 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
return connections.isAlreadyConnected(peer.getId()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean isConnectingOrConnected(final Peer peer) { |
|
|
|
|
return isConnected(peer) || isConnecting(peer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void stop() { |
|
|
|
|
sendClientQuittingToPeers(); |
|
|
|
@ -691,10 +688,14 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public Optional<EnodeURL> getLocalEnode() { |
|
|
|
|
return Optional.ofNullable(ourEnodeURL); |
|
|
|
|
return localEnode; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private EnodeURL buildSelfEnodeURL() { |
|
|
|
|
private void createLocalEnode() { |
|
|
|
|
if (localEnode.isPresent()) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
final BytesValue nodeId = ourPeerInfo.getNodeId(); |
|
|
|
|
final int listeningPort = ourPeerInfo.getPort(); |
|
|
|
|
final OptionalInt discoveryPort = |
|
|
|
@ -704,12 +705,16 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
.filter(port -> port.getAsInt() != listeningPort) |
|
|
|
|
.orElse(OptionalInt.empty()); |
|
|
|
|
|
|
|
|
|
return EnodeURL.builder() |
|
|
|
|
.nodeId(nodeId) |
|
|
|
|
.ipAddress(advertisedHost) |
|
|
|
|
.listeningPort(listeningPort) |
|
|
|
|
.discoveryPort(discoveryPort) |
|
|
|
|
.build(); |
|
|
|
|
final EnodeURL localEnode = |
|
|
|
|
EnodeURL.builder() |
|
|
|
|
.nodeId(nodeId) |
|
|
|
|
.ipAddress(advertisedHost) |
|
|
|
|
.listeningPort(listeningPort) |
|
|
|
|
.discoveryPort(discoveryPort) |
|
|
|
|
.build(); |
|
|
|
|
|
|
|
|
|
LOG.info("Enode URL {}", localEnode.toString()); |
|
|
|
|
this.localEnode = Optional.of(localEnode); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void onConnectionEstablished(final PeerConnection connection) { |
|
|
|
@ -719,14 +724,14 @@ public class DefaultP2PNetwork implements P2PNetwork { |
|
|
|
|
|
|
|
|
|
public static class Builder { |
|
|
|
|
|
|
|
|
|
protected PeerDiscoveryAgent peerDiscoveryAgent; |
|
|
|
|
protected KeyPair keyPair; |
|
|
|
|
protected NetworkingConfiguration config = NetworkingConfiguration.create(); |
|
|
|
|
protected List<Capability> supportedCapabilities; |
|
|
|
|
protected PeerBlacklist peerBlacklist; |
|
|
|
|
protected MetricsSystem metricsSystem; |
|
|
|
|
protected Optional<NodePermissioningController> nodePermissioningController = Optional.empty(); |
|
|
|
|
protected Blockchain blockchain = null; |
|
|
|
|
private PeerDiscoveryAgent peerDiscoveryAgent; |
|
|
|
|
private KeyPair keyPair; |
|
|
|
|
private NetworkingConfiguration config = NetworkingConfiguration.create(); |
|
|
|
|
private List<Capability> supportedCapabilities; |
|
|
|
|
private PeerBlacklist peerBlacklist; |
|
|
|
|
private MetricsSystem metricsSystem; |
|
|
|
|
private Optional<NodePermissioningController> nodePermissioningController = Optional.empty(); |
|
|
|
|
private Blockchain blockchain = null; |
|
|
|
|
private Vertx vertx; |
|
|
|
|
private Optional<NodeLocalConfigPermissioningController> |
|
|
|
|
nodeLocalConfigPermissioningController = Optional.empty(); |
|
|
|
|