[PIE-1784] Select the pivot block from a minimal peer set (#1710)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 5 years ago committed by GitHub
parent e1ef09a1b7
commit bbdfe3257d
  1. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java
  2. 45
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActions.java
  3. 17
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeersTest.java
  4. 9
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  5. 20
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java
  6. 160
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActionsTest.java
  7. 1
      pantheon/src/main/java/tech/pegasys/pantheon/cli/DefaultCommandValues.java

@ -127,6 +127,10 @@ public class EthPeers {
return streamAvailablePeers().max(BEST_CHAIN);
}
public Optional<EthPeer> bestPeerWithHeightEstimate() {
return streamAvailablePeers().filter(p -> p.chainState().hasEstimatedHeight()).max(BEST_CHAIN);
}
@FunctionalInterface
public interface ConnectCallback {
void onPeerConnected(EthPeer newPeer);

@ -80,14 +80,8 @@ public class FastSyncActions<C> {
return waitForAnyPeer().thenApply(ignore -> fastSyncState);
}
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(
ethContext, syncConfig.getFastSyncMinimumPeerCount(), metricsSystem);
LOG.debug("Waiting for at least {} peers.", syncConfig.getFastSyncMinimumPeerCount());
return ethContext
.getScheduler()
.scheduleServiceTask(waitForPeersTask)
return waitForPeers(syncConfig.getFastSyncMinimumPeerCount())
.thenApply(successfulWaitResult -> fastSyncState);
}
@ -104,6 +98,12 @@ public class FastSyncActions<C> {
});
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
return fastSyncState.hasPivotBlockHeader()
? completedFuture(fastSyncState)
@ -113,14 +113,28 @@ public class FastSyncActions<C> {
private CompletableFuture<FastSyncState> selectPivotBlockFromPeers() {
return ethContext
.getEthPeers()
.bestPeer()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.bestPeerWithHeightEstimate()
// Only select a pivot block number when we have a minimum number of height estimates
.filter(
peer -> {
final long peerCount = countPeersWithEstimatedHeight();
final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount();
if (peerCount < minPeerCount) {
LOG.info(
"Waiting for peers with chain height information. {} / {} required peers currently available.",
peerCount,
minPeerCount);
return false;
}
return true;
})
.map(
peer -> {
final long pivotBlockNumber =
peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance();
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) {
// Peer's chain isn't long enough, return an empty value so we can try again.
LOG.info("Waiting for peer with sufficient chain height");
return null;
}
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
@ -131,12 +145,21 @@ public class FastSyncActions<C> {
.orElseGet(this::retrySelectPivotBlockAfterDelay);
}
private long countPeersWithEstimatedHeight() {
return ethContext
.getEthPeers()
.streamAvailablePeers()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.count();
}
private CompletableFuture<FastSyncState> retrySelectPivotBlockAfterDelay() {
LOG.info("Waiting for peer with sufficient chain height");
return ethContext
.getScheduler()
.scheduleFutureTask(
() -> waitForAnyPeer().thenCompose(ignore -> selectPivotBlockFromPeers()),
() ->
waitForPeers(syncConfig.getFastSyncMinimumPeerCount())
.thenCompose(ignore -> selectPivotBlockFromPeers()),
Duration.ofSeconds(1));
}

@ -32,6 +32,7 @@ import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.D
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;
@ -67,14 +68,22 @@ public class EthPeersTest {
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerB);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
.contains(peerB);
}
@Test
public void comparesPeersWithTdAndNoHeight() {
final EthPeer peerA =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, UInt256.of(100), 0).getEthPeer();
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, UInt256.of(100), OptionalLong.empty())
.getEthPeer();
final EthPeer peerB =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, UInt256.of(50), 0).getEthPeer();
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, UInt256.of(50), OptionalLong.empty())
.getEthPeer();
// Sanity check
assertThat(peerA.chainState().getEstimatedHeight()).isEqualTo(0);
@ -87,6 +96,10 @@ public class EthPeersTest {
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isLessThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerA);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
.isEmpty();
}
@Test

