[NC-2180] Wait for a peer with an estimated chain height before selecting a pivot block (#772)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 96dfc5aa30
commit 9249c9aa2e
  1. 23
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActions.java
  2. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java
  3. 21
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActionsTest.java
  4. 10
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java

@ -12,8 +12,8 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.NO_PEERS_AVAILABLE;
import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
@ -27,6 +27,7 @@ import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.ExceptionUtils;
import java.time.Duration;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -76,6 +77,8 @@ public class FastSyncActions<C> {
"Fast sync timed out before minimum peer count was reached. Continuing with reduced peers."); "Fast sync timed out before minimum peer count was reached. Continuing with reduced peers.");
result.complete(null); result.complete(null);
} else { } else {
LOG.warn(
"Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer.");
waitForAnyPeer() waitForAnyPeer()
.thenAccept(result::complete) .thenAccept(result::complete)
.exceptionally( .exceptionally(
@ -97,8 +100,6 @@ public class FastSyncActions<C> {
} }
private CompletableFuture<Void> waitForAnyPeer() { private CompletableFuture<Void> waitForAnyPeer() {
LOG.warn(
"Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer.");
final CompletableFuture<Void> result = new CompletableFuture<>(); final CompletableFuture<Void> result = new CompletableFuture<>();
waitForAnyPeer(result); waitForAnyPeer(result);
return result; return result;
@ -120,10 +121,11 @@ public class FastSyncActions<C> {
}); });
} }
public FastSyncState selectPivotBlock() { public CompletableFuture<FastSyncState> selectPivotBlock() {
return ethContext return ethContext
.getEthPeers() .getEthPeers()
.bestPeer() .bestPeer()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.map( .map(
peer -> { peer -> {
final long pivotBlockNumber = final long pivotBlockNumber =
@ -132,10 +134,19 @@ public class FastSyncActions<C> {
throw new FastSyncException(CHAIN_TOO_SHORT); throw new FastSyncException(CHAIN_TOO_SHORT);
} else { } else {
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber); LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
return new FastSyncState(OptionalLong.of(pivotBlockNumber)); return completedFuture(new FastSyncState(OptionalLong.of(pivotBlockNumber)));
} }
}) })
.orElseThrow(() -> new FastSyncException(NO_PEERS_AVAILABLE)); .orElseGet(this::retrySelectPivotBlockAfterDelay);
}
private CompletableFuture<FastSyncState> retrySelectPivotBlockAfterDelay() {
LOG.info("Waiting for peer with known chain height");
return ethContext
.getScheduler()
.scheduleFutureTask(
() -> waitForAnyPeer().thenCompose(ignore -> selectPivotBlock()),
Duration.ofSeconds(1));
} }
public CompletableFuture<FastSyncState> downloadPivotBlockHeader( public CompletableFuture<FastSyncState> downloadPivotBlockHeader(

@ -34,7 +34,7 @@ public class FastSyncDownloader<C> {
LOG.info("Fast sync enabled"); LOG.info("Fast sync enabled");
return fastSyncActions return fastSyncActions
.waitForSuitablePeers() .waitForSuitablePeers()
.thenApply(state -> fastSyncActions.selectPivotBlock()) .thenCompose(state -> fastSyncActions.selectPivotBlock())
.thenCompose(fastSyncActions::downloadPivotBlockHeader) .thenCompose(fastSyncActions::downloadPivotBlockHeader)
.thenCompose(this::downloadChainAndWorldState); .thenCompose(this::downloadChainAndWorldState);
} }

@ -15,7 +15,6 @@ package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.NO_PEERS_AVAILABLE;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_TIMER; import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.ProtocolContext;
@ -49,9 +48,6 @@ public class FastSyncActionsTest {
.fastSyncPivotDistance(1000) .fastSyncPivotDistance(1000)
.build(); .build();
private ProtocolSchedule<Void> protocolSchedule;
private ProtocolContext<Void> protocolContext;
private final LabelledMetric<OperationTimer> ethTasksTimer = NO_OP_LABELLED_TIMER; private final LabelledMetric<OperationTimer> ethTasksTimer = NO_OP_LABELLED_TIMER;
private final AtomicInteger timeoutCount = new AtomicInteger(0); private final AtomicInteger timeoutCount = new AtomicInteger(0);
private FastSyncActions<Void> fastSyncActions; private FastSyncActions<Void> fastSyncActions;
@ -63,8 +59,8 @@ public class FastSyncActionsTest {
final BlockchainSetupUtil<Void> blockchainSetupUtil = BlockchainSetupUtil.forTesting(); final BlockchainSetupUtil<Void> blockchainSetupUtil = BlockchainSetupUtil.forTesting();
blockchainSetupUtil.importAllBlocks(); blockchainSetupUtil.importAllBlocks();
blockchain = blockchainSetupUtil.getBlockchain(); blockchain = blockchainSetupUtil.getBlockchain();
protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); final ProtocolSchedule<Void> protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
protocolContext = blockchainSetupUtil.getProtocolContext(); final ProtocolContext<Void> protocolContext = blockchainSetupUtil.getProtocolContext();
ethProtocolManager = ethProtocolManager =
EthProtocolManagerTestUtil.create( EthProtocolManagerTestUtil.create(
blockchain, blockchain,
@ -111,14 +107,19 @@ public class FastSyncActionsTest {
public void selectPivotBlockShouldSelectBlockPivotDistanceFromBestPeer() { public void selectPivotBlockShouldSelectBlockPivotDistanceFromBestPeer() {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
final FastSyncState result = fastSyncActions.selectPivotBlock(); final CompletableFuture<FastSyncState> result = fastSyncActions.selectPivotBlock();
final FastSyncState expected = new FastSyncState(OptionalLong.of(4000)); final FastSyncState expected = new FastSyncState(OptionalLong.of(4000));
assertThat(result).isEqualTo(expected); assertThat(result).isCompletedWithValue(expected);
} }
@Test @Test
public void selectPivotBlockShouldFailIfNoPeersAreAvailable() { public void selectPivotBlockShouldWaitAndRetryIfNoPeersAreAvailable() {
assertThrowsFastSyncException(NO_PEERS_AVAILABLE, fastSyncActions::selectPivotBlock); final CompletableFuture<FastSyncState> result = fastSyncActions.selectPivotBlock();
assertThat(result).isNotDone();
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
final FastSyncState expected = new FastSyncState(OptionalLong.of(4000));
assertThat(result).isCompletedWithValue(expected);
} }
@Test @Test

@ -51,7 +51,7 @@ public class FastSyncDownloaderTest {
final FastSyncState downloadPivotBlockHeaderState = final FastSyncState downloadPivotBlockHeaderState =
new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader)); new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader));
when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE); when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock()).thenReturn(selectPivotBlockState); when(fastSyncActions.selectPivotBlock()).thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState)); .thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(COMPLETE); when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(COMPLETE);
@ -105,7 +105,7 @@ public class FastSyncDownloaderTest {
final FastSyncState downloadPivotBlockHeaderState = final FastSyncState downloadPivotBlockHeaderState =
new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader)); new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader));
when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE); when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock()).thenReturn(selectPivotBlockState); when(fastSyncActions.selectPivotBlock()).thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState)); .thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture); when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);
@ -137,7 +137,7 @@ public class FastSyncDownloaderTest {
final FastSyncState downloadPivotBlockHeaderState = final FastSyncState downloadPivotBlockHeaderState =
new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader)); new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader));
when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE); when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock()).thenReturn(selectPivotBlockState); when(fastSyncActions.selectPivotBlock()).thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState)); .thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture); when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);
@ -169,7 +169,7 @@ public class FastSyncDownloaderTest {
final FastSyncState downloadPivotBlockHeaderState = final FastSyncState downloadPivotBlockHeaderState =
new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader)); new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader));
when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE); when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock()).thenReturn(selectPivotBlockState); when(fastSyncActions.selectPivotBlock()).thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState)); .thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture); when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);
@ -200,7 +200,7 @@ public class FastSyncDownloaderTest {
final FastSyncState downloadPivotBlockHeaderState = final FastSyncState downloadPivotBlockHeaderState =
new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader)); new FastSyncState(OptionalLong.of(50), Optional.of(pivotBlockHeader));
when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE); when(fastSyncActions.waitForSuitablePeers()).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock()).thenReturn(selectPivotBlockState); when(fastSyncActions.selectPivotBlock()).thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState)); .thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture); when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);

Loading…
Cancel
Save