[PAN-2342] Update discovery logic to trust bootnodes only when out of sync (#1039)

* [PAN-2342] Created SyncStatusNodePermissioningProvider and NodePermissioningController

* Fix block height comparison logic

* Unit test for SyncStatusNodePermissioningProvider

* Add comment about permissioning while not in sync

* PR comments

* Fix missing final

* Fixing unit test

* Unsubscribing from Synchronizer SyncStatus updates after reaching sync

* Fix race condition

* Simplifying synchronization between callbacks

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Lucas Saldanha 6 years ago committed by GitHub
parent 55563e04b2
commit 2fac2397b3
  1. 12
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  2. 1
      ethereum/permissioning/build.gradle
  3. 50
      ethereum/permissioning/src/main/java/tech/pegasys/pantheon/ethereum/permissioning/node/NodePermissioningController.java
  4. 21
      ethereum/permissioning/src/main/java/tech/pegasys/pantheon/ethereum/permissioning/node/NodePermissioningProvider.java
  5. 102
      ethereum/permissioning/src/main/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java
  6. 60
      ethereum/permissioning/src/test/java/tech/pegasys/pantheon/ethereum/permissioning/node/NodePermissioningControllerTest.java
  7. 157
      ethereum/permissioning/src/test/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java

@ -30,6 +30,7 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable.EvictResu
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningController;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodeWhitelistUpdatedEvent;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -135,6 +136,9 @@ public class PeerDiscoveryController {
private RecursivePeerRefreshState recursivePeerRefreshState;
private final Optional<NodePermissioningController> nodePermissioningController =
Optional.empty();
public PeerDiscoveryController(
final KeyPair keypair,
final DiscoveryPeer localPeer,
@ -189,7 +193,15 @@ public class PeerDiscoveryController {
bootstrapNodes.stream()
.filter(this::whitelistIfPresentIsNodePermitted)
.collect(Collectors.toList());
if (nodePermissioningController.isPresent()) {
nodePermissioningController
.get()
.startPeerDiscoveryCallback(
() -> recursivePeerRefreshState.start(initialDiscoveryPeers, localPeer.getId()));
} else {
recursivePeerRefreshState.start(initialDiscoveryPeers, localPeer.getId());
}
final long timerId =
timerUtil.setPeriodic(

@ -27,6 +27,7 @@ jar {
dependencies {
implementation project(':util')
implementation project(':ethereum:core')
implementation 'com.google.guava:guava'
implementation 'net.consensys.cava:cava-toml'

@ -0,0 +1,50 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.permissioning.node;
import tech.pegasys.pantheon.ethereum.permissioning.node.provider.SyncStatusNodePermissioningProvider;
import tech.pegasys.pantheon.util.enode.EnodeURL;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class NodePermissioningController {
private static final Logger LOG = LogManager.getLogger();
private final Optional<SyncStatusNodePermissioningProvider> syncStatusNodePermissioningProvider;
public NodePermissioningController(
final SyncStatusNodePermissioningProvider syncStatusNodePermissioningProvider) {
this.syncStatusNodePermissioningProvider = Optional.of(syncStatusNodePermissioningProvider);
}
public NodePermissioningController() {
this.syncStatusNodePermissioningProvider = Optional.empty();
}
public boolean isPermitted(final EnodeURL sourceEnode, final EnodeURL destinationEnode) {
LOG.trace("Checking node permission: {} -> {}", sourceEnode, destinationEnode);
return syncStatusNodePermissioningProvider
.map((provider) -> provider.isPermitted(sourceEnode, destinationEnode))
.orElse(true);
}
public void startPeerDiscoveryCallback(final Runnable peerDiscoveryCallback) {
syncStatusNodePermissioningProvider.ifPresent(
(p) -> p.setHasReachedSyncCallback(peerDiscoveryCallback));
}
}

@ -0,0 +1,21 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.permissioning.node;
import tech.pegasys.pantheon.util.enode.EnodeURL;
@FunctionalInterface
public interface NodePermissioningProvider {
boolean isPermitted(final EnodeURL sourceEnode, final EnodeURL destinationEnode);
}

@ -0,0 +1,102 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.permissioning.node.provider;
import static com.google.common.base.Preconditions.checkNotNull;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningProvider;
import tech.pegasys.pantheon.util.enode.EnodeURL;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.OptionalLong;
import com.google.common.annotations.VisibleForTesting;
public class SyncStatusNodePermissioningProvider implements NodePermissioningProvider {
private final Synchronizer synchronizer;
private final Collection<EnodeURL> bootnodes = new HashSet<>();
private OptionalLong syncStatusObserverId;
private boolean hasReachedSync = false;
private Optional<Runnable> hasReachedSyncCallback = Optional.empty();
public SyncStatusNodePermissioningProvider(
final Synchronizer synchronizer, final Collection<EnodeURL> bootnodes) {
checkNotNull(synchronizer);
this.synchronizer = synchronizer;
long id = this.synchronizer.observeSyncStatus(this::handleSyncStatusUpdate);
this.syncStatusObserverId = OptionalLong.of(id);
this.bootnodes.addAll(bootnodes);
}
private void handleSyncStatusUpdate(final SyncStatus syncStatus) {
if (syncStatus != null) {
long blocksBehind = syncStatus.getHighestBlock() - syncStatus.getCurrentBlock();
if (blocksBehind <= 0) {
synchronized (this) {
if (!hasReachedSync) {
runCallback();
syncStatusObserverId.ifPresent(
id -> {
synchronizer.removeObserver(id);
syncStatusObserverId = OptionalLong.empty();
});
hasReachedSync = true;
}
}
}
}
}
public synchronized void setHasReachedSyncCallback(final Runnable runnable) {
if (hasReachedSync) {
runCallback();
} else {
this.hasReachedSyncCallback = Optional.of(runnable);
}
}
private synchronized void runCallback() {
hasReachedSyncCallback.ifPresent(Runnable::run);
hasReachedSyncCallback = Optional.empty();
}
/**
* Before reaching a sync'd state, the node will only be allowed to talk to its bootnodes
* (outgoing connections). After reaching a sync'd state, it is expected that other providers will
* check the permissions (most likely the smart contract based provider). That's why we always
* return true after reaching a sync'd state.
*
* @param sourceEnode the enode source of the packet or connection
* @param destinationEnode the enode target of the packet or connection
* @return true, if the communication from sourceEnode to destinationEnode is permitted, false
* otherwise
*/
@Override
public boolean isPermitted(final EnodeURL sourceEnode, final EnodeURL destinationEnode) {
if (hasReachedSync) {
return true;
} else {
return bootnodes.contains(destinationEnode);
}
}
@VisibleForTesting
boolean hasReachedSync() {
return hasReachedSync;
}
}

@ -0,0 +1,60 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.permissioning.node;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import tech.pegasys.pantheon.ethereum.permissioning.node.provider.SyncStatusNodePermissioningProvider;
import tech.pegasys.pantheon.util.enode.EnodeURL;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class NodePermissioningControllerTest {
private static final EnodeURL enode1 =
new EnodeURL(
"enode://94c15d1b9e2fe7ce56e458b9a3b672ef11894ddedd0c6f247e0f1d3487f52b66208fb4aeb8179fce6e3a749ea93ed147c37976d67af557508d199d9594c35f09@192.168.0.2:1234");
private static final EnodeURL enode2 =
new EnodeURL(
"enode://6f8a80d14311c39f35f516fa664deaaaa13e85b2f7493f37f6144d86991ec012937307647bd3b9a82abe2974e1407241d54947bbb39763a4cac9f77166ad92a0@192.168.0.3:5678");
@Mock private SyncStatusNodePermissioningProvider syncStatusNodePermissioningProvider;
private NodePermissioningController controller;
@Before
public void before() {
this.controller = new NodePermissioningController(syncStatusNodePermissioningProvider);
}
@Test
public void isPermittedShouldDelegateToSyncStatusProvider() {
controller.isPermitted(enode1, enode2);
verify(syncStatusNodePermissioningProvider).isPermitted(eq(enode1), eq(enode2));
}
@Test
public void peerDiscoveryCallbackShouldBeDelegatedToSyncStatusNodePermissioningProvider() {
controller.startPeerDiscoveryCallback(() -> {});
verify(syncStatusNodePermissioningProvider).setHasReachedSyncCallback(any(Runnable.class));
}
}

@ -0,0 +1,157 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.permissioning.node.provider;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.util.enode.EnodeURL;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class SyncStatusNodePermissioningProviderTest {
private static final EnodeURL bootnode =
new EnodeURL(
"enode://6332792c4a00e3e4ee0926ed89e0d27ef985424d97b6a45bf0f23e51f0dcb5e66b875777506458aea7af6f9e4ffb69f43f3778ee73c81ed9d34c51c4b16b0b0f@192.168.0.1:9999");
private static final EnodeURL enode1 =
new EnodeURL(
"enode://94c15d1b9e2fe7ce56e458b9a3b672ef11894ddedd0c6f247e0f1d3487f52b66208fb4aeb8179fce6e3a749ea93ed147c37976d67af557508d199d9594c35f09@192.168.0.2:1234");
private static final EnodeURL enode2 =
new EnodeURL(
"enode://6f8a80d14311c39f35f516fa664deaaaa13e85b2f7493f37f6144d86991ec012937307647bd3b9a82abe2974e1407241d54947bbb39763a4cac9f77166ad92a0@192.168.0.3:5678");
@Mock private Synchronizer synchronizer;
private Collection<EnodeURL> bootnodes = new ArrayList<>();
private SyncStatusNodePermissioningProvider provider;
private SyncStatusListener syncStatusListener;
private long syncStatusObserverId = 1L;
@Before
public void before() {
final ArgumentCaptor<SyncStatusListener> captor =
ArgumentCaptor.forClass(SyncStatusListener.class);
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(syncStatusObserverId);
bootnodes.add(bootnode);
this.provider = new SyncStatusNodePermissioningProvider(synchronizer, bootnodes);
this.syncStatusListener = captor.getValue();
verify(synchronizer).observeSyncStatus(any());
}
@Test
public void whenIsNotInSyncHasReachedSyncShouldReturnFalse() {
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
}
@Test
public void whenInSyncHasReachedSyncShouldReturnTrue() {
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 1));
assertThat(provider.hasReachedSync()).isTrue();
}
@Test
public void whenInSyncChangesFromTrueToFalseHasReachedSyncShouldReturnTrue() {
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
syncStatusListener.onSyncStatus(new SyncStatus(0, 2, 1));
assertThat(provider.hasReachedSync()).isTrue();
syncStatusListener.onSyncStatus(new SyncStatus(0, 2, 3));
assertThat(provider.hasReachedSync()).isTrue();
}
@Test
public void whenNotInSyncShouldNotExecuteCallback() {
final Runnable callbackFunction = mock(Runnable.class);
provider.setHasReachedSyncCallback(callbackFunction);
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2));
verifyZeroInteractions(callbackFunction);
}
@Test
public void whenInSyncShouldExecuteCallback() {
final Runnable callbackFunction = mock(Runnable.class);
provider.setHasReachedSyncCallback(callbackFunction);
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 1));
verify(callbackFunction).run();
// after executing callback, it should unsubscribe from the SyncStatus updates
verify(synchronizer).removeObserver(eq(syncStatusObserverId));
}
@Test
public void whenHasNotSyncedNonBootnodeShouldNotBePermitted() {
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
boolean isPermitted = provider.isPermitted(enode1, enode2);
assertThat(isPermitted).isFalse();
}
@Test
public void whenHasNotSyncedBootnodeIncomingConnectionShouldNotBePermitted() {
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
boolean isPermitted = provider.isPermitted(bootnode, enode1);
assertThat(isPermitted).isFalse();
}
@Test
public void whenHasNotSyncedBootnodeOutgoingConnectionShouldBePermitted() {
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
boolean isPermitted = provider.isPermitted(enode1, bootnode);
assertThat(isPermitted).isTrue();
}
@Test
public void whenHasSyncedIsPermittedShouldReturnTrue() {
syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 1));
assertThat(provider.hasReachedSync()).isTrue();
boolean isPermitted = provider.isPermitted(enode1, enode2);
assertThat(isPermitted).isTrue();
}
}
Loading…
Cancel
Save