@ -32,6 +32,8 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.OptionalLong;
public class EthProtocolManagerTestUtil {
public static EthProtocolManager create(
@ -137,6 +139,13 @@ public class EthProtocolManagerTestUtil {
return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight);
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final UInt256 td,
final OptionalLong estimatedHeight) {
return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight);
}
public static RespondingEthPeer createPeer(final EthProtocolManager ethProtocolManager) {
return RespondingEthPeer.create(ethProtocolManager, UInt256.of(1000L));
}

@ -43,6 +43,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -110,6 +111,14 @@ public class RespondingEthPeer {
return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight);
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final UInt256 totalDifficulty,
final OptionalLong estimatedHeight) {
final Hash chainHeadHash = gen.hash();
return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight);
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Hash chainHeadHash,
@ -122,6 +131,15 @@ public class RespondingEthPeer {
final Hash chainHeadHash,
final UInt256 totalDifficulty,
final long estimatedHeight) {
return create(
ethProtocolManager, chainHeadHash, totalDifficulty, OptionalLong.of(estimatedHeight));
}
public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Hash chainHeadHash,
final UInt256 totalDifficulty,
final OptionalLong estimatedHeight) {
final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers();
final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
@ -132,7 +150,7 @@ public class RespondingEthPeer {
ethPeers.registerConnection(peerConnection);
final EthPeer peer = ethPeers.peer(peerConnection);
peer.registerStatusReceived(chainHeadHash, totalDifficulty);
peer.chainState().update(chainHeadHash, estimatedHeight);
estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height));
peer.registerStatusSent();
return new RespondingEthPeer(ethProtocolManager, peerConnection, peer, outgoingMessages);

