Allow Random Peer Prioritization (#1440)

Allows nodes in a network to not strictly prefer older peers. This 
helps break up impenetrable cliques in small, stable networks (private 
networks often fit this description). We generate a random mask for the 
lifetime of the process and use that to xor against the nodeId of a 
potential peer. The desired behavior of the network is that some of the 
nodes will allow the new peer to make an inbound connection but that 
peer can't try to farm a nodeId that would make prioritize it over all 
other peers in a guaranteed fashion.

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
pull/1410/head
Ratan (Rai) Sur 4 years ago committed by GitHub
parent bc69af3930
commit b2e3c42ece
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      CHANGELOG.md
  2. 7
      besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
  3. 7
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  4. 16
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  5. 1
      besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java
  6. 1
      besu/src/test/resources/everything_config.toml
  7. 2
      crypto/src/main/java/org/hyperledger/besu/crypto/SECP256K1.java
  8. 7
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java
  9. 59
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java
  10. 81
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java

@ -1,5 +1,8 @@
# Changelog
### Additions and Improvements
* `--random-peer-priority-enabled` flag added. Allows for incoming connections to be prioritized randomly. This will prevent (typically small, stable) networks from forming impenetrable peer cliques. [#1440](https://github.com/hyperledger/besu/pull/1440)
## Deprecated and Scheduled for removal in _Next_ Release
### --privacy-precompiled-address

@ -164,6 +164,7 @@ public class RunnerBuilder {
private Optional<String> identityString = Optional.empty();
private BesuPluginContextImpl besuPluginContext;
private boolean autoLogBloomCaching = true;
private boolean randomPeerPriority;
public RunnerBuilder vertx(final Vertx vertx) {
this.vertx = vertx;
@ -244,6 +245,11 @@ public class RunnerBuilder {
return this;
}
public RunnerBuilder randomPeerPriority(final boolean randomPeerPriority) {
this.randomPeerPriority = randomPeerPriority;
return this;
}
public RunnerBuilder ethstatsUrl(final String ethstatsUrl) {
this.ethstatsUrl = ethstatsUrl;
return this;
@ -404,6 +410,7 @@ public class RunnerBuilder {
.metricsSystem(metricsSystem)
.supportedCapabilities(caps)
.natService(natService)
.randomPeerPriority(randomPeerPriority)
.build();
final NetworkRunner networkRunner =

@ -351,6 +351,12 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
.toPercentage()
.getValue();
@Option(
names = {"--random-peer-priority-enabled"},
description =
"Allow for incoming connections to be prioritized randomly. This will prevent (typically small, stable) networks from forming impenetrable peer cliques. (default: ${DEFAULT-VALUE})")
private final Boolean randomPeerPriority = false;
@Option(
names = {"--banned-node-ids", "--banned-node-id"},
paramLabel = MANDATORY_NODE_ID_FORMAT_HELP,
@ -1999,6 +2005,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
.limitRemoteWireConnectionsEnabled(isLimitRemoteWireConnectionsEnabled)
.fractionRemoteConnectionsAllowed(
Fraction.fromPercentage(maxRemoteConnectionsPercentage).getValue())
.randomPeerPriority(randomPeerPriority)
.networkingConfiguration(unstableNetworkingOptions.toDomainObject())
.graphQLConfiguration(graphQLConfiguration)
.jsonRpcConfiguration(jsonRpcConfiguration)

@ -1224,6 +1224,22 @@ public class BesuCommandTest extends CommandTestAbstract {
"should be a number between 0 and 100 inclusive");
}
@Test
public void enableRandomConnectionPrioritization() {
parseCommand("--random-peer-priority-enabled");
verify(mockRunnerBuilder).randomPeerPriority(eq(true));
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void randomConnectionPrioritizationDisabledByDefault() {
parseCommand();
verify(mockRunnerBuilder).randomPeerPriority(eq(false));
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void syncMode_fast() {
parseCommand("--sync-mode", "FAST");

@ -213,6 +213,7 @@ public abstract class CommandTestAbstract {
.thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.fractionRemoteConnectionsAllowed(anyFloat()))
.thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.randomPeerPriority(anyBoolean())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.p2pEnabled(anyBoolean())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.natMethod(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.natManagerServiceName(any())).thenReturn(mockRunnerBuilder);

@ -39,6 +39,7 @@ p2p-port=1234
max-peers=42
remote-connections-limit-enabled=true
remote-connections-max-percentage=60
random-peer-priority-enabled=false
host-whitelist=["all"]
required-blocks=["8675309=123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"]

@ -493,7 +493,7 @@ public class SECP256K1 {
public static class PublicKey implements java.security.PublicKey {
private static final int BYTE_LENGTH = 64;
public static final int BYTE_LENGTH = 64;
private final Bytes encoded;

@ -398,6 +398,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
private PeerPermissions peerPermissions = PeerPermissions.noop();
private NatService natService = new NatService(Optional.empty());
private boolean randomPeerPriority;
private MetricsSystem metricsSystem;
@ -456,6 +457,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
.peerPrivileges(peerPrivileges)
.localNode(localNode)
.metricsSystem(metricsSystem)
.randomPeerPriority(randomPeerPriority)
.build();
}
@ -471,6 +473,11 @@ public class DefaultP2PNetwork implements P2PNetwork {
return this;
}
public Builder randomPeerPriority(final boolean randomPeerPriority) {
this.randomPeerPriority = randomPeerPriority;
return this;
}
public Builder vertx(final Vertx vertx) {
checkNotNull(vertx);
this.vertx = vertx;

@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.isNull;
import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
@ -67,7 +68,11 @@ public class RlpxAgent {
private final PeerRlpxPermissions peerPermissions;
private final PeerPrivileges peerPrivileges;
private final int maxConnections;
private final boolean randomPeerPriority;
private final int maxRemotelyInitiatedConnections;
// xor'ing with this mask will allow us to randomly let new peers connect
// without allowing the counterparty to play nodeId farming games
private final Bytes nodeIdMask = Bytes.random(SECP256K1.PublicKey.BYTE_LENGTH);
@VisibleForTesting final Map<Bytes, RlpxConnection> connectionsById = new ConcurrentHashMap<>();
@ -84,6 +89,7 @@ public class RlpxAgent {
final PeerPrivileges peerPrivileges,
final int maxConnections,
final int maxRemotelyInitiatedConnections,
final boolean randomPeerPriority,
final MetricsSystem metricsSystem) {
this.localNode = localNode;
this.connectionEvents = connectionEvents;
@ -91,6 +97,7 @@ public class RlpxAgent {
this.peerPermissions = peerPermissions;
this.peerPrivileges = peerPrivileges;
this.maxConnections = maxConnections;
this.randomPeerPriority = randomPeerPriority;
this.maxRemotelyInitiatedConnections =
Math.min(maxConnections, maxRemotelyInitiatedConnections);
@ -338,19 +345,22 @@ public class RlpxAgent {
peerConnection.disconnect(DisconnectReason.UNKNOWN);
return;
}
// Disconnect if too many peers
if (!peerPrivileges.canExceedConnectionLimits(peer) && getConnectionCount() >= maxConnections) {
LOG.debug("Too many peers. Disconnect incoming connection: {}", peerConnection);
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS);
return;
}
// Disconnect if too many remotely-initiated connections
if (!peerPrivileges.canExceedConnectionLimits(peer) && remoteConnectionLimitReached()) {
LOG.debug(
"Too many remotely-initiated connections. Disconnect incoming connection: {}",
peerConnection);
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS);
return;
if (!randomPeerPriority) {
// Disconnect if too many peers
if (!peerPrivileges.canExceedConnectionLimits(peer)
&& getConnectionCount() >= maxConnections) {
LOG.debug("Too many peers. Disconnect incoming connection: {}", peerConnection);
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS);
return;
}
// Disconnect if too many remotely-initiated connections
if (!peerPrivileges.canExceedConnectionLimits(peer) && remoteConnectionLimitReached()) {
LOG.debug(
"Too many remotely-initiated connections. Disconnect incoming connection: {}",
peerConnection);
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS);
return;
}
}
// Disconnect if not permitted
if (!peerPermissions.allowNewInboundConnectionFrom(peer)) {
@ -463,10 +473,10 @@ public class RlpxAgent {
private Stream<RlpxConnection> getActivePrioritizedConnections() {
return connectionsById.values().stream()
.filter(RlpxConnection::isActive)
.sorted(this::prioritizeConnections);
.sorted(this::comparePeerPriorities);
}
private int prioritizeConnections(final RlpxConnection a, final RlpxConnection b) {
private int comparePeerPriorities(final RlpxConnection a, final RlpxConnection b) {
final boolean aIgnoresPeerLimits = peerPrivileges.canExceedConnectionLimits(a.getPeer());
final boolean bIgnoresPeerLimits = peerPrivileges.canExceedConnectionLimits(b.getPeer());
if (aIgnoresPeerLimits && !bIgnoresPeerLimits) {
@ -474,10 +484,20 @@ public class RlpxAgent {
} else if (bIgnoresPeerLimits && !aIgnoresPeerLimits) {
return 1;
} else {
return Math.toIntExact(a.getInitiatedAt() - b.getInitiatedAt());
return randomPeerPriority
? compareByMaskedNodeId(a, b)
: compareConnectionInitiationTimes(a, b);
}
}
private int compareConnectionInitiationTimes(final RlpxConnection a, final RlpxConnection b) {
return Math.toIntExact(a.getInitiatedAt() - b.getInitiatedAt());
}
private int compareByMaskedNodeId(final RlpxConnection a, final RlpxConnection b) {
return a.getPeer().getId().xor(nodeIdMask).compareTo(b.getPeer().getId().xor(nodeIdMask));
}
/**
* Compares two connections to the same peer to determine which connection should be kept
*
@ -535,6 +555,7 @@ public class RlpxAgent {
private PeerPermissions peerPermissions;
private ConnectionInitializer connectionInitializer;
private PeerConnectionEvents connectionEvents;
private boolean randomPeerPriority;
private MetricsSystem metricsSystem;
private Builder() {}
@ -561,6 +582,7 @@ public class RlpxAgent {
peerPrivileges,
config.getMaxPeers(),
config.getMaxRemotelyInitiatedConnections(),
randomPeerPriority,
metricsSystem);
}
@ -620,5 +642,10 @@ public class RlpxAgent {
this.metricsSystem = metricsSystem;
return this;
}
public Builder randomPeerPriority(final boolean randomPeerPriority) {
this.randomPeerPriority = randomPeerPriority;
return this;
}
}
}

@ -58,6 +58,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
@ -321,6 +324,48 @@ public class RlpxAgentTest {
}
}
@Test
public void incomingConnection_succeedsEventuallyWithRandomPeerPrioritization() {
// Saturate connections with one local and one remote
final int maxPeers = 25;
startAgentWithMaxPeers(
maxPeers,
builder -> builder.randomPeerPriority(true),
rlpxConfiguration -> rlpxConfiguration.setLimitRemoteWireConnectionsEnabled(false));
agent.connect(createPeer());
for (int i = 0; i < 24; i++) {
connectionInitializer.simulateIncomingConnection(connection(createPeer()));
}
// Sanity check
assertThat(agent.getConnectionCount()).isEqualTo(maxPeers);
boolean newConnectionDisconnected = false;
boolean oldConnectionDisconnected = false;
// With very high probability we should see the connections churn
for (int i = 0; i < 1000 && !(newConnectionDisconnected && oldConnectionDisconnected); ++i) {
final List<PeerConnection> connectionsBefore =
agent.streamConnections().collect(Collectors.toUnmodifiableList());
// Simulate incoming connection
final Peer newPeer = createPeer();
final MockPeerConnection incomingConnection = connection(newPeer);
connectionInitializer.simulateIncomingConnection(incomingConnection);
final List<PeerConnection> connectionsAfter =
agent.streamConnections().collect(Collectors.toUnmodifiableList());
if (connectionsBefore.equals(connectionsAfter)) {
newConnectionDisconnected = true;
} else if (!connectionsBefore.equals(connectionsAfter)) {
oldConnectionDisconnected = true;
}
assertThat(agent.getConnectionCount()).isEqualTo(maxPeers);
}
assertThat(newConnectionDisconnected).isTrue();
assertThat(oldConnectionDisconnected).isTrue();
}
@Test
public void incomingConnection_afterMaxRemotelyInitiatedConnectionsHaveBeenEstablished() {
final int maxPeers = 10;
@ -959,8 +1004,15 @@ public class RlpxAgentTest {
}
private void startAgentWithMaxPeers(final int maxPeers) {
startAgentWithMaxPeers(maxPeers, Function.identity(), __ -> {});
}
private void startAgentWithMaxPeers(
final int maxPeers,
final Function<RlpxAgent.Builder, RlpxAgent.Builder> buildCustomization,
final Consumer<RlpxConfiguration> rlpxConfigurationModifier) {
config.setMaxPeers(maxPeers);
agent = agent();
agent = agent(buildCustomization, rlpxConfigurationModifier);
startAgent();
}
@ -970,16 +1022,25 @@ public class RlpxAgentTest {
}
private RlpxAgent agent() {
return agent(Function.identity(), __ -> {});
}
private RlpxAgent agent(
final Function<RlpxAgent.Builder, RlpxAgent.Builder> buildCustomization,
final Consumer<RlpxConfiguration> rlpxConfigurationModifier) {
config.setLimitRemoteWireConnectionsEnabled(true);
return RlpxAgent.builder()
.nodeKey(NodeKeyUtils.createFrom(KEY_PAIR))
.config(config)
.peerPermissions(peerPermissions)
.peerPrivileges(peerPrivileges)
.localNode(localNode)
.metricsSystem(metrics)
.connectionInitializer(connectionInitializer)
.connectionEvents(peerConnectionEvents)
rlpxConfigurationModifier.accept(config);
return buildCustomization
.apply(
RlpxAgent.builder()
.nodeKey(NodeKeyUtils.createFrom(KEY_PAIR))
.config(config)
.peerPermissions(peerPermissions)
.peerPrivileges(peerPrivileges)
.localNode(localNode)
.metricsSystem(metrics)
.connectionInitializer(connectionInitializer)
.connectionEvents(peerConnectionEvents))
.build();
}

Loading…
Cancel
Save