|
|
|
@ -22,7 +22,7 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.peers.EnodeURL; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.peers.LocalNode; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerProperties; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerPrivileges; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.rlpx.connections.ConnectionInitializer; |
|
|
|
|
import tech.pegasys.pantheon.ethereum.p2p.rlpx.connections.PeerConnection; |
|
|
|
@ -59,20 +59,18 @@ public class RlpxAgent { |
|
|
|
|
private static final Logger LOG = LogManager.getLogger(); |
|
|
|
|
|
|
|
|
|
private final LocalNode localNode; |
|
|
|
|
private final PeerRlpxPermissions peerPermissions; |
|
|
|
|
private final PeerConnectionEvents connectionEvents; |
|
|
|
|
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create(); |
|
|
|
|
private final ConnectionInitializer connectionInitializer; |
|
|
|
|
private final boolean limitRemoteWireConnectionsEnabled; |
|
|
|
|
private final double fractionRemoteConnectionsAllowed; |
|
|
|
|
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create(); |
|
|
|
|
|
|
|
|
|
private final int maxPeers; |
|
|
|
|
private final PeerRlpxPermissions peerPermissions; |
|
|
|
|
private final PeerPrivileges peerPrivileges; |
|
|
|
|
private final int maxConnections; |
|
|
|
|
private final int maxRemotelyInitiatedConnections; |
|
|
|
|
|
|
|
|
|
@VisibleForTesting |
|
|
|
|
final Map<BytesValue, RlpxConnection> connectionsById = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
|
|
private final PeerProperties peerProperties; |
|
|
|
|
|
|
|
|
|
private final AtomicBoolean started = new AtomicBoolean(false); |
|
|
|
|
private final AtomicBoolean stopped = new AtomicBoolean(false); |
|
|
|
|
|
|
|
|
@ -80,22 +78,21 @@ public class RlpxAgent { |
|
|
|
|
|
|
|
|
|
private RlpxAgent( |
|
|
|
|
final LocalNode localNode, |
|
|
|
|
final PeerRlpxPermissions peerPermissions, |
|
|
|
|
final PeerConnectionEvents connectionEvents, |
|
|
|
|
final ConnectionInitializer connectionInitializer, |
|
|
|
|
final int maxPeers, |
|
|
|
|
final PeerProperties peerProperties, |
|
|
|
|
final MetricsSystem metricsSystem, |
|
|
|
|
final boolean limitRemoteWireConnectionsEnabled, |
|
|
|
|
final double fractionRemoteConnectionsAllowed) { |
|
|
|
|
this.maxPeers = maxPeers; |
|
|
|
|
this.peerProperties = peerProperties; |
|
|
|
|
final PeerRlpxPermissions peerPermissions, |
|
|
|
|
final PeerPrivileges peerPrivileges, |
|
|
|
|
final int maxConnections, |
|
|
|
|
final int maxRemotelyInitiatedConnections, |
|
|
|
|
final MetricsSystem metricsSystem) { |
|
|
|
|
this.localNode = localNode; |
|
|
|
|
this.peerPermissions = peerPermissions; |
|
|
|
|
this.connectionEvents = connectionEvents; |
|
|
|
|
this.connectionInitializer = connectionInitializer; |
|
|
|
|
this.limitRemoteWireConnectionsEnabled = limitRemoteWireConnectionsEnabled; |
|
|
|
|
this.fractionRemoteConnectionsAllowed = fractionRemoteConnectionsAllowed; |
|
|
|
|
this.peerPermissions = peerPermissions; |
|
|
|
|
this.peerPrivileges = peerPrivileges; |
|
|
|
|
this.maxConnections = maxConnections; |
|
|
|
|
this.maxRemotelyInitiatedConnections = |
|
|
|
|
Math.min(maxConnections, maxRemotelyInitiatedConnections); |
|
|
|
|
|
|
|
|
|
// Setup metrics
|
|
|
|
|
connectedPeersCounter = |
|
|
|
@ -111,7 +108,7 @@ public class RlpxAgent { |
|
|
|
|
PantheonMetricCategory.ETHEREUM, |
|
|
|
|
"peer_limit", |
|
|
|
|
"The maximum number of peers this node allows to connect", |
|
|
|
|
() -> maxPeers); |
|
|
|
|
() -> maxConnections); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static Builder builder() { |
|
|
|
@ -153,7 +150,7 @@ public class RlpxAgent { |
|
|
|
|
if (!localNode.isReady()) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
final int availablePeerSlots = Math.max(0, maxPeers - getConnectionCount()); |
|
|
|
|
final int availablePeerSlots = Math.max(0, maxConnections - getConnectionCount()); |
|
|
|
|
peerStream |
|
|
|
|
.filter(peer -> !connectionsById.containsKey(peer.getId())) |
|
|
|
|
.filter(peer -> peer.getEnodeURL().isListening()) |
|
|
|
@ -207,10 +204,10 @@ public class RlpxAgent { |
|
|
|
|
return peerConnection.get(); |
|
|
|
|
} |
|
|
|
|
// Check max peers
|
|
|
|
|
if (!peerProperties.ignoreMaxPeerLimits(peer) && getConnectionCount() >= maxPeers) { |
|
|
|
|
if (!peerPrivileges.canExceedConnectionLimits(peer) && getConnectionCount() >= maxConnections) { |
|
|
|
|
final String errorMsg = |
|
|
|
|
"Max peer peer connections established (" |
|
|
|
|
+ maxPeers |
|
|
|
|
+ maxConnections |
|
|
|
|
+ "). Cannot connect to peer: " |
|
|
|
|
+ peer; |
|
|
|
|
return FutureUtils.completedExceptionally(new IllegalStateException(errorMsg)); |
|
|
|
@ -325,7 +322,12 @@ public class RlpxAgent { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Disconnect if too many peers
|
|
|
|
|
if (!peerProperties.ignoreMaxPeerLimits(peer) && getConnectionCount() >= maxPeers) { |
|
|
|
|
if (!peerPrivileges.canExceedConnectionLimits(peer) && getConnectionCount() >= maxConnections) { |
|
|
|
|
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Disconnect if too many remotely-initiated connections
|
|
|
|
|
if (!peerPrivileges.canExceedConnectionLimits(peer) && remoteConnectionLimitReached()) { |
|
|
|
|
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -335,16 +337,6 @@ public class RlpxAgent { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Disconnect if the fraction of wire connections initiated by peers is too high to protect
|
|
|
|
|
// against eclipse attacks
|
|
|
|
|
if (limitRemoteWireConnectionsEnabled && remoteConnectionExceedsLimit()) { |
|
|
|
|
LOG.warn( |
|
|
|
|
"Fraction of remotely initiated connection is too high, rejecting incoming connection. (max ratio allowed: {})", |
|
|
|
|
fractionRemoteConnectionsAllowed); |
|
|
|
|
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Track this new connection, deduplicating existing connection if necessary
|
|
|
|
|
final AtomicBoolean newConnectionAccepted = new AtomicBoolean(false); |
|
|
|
|
final RlpxConnection inboundConnection = RlpxConnection.inboundConnection(peerConnection); |
|
|
|
@ -387,33 +379,63 @@ public class RlpxAgent { |
|
|
|
|
if (newConnectionAccepted.get()) { |
|
|
|
|
dispatchConnect(peerConnection); |
|
|
|
|
} |
|
|
|
|
// Check remote connections again to control for race conditions
|
|
|
|
|
enforceRemoteConnectionLimits(); |
|
|
|
|
enforceConnectionLimits(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean remoteConnectionExceedsLimit() { |
|
|
|
|
final int remotelyInitiatedConnectionsCount = |
|
|
|
|
Math.toIntExact( |
|
|
|
|
connectionsById.values().stream() |
|
|
|
|
private boolean shouldLimitRemoteConnections() { |
|
|
|
|
return maxRemotelyInitiatedConnections < maxConnections; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean remoteConnectionLimitReached() { |
|
|
|
|
return shouldLimitRemoteConnections() |
|
|
|
|
&& countUntrustedRemotelyInitiatedConnections() >= maxRemotelyInitiatedConnections; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private long countUntrustedRemotelyInitiatedConnections() { |
|
|
|
|
return connectionsById.values().stream() |
|
|
|
|
.filter(RlpxConnection::isActive) |
|
|
|
|
.filter(RlpxConnection::initiatedRemotely) |
|
|
|
|
.count()); |
|
|
|
|
final double fractionRemoteConnections = |
|
|
|
|
(double) (remotelyInitiatedConnectionsCount + 1) / (double) maxPeers; |
|
|
|
|
return fractionRemoteConnections > fractionRemoteConnectionsAllowed; |
|
|
|
|
.filter(conn -> !peerPrivileges.canExceedConnectionLimits(conn.getPeer())) |
|
|
|
|
.count(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void enforceRemoteConnectionLimits() { |
|
|
|
|
if (!shouldLimitRemoteConnections() |
|
|
|
|
|| connectionsById.size() < maxRemotelyInitiatedConnections) { |
|
|
|
|
// Nothing to do
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
getActivePrioritizedConnections() |
|
|
|
|
.filter(RlpxConnection::initiatedRemotely) |
|
|
|
|
.filter(conn -> !peerPrivileges.canExceedConnectionLimits(conn.getPeer())) |
|
|
|
|
.skip(maxRemotelyInitiatedConnections) |
|
|
|
|
.forEach(c -> c.disconnect(DisconnectReason.TOO_MANY_PEERS)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void enforceConnectionLimits() { |
|
|
|
|
connectionsById.values().stream() |
|
|
|
|
.filter(RlpxConnection::isActive) |
|
|
|
|
.sorted(this::prioritizeConnections) |
|
|
|
|
.skip(maxPeers) |
|
|
|
|
.filter(c -> !peerProperties.ignoreMaxPeerLimits(c.getPeer())) |
|
|
|
|
if (connectionsById.size() < maxConnections) { |
|
|
|
|
// Nothing to do - we're under our limits
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
getActivePrioritizedConnections() |
|
|
|
|
.skip(maxConnections) |
|
|
|
|
.filter(c -> !peerPrivileges.canExceedConnectionLimits(c.getPeer())) |
|
|
|
|
.forEach(c -> c.disconnect(DisconnectReason.TOO_MANY_PEERS)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Stream<RlpxConnection> getActivePrioritizedConnections() { |
|
|
|
|
return connectionsById.values().stream() |
|
|
|
|
.filter(RlpxConnection::isActive) |
|
|
|
|
.sorted(this::prioritizeConnections); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private int prioritizeConnections(final RlpxConnection a, final RlpxConnection b) { |
|
|
|
|
final boolean aIgnoresPeerLimits = peerProperties.ignoreMaxPeerLimits(a.getPeer()); |
|
|
|
|
final boolean bIgnoresPeerLimits = peerProperties.ignoreMaxPeerLimits(b.getPeer()); |
|
|
|
|
final boolean aIgnoresPeerLimits = peerPrivileges.canExceedConnectionLimits(a.getPeer()); |
|
|
|
|
final boolean bIgnoresPeerLimits = peerPrivileges.canExceedConnectionLimits(b.getPeer()); |
|
|
|
|
if (aIgnoresPeerLimits && !bIgnoresPeerLimits) { |
|
|
|
|
return -1; |
|
|
|
|
} else if (bIgnoresPeerLimits && !aIgnoresPeerLimits) { |
|
|
|
@ -476,7 +498,7 @@ public class RlpxAgent { |
|
|
|
|
private KeyPair keyPair; |
|
|
|
|
private LocalNode localNode; |
|
|
|
|
private RlpxConfiguration config; |
|
|
|
|
private PeerProperties peerProperties; |
|
|
|
|
private PeerPrivileges peerPrivileges; |
|
|
|
|
private PeerPermissions peerPermissions; |
|
|
|
|
private ConnectionInitializer connectionInitializer; |
|
|
|
|
private PeerConnectionEvents connectionEvents; |
|
|
|
@ -500,21 +522,20 @@ public class RlpxAgent { |
|
|
|
|
new PeerRlpxPermissions(localNode, peerPermissions); |
|
|
|
|
return new RlpxAgent( |
|
|
|
|
localNode, |
|
|
|
|
rlpxPermissions, |
|
|
|
|
connectionEvents, |
|
|
|
|
connectionInitializer, |
|
|
|
|
rlpxPermissions, |
|
|
|
|
peerPrivileges, |
|
|
|
|
config.getMaxPeers(), |
|
|
|
|
peerProperties, |
|
|
|
|
metricsSystem, |
|
|
|
|
config.isLimitRemoteWireConnectionsEnabled(), |
|
|
|
|
config.getFractionRemoteWireConnectionsAllowed()); |
|
|
|
|
config.getMaxRemotelyInitiatedConnections(), |
|
|
|
|
metricsSystem); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void validate() { |
|
|
|
|
checkState(keyPair != null, "KeyPair must be configured"); |
|
|
|
|
checkState(localNode != null, "LocalNode must be configured"); |
|
|
|
|
checkState(config != null, "RlpxConfiguration must be set"); |
|
|
|
|
checkState(peerProperties != null, "MaintainedPeers must be configured"); |
|
|
|
|
checkState(peerPrivileges != null, "PeerPrivileges must be configured"); |
|
|
|
|
checkState(peerPermissions != null, "PeerPermissions must be configured"); |
|
|
|
|
checkState(metricsSystem != null, "MetricsSystem must be configured"); |
|
|
|
|
} |
|
|
|
@ -543,9 +564,9 @@ public class RlpxAgent { |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Builder peerProperties(final PeerProperties peerProperties) { |
|
|
|
|
checkNotNull(peerProperties); |
|
|
|
|
this.peerProperties = peerProperties; |
|
|
|
|
public Builder peerPrivileges(final PeerPrivileges peerPrivileges) { |
|
|
|
|
checkNotNull(peerPrivileges); |
|
|
|
|
this.peerPrivileges = peerPrivileges; |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|