@ -36,6 +36,9 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@ -44,39 +47,27 @@ import org.junit.Test;
public class FastSyncActionsTest {
private final SynchronizerConfiguration syncConfig =
new SynchronizerConfiguration.Builder()
.syncMode(SyncMode.FAST)
.fastSyncPivotDistance(1000)
.build();
private final BlockchainSetupUtil<Void> blockchainSetupUtil = BlockchainSetupUtil.forTesting();
private final SynchronizerConfiguration.Builder syncConfigBuilder =
new SynchronizerConfiguration.Builder().syncMode(SyncMode.FAST).fastSyncPivotDistance(1000);
private final FastSyncStateStorage fastSyncStateStorage = mock(FastSyncStateStorage.class);
private final AtomicInteger timeoutCount = new AtomicInteger(0);
private SynchronizerConfiguration syncConfig = syncConfigBuilder.build();
private FastSyncActions<Void> fastSyncActions;
private EthProtocolManager ethProtocolManager;
private MutableBlockchain blockchain;
@Before
public void setUp() {
final BlockchainSetupUtil<Void> blockchainSetupUtil = BlockchainSetupUtil.forTesting();
blockchainSetupUtil.importAllBlocks();
blockchain = blockchainSetupUtil.getBlockchain();
final ProtocolSchedule<Void> protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
final ProtocolContext<Void> protocolContext = blockchainSetupUtil.getProtocolContext();
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain,
blockchainSetupUtil.getWorldArchive(),
() -> timeoutCount.getAndDecrement() > 0);
final EthContext ethContext = ethProtocolManager.ethContext();
fastSyncActions =
new FastSyncActions<>(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
new SyncState(blockchain, ethContext.getEthPeers()),
new NoOpMetricsSystem());
fastSyncActions = createFastSyncActions(syncConfig);
}
@Test
@ -116,6 +107,11 @@ public class FastSyncActionsTest {
@Test
public void selectPivotBlockShouldSelectBlockPivotDistanceFromBestPeer() {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
final CompletableFuture<FastSyncState> result =
@ -126,6 +122,11 @@ public class FastSyncActionsTest {
@Test
public void selectPivotBlockShouldConsiderTotalDifficultyWhenSelectingBestPeer() {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, UInt256.of(1000), 5500);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, UInt256.of(2000), 4000);
@ -136,19 +137,124 @@ public class FastSyncActionsTest {
}
@Test
public void selectPivotBlockShouldWaitAndRetryIfNoPeersAreAvailable() {
public void selectPivotBlockShouldWaitAndRetryUntilMinHeightEstimatesAreAvailable() {
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
final int minPeers = 2;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE);
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// First peer is under the threshold, we should keep retrying
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Second peers meets min peer threshold, we should select the pivot
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isDone();
final FastSyncState expected = new FastSyncState(4000);
assertThat(result).isCompletedWithValue(expected);
}
@Test
public void selectPivotBlockShouldWaitAndRetryIfSufficientChainHeightEstimatesAreUnavailable() {
final int minPeers = 3;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
// Create peers without chain height estimates
List<RespondingEthPeer> peers = new ArrayList<>();
for (int i = 0; i < minPeers; i++) {
final UInt256 td = UInt256.of(i);
final OptionalLong height = OptionalLong.empty();
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height);
peers.add(peer);
}
// No pivot should be selected while peers do not have height estimates
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE);
assertThat(result).isNotDone();
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Set subset of heights
peers
.subList(0, minPeers - 1)
.forEach(p -> p.getEthPeer().chainState().updateHeightEstimate(minPivotHeight + 10));
// No pivot should be selected while only a subset of peers have height estimates
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Set final height
final long bestPeerHeight = minPivotHeight + 1;
peers.get(minPeers - 1).getEthPeer().chainState().updateHeightEstimate(bestPeerHeight);
final FastSyncState expected =
new FastSyncState(bestPeerHeight - syncConfig.getFastSyncPivotDistance());
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isCompletedWithValue(expected);
}
@Test
public void selectPivotBlockUsesBestPeerWithHeightEstimate() {
final int minPeers = 3;
final int peerCount = minPeers + 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
// Create peers without chain height estimates
List<RespondingEthPeer> peers = new ArrayList<>();
for (int i = 0; i < peerCount; i++) {
// Best peer by td is the first peer, td decreases as i increases
final UInt256 td = UInt256.of(peerCount - i);
final OptionalLong height;
if (i == 0) {
// Don't set a height estimate for the best peer
height = OptionalLong.empty();
} else {
// Height increases with i
height = OptionalLong.of(minPivotHeight + i);
}
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height);
peers.add(peer);
}
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE);
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
final long expectedBestChainHeight =
peers.get(1).getEthPeer().chainState().getEstimatedHeight();
final FastSyncState expected =
new FastSyncState(expectedBestChainHeight - syncConfig.getFastSyncPivotDistance());
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isCompletedWithValue(expected);
}
@Test
public void selectPivotBlockShouldWaitAndRetryIfBestPeerChainIsShorterThanPivotDistance() {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
final long pivotDistance = syncConfig.getFastSyncPivotDistance();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, pivotDistance - 1);
@ -169,7 +275,10 @@ public class FastSyncActionsTest {
public void selectPivotBlockShouldRetryIfBestPeerChainIsEqualToPivotDistance() {
final long pivotDistance = syncConfig.getFastSyncPivotDistance();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, pivotDistance);
// Create peers with chains that are too short
for (int i = 0; i < syncConfig.getFastSyncMinimumPeerCount(); i++) {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, pivotDistance);
}
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE);
@ -203,4 +312,17 @@ public class FastSyncActionsTest {
assertThat(result).isCompletedWithValue(new FastSyncState(blockchain.getBlockHeader(1).get()));
}
private FastSyncActions<Void> createFastSyncActions(final SynchronizerConfiguration syncConfig) {
final ProtocolSchedule<Void> protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
final ProtocolContext<Void> protocolContext = blockchainSetupUtil.getProtocolContext();
final EthContext ethContext = ethProtocolManager.ethContext();
return new FastSyncActions<>(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
new SyncState(blockchain, ethContext.getEthPeers()),
new NoOpMetricsSystem());
}
}

@ -62,7 +62,6 @@ public interface DefaultCommandValues {
// but we use FULL for the moment as Fast is still in progress
SyncMode DEFAULT_SYNC_MODE = SyncMode.FULL;
NatMethod DEFAULT_NAT_METHOD = NatMethod.NONE;
int FAST_SYNC_MAX_WAIT_TIME = 0;
int FAST_SYNC_MIN_PEER_COUNT = 5;
int DEFAULT_MAX_PEERS = 25;
float DEFAULT_FRACTION_REMOTE_WIRE_CONNECTIONS_ALLOWED =

Loading…
Cancel
Save