mirror of https://github.com/hyperledger/besu
[NC-1273] Start of fast sync downloader (#613)
* Add support for initiating fast sync to DefaultSynchronizer, starting full sync once that completes. * Wait for a minimum number of peers to be available before starting fast sync. * Select pivot block. * Fetch the pivot block header. * Ensure that a majority of peers (which have the pivot block) agree on the block. * Pull isRetryingError and assignPeer up to AbstractRetryingPeerTask so it can be reused. Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>pull/2/head
parent
d6a04f416b
commit
069b2393a9
@ -0,0 +1,146 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; |
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.NO_PEERS_AVAILABLE; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.ProtocolContext; |
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; |
||||
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask; |
||||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; |
||||
import tech.pegasys.pantheon.metrics.LabelledMetric; |
||||
import tech.pegasys.pantheon.metrics.OperationTimer; |
||||
import tech.pegasys.pantheon.util.ExceptionUtils; |
||||
|
||||
import java.util.OptionalLong; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.TimeoutException; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class FastSyncActions<C> { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private final SynchronizerConfiguration syncConfig; |
||||
private final ProtocolSchedule<C> protocolSchedule; |
||||
private final ProtocolContext<C> protocolContext; |
||||
private final EthContext ethContext; |
||||
private final LabelledMetric<OperationTimer> ethTasksTimer; |
||||
|
||||
public FastSyncActions( |
||||
final SynchronizerConfiguration syncConfig, |
||||
final ProtocolSchedule<C> protocolSchedule, |
||||
final ProtocolContext<C> protocolContext, |
||||
final EthContext ethContext, |
||||
final LabelledMetric<OperationTimer> ethTasksTimer) { |
||||
this.syncConfig = syncConfig; |
||||
this.protocolSchedule = protocolSchedule; |
||||
this.protocolContext = protocolContext; |
||||
this.ethContext = ethContext; |
||||
this.ethTasksTimer = ethTasksTimer; |
||||
} |
||||
|
||||
public CompletableFuture<Void> waitForSuitablePeers() { |
||||
final WaitForPeersTask waitForPeersTask = |
||||
WaitForPeersTask.create( |
||||
ethContext, syncConfig.getFastSyncMinimumPeerCount(), ethTasksTimer); |
||||
|
||||
final EthScheduler scheduler = ethContext.getScheduler(); |
||||
final CompletableFuture<Void> result = new CompletableFuture<>(); |
||||
scheduler |
||||
.timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime()) |
||||
.handle( |
||||
(waitResult, error) -> { |
||||
if (ExceptionUtils.rootCause(error) instanceof TimeoutException) { |
||||
if (ethContext.getEthPeers().availablePeerCount() > 0) { |
||||
LOG.warn( |
||||
"Fast sync timed out before minimum peer count was reached. Continuing with reduced peers."); |
||||
result.complete(null); |
||||
} else { |
||||
waitForAnyPeer() |
||||
.thenAccept(result::complete) |
||||
.exceptionally( |
||||
taskError -> { |
||||
result.completeExceptionally(error); |
||||
return null; |
||||
}); |
||||
} |
||||
} else if (error != null) { |
||||
LOG.error("Failed to find peers for fast sync", error); |
||||
result.completeExceptionally(error); |
||||
} else { |
||||
result.complete(null); |
||||
} |
||||
return null; |
||||
}); |
||||
|
||||
return result; |
||||
} |
||||
|
||||
private CompletableFuture<Void> waitForAnyPeer() { |
||||
LOG.warn( |
||||
"Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer."); |
||||
final CompletableFuture<Void> result = new CompletableFuture<>(); |
||||
waitForAnyPeer(result); |
||||
return result; |
||||
} |
||||
|
||||
private void waitForAnyPeer(final CompletableFuture<Void> result) { |
||||
ethContext |
||||
.getScheduler() |
||||
.timeout(WaitForPeersTask.create(ethContext, 1, ethTasksTimer)) |
||||
.whenComplete( |
||||
(waitResult, throwable) -> { |
||||
if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) { |
||||
waitForAnyPeer(result); |
||||
} else if (throwable != null) { |
||||
result.completeExceptionally(throwable); |
||||
} else { |
||||
result.complete(waitResult); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
public FastSyncState selectPivotBlock() { |
||||
return ethContext |
||||
.getEthPeers() |
||||
.bestPeer() |
||||
.map( |
||||
peer -> { |
||||
final long pivotBlockNumber = |
||||
peer.chainState().getEstimatedHeight() - syncConfig.fastSyncPivotDistance(); |
||||
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) { |
||||
throw new FastSyncException(CHAIN_TOO_SHORT); |
||||
} else { |
||||
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber); |
||||
return new FastSyncState(OptionalLong.of(pivotBlockNumber)); |
||||
} |
||||
}) |
||||
.orElseThrow(() -> new FastSyncException(NO_PEERS_AVAILABLE)); |
||||
} |
||||
|
||||
public CompletableFuture<FastSyncState> downloadPivotBlockHeader( |
||||
final FastSyncState currentState) { |
||||
return new PivotBlockRetriever<>( |
||||
protocolSchedule, |
||||
ethContext, |
||||
ethTasksTimer, |
||||
currentState.getPivotBlockNumber().getAsLong()) |
||||
.downloadPivotBlockHeader(); |
||||
} |
||||
} |
@ -0,0 +1,42 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.FAST_SYNC_UNAVAILABLE; |
||||
|
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class FastSyncDownloader<C> { |
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private final FastSyncActions<C> fastSyncActions; |
||||
|
||||
public FastSyncDownloader(final FastSyncActions<C> fastSyncActions) { |
||||
this.fastSyncActions = fastSyncActions; |
||||
} |
||||
|
||||
public CompletableFuture<FastSyncState> start() { |
||||
LOG.info("Fast sync enabled"); |
||||
return fastSyncActions |
||||
.waitForSuitablePeers() |
||||
.thenApply(state -> fastSyncActions.selectPivotBlock()) |
||||
.thenCompose(fastSyncActions::downloadPivotBlockHeader) |
||||
.thenCompose( |
||||
state -> { |
||||
LOG.info("Reached end of current fast sync implementation with state {}", state); |
||||
throw new FastSyncException(FAST_SYNC_UNAVAILABLE); |
||||
}); |
||||
} |
||||
} |
@ -0,0 +1,21 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
public enum FastSyncError { |
||||
FAST_SYNC_UNAVAILABLE, |
||||
NO_PEERS_AVAILABLE, |
||||
CHAIN_TOO_SHORT, |
||||
PIVOT_BLOCK_HEADER_MISMATCH, |
||||
UNEXPECTED_ERROR |
||||
} |
@ -0,0 +1,27 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
public class FastSyncException extends RuntimeException { |
||||
|
||||
private final FastSyncError error; |
||||
|
||||
public FastSyncException(final FastSyncError error) { |
||||
super("Fast sync failed: " + error); |
||||
this.error = error; |
||||
} |
||||
|
||||
public FastSyncError getError() { |
||||
return error; |
||||
} |
||||
} |
@ -0,0 +1,75 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; |
||||
|
||||
import java.util.Objects; |
||||
import java.util.Optional; |
||||
import java.util.OptionalLong; |
||||
|
||||
import com.google.common.base.MoreObjects; |
||||
|
||||
public class FastSyncState { |
||||
|
||||
private final OptionalLong pivotBlockNumber; |
||||
private final Optional<BlockHeader> pivotBlockHeader; |
||||
|
||||
public FastSyncState() { |
||||
this(OptionalLong.empty(), Optional.empty()); |
||||
} |
||||
|
||||
public FastSyncState(final OptionalLong pivotBlockNumber) { |
||||
this(pivotBlockNumber, Optional.empty()); |
||||
} |
||||
|
||||
public FastSyncState( |
||||
final OptionalLong pivotBlockNumber, final Optional<BlockHeader> pivotBlockHeader) { |
||||
this.pivotBlockNumber = pivotBlockNumber; |
||||
this.pivotBlockHeader = pivotBlockHeader; |
||||
} |
||||
|
||||
public OptionalLong getPivotBlockNumber() { |
||||
return pivotBlockNumber; |
||||
} |
||||
|
||||
public Optional<BlockHeader> getPivotBlockHeader() { |
||||
return pivotBlockHeader; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(final Object o) { |
||||
if (this == o) { |
||||
return true; |
||||
} |
||||
if (o == null || getClass() != o.getClass()) { |
||||
return false; |
||||
} |
||||
final FastSyncState that = (FastSyncState) o; |
||||
return Objects.equals(pivotBlockNumber, that.pivotBlockNumber) |
||||
&& Objects.equals(pivotBlockHeader, that.pivotBlockHeader); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
return Objects.hash(pivotBlockNumber, pivotBlockHeader); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return MoreObjects.toStringHelper(this) |
||||
.add("pivotBlockNumber", pivotBlockNumber) |
||||
.add("pivotBlockHeader", pivotBlockHeader) |
||||
.toString(); |
||||
} |
||||
} |
@ -0,0 +1,125 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; |
||||
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask; |
||||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; |
||||
import tech.pegasys.pantheon.metrics.LabelledMetric; |
||||
import tech.pegasys.pantheon.metrics.OperationTimer; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.OptionalLong; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ConcurrentLinkedQueue; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class PivotBlockRetriever<C> { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private static final int MAX_PIVOT_BLOCK_RETRIES = 3; |
||||
private final long pivotBlockNumber; |
||||
private final EthContext ethContext; |
||||
private final LabelledMetric<OperationTimer> ethTasksTimer; |
||||
private final ProtocolSchedule<C> protocolSchedule; |
||||
private final Map<BlockHeader, AtomicInteger> confirmationsByBlockNumber = |
||||
new ConcurrentHashMap<>(); |
||||
private final CompletableFuture<FastSyncState> result = new CompletableFuture<>(); |
||||
private final Collection<RetryingGetHeaderFromPeerByNumberTask> getHeaderTasks = |
||||
new ConcurrentLinkedQueue<>(); |
||||
|
||||
public PivotBlockRetriever( |
||||
final ProtocolSchedule<C> protocolSchedule, |
||||
final EthContext ethContext, |
||||
final LabelledMetric<OperationTimer> ethTasksTimer, |
||||
final long pivotBlockNumber) { |
||||
this.pivotBlockNumber = pivotBlockNumber; |
||||
this.ethContext = ethContext; |
||||
this.ethTasksTimer = ethTasksTimer; |
||||
this.protocolSchedule = protocolSchedule; |
||||
} |
||||
|
||||
@SuppressWarnings("rawtypes") |
||||
public CompletableFuture<FastSyncState> downloadPivotBlockHeader() { |
||||
final CompletableFuture[] requestFutures = requestHeaderFromAllPeers(); |
||||
|
||||
CompletableFuture.allOf(requestFutures) |
||||
.thenRun( |
||||
() -> { |
||||
// All requests have completed but we still haven't reached agreement on a header.
|
||||
result.completeExceptionally( |
||||
new FastSyncException(FastSyncError.PIVOT_BLOCK_HEADER_MISMATCH)); |
||||
}); |
||||
return result; |
||||
} |
||||
|
||||
@SuppressWarnings("rawtypes") |
||||
private CompletableFuture[] requestHeaderFromAllPeers() { |
||||
final List<EthPeer> peersToQuery = |
||||
ethContext |
||||
.getEthPeers() |
||||
.availablePeers() |
||||
.filter(peer -> peer.chainState().getEstimatedHeight() >= pivotBlockNumber) |
||||
.collect(Collectors.toList()); |
||||
|
||||
final int confirmationsRequired = peersToQuery.size() / 2 + 1; |
||||
return peersToQuery |
||||
.stream() |
||||
.map( |
||||
peer -> { |
||||
final RetryingGetHeaderFromPeerByNumberTask getHeaderTask = createGetHeaderTask(peer); |
||||
getHeaderTasks.add(getHeaderTask); |
||||
return ethContext |
||||
.getScheduler() |
||||
.scheduleSyncWorkerTask(getHeaderTask::getHeader) |
||||
.thenAccept(header -> countHeader(header, confirmationsRequired)); |
||||
}) |
||||
.toArray(CompletableFuture[]::new); |
||||
} |
||||
|
||||
private RetryingGetHeaderFromPeerByNumberTask createGetHeaderTask(final EthPeer peer) { |
||||
final RetryingGetHeaderFromPeerByNumberTask task = |
||||
RetryingGetHeaderFromPeerByNumberTask.forPivotBlock( |
||||
protocolSchedule, ethContext, ethTasksTimer, pivotBlockNumber, MAX_PIVOT_BLOCK_RETRIES); |
||||
task.assignPeer(peer); |
||||
return task; |
||||
} |
||||
|
||||
private void countHeader(final BlockHeader header, final int confirmationsRequired) { |
||||
final int confirmations = |
||||
confirmationsByBlockNumber |
||||
.computeIfAbsent(header, key -> new AtomicInteger(0)) |
||||
.incrementAndGet(); |
||||
LOG.debug( |
||||
"Received header {} which now has {} confirmations out of {} required.", |
||||
header.getHash(), |
||||
confirmations, |
||||
confirmationsRequired); |
||||
if (confirmations >= confirmationsRequired) { |
||||
LOG.info( |
||||
"Confirmed pivot block hash {} with {} confirmations", header.getHash(), confirmations); |
||||
result.complete(new FastSyncState(OptionalLong.of(header.getNumber()), Optional.of(header))); |
||||
getHeaderTasks.forEach(RetryingGetHeaderFromPeerByNumberTask::cancel); |
||||
} |
||||
} |
||||
} |
@ -1,39 +0,0 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.state; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; |
||||
|
||||
public final class FastSyncState { |
||||
private long fastSyncTargetBlockNumber = -1; |
||||
|
||||
private final SynchronizerConfiguration config; |
||||
|
||||
public FastSyncState(final SynchronizerConfiguration config) { |
||||
this.config = config; |
||||
} |
||||
|
||||
/** |
||||
* Registers the chain height that we're trying to sync to. |
||||
* |
||||
* @param blockNumber the height of the chain we are syncing to. |
||||
*/ |
||||
public void setFastSyncChainTarget(final long blockNumber) { |
||||
fastSyncTargetBlockNumber = blockNumber; |
||||
} |
||||
|
||||
/** @return the block number at which we switch from fast sync to full sync */ |
||||
public long pivot() { |
||||
return Math.max(fastSyncTargetBlockNumber - config.fastSyncPivotDistance(), 0); |
||||
} |
||||
} |
@ -0,0 +1,77 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.tasks; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractRetryingPeerTask; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; |
||||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; |
||||
import tech.pegasys.pantheon.metrics.LabelledMetric; |
||||
import tech.pegasys.pantheon.metrics.OperationTimer; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
public class RetryingGetHeaderFromPeerByNumberTask |
||||
extends AbstractRetryingPeerTask<List<BlockHeader>> { |
||||
private final ProtocolSchedule<?> protocolSchedule; |
||||
private final EthContext ethContext; |
||||
private final LabelledMetric<OperationTimer> ethTasksTimer; |
||||
private final long pivotBlockNumber; |
||||
|
||||
private RetryingGetHeaderFromPeerByNumberTask( |
||||
final ProtocolSchedule<?> protocolSchedule, |
||||
final EthContext ethContext, |
||||
final LabelledMetric<OperationTimer> ethTasksTimer, |
||||
final long pivotBlockNumber, |
||||
final int maxRetries) { |
||||
super(ethContext, maxRetries, ethTasksTimer); |
||||
this.protocolSchedule = protocolSchedule; |
||||
this.ethContext = ethContext; |
||||
this.ethTasksTimer = ethTasksTimer; |
||||
this.pivotBlockNumber = pivotBlockNumber; |
||||
} |
||||
|
||||
public static RetryingGetHeaderFromPeerByNumberTask forPivotBlock( |
||||
final ProtocolSchedule<?> protocolSchedule, |
||||
final EthContext ethContext, |
||||
final LabelledMetric<OperationTimer> ethTasksTimer, |
||||
final long pivotBlockNumber, |
||||
final int maxRetries) { |
||||
return new RetryingGetHeaderFromPeerByNumberTask( |
||||
protocolSchedule, ethContext, ethTasksTimer, pivotBlockNumber, maxRetries); |
||||
} |
||||
|
||||
@Override |
||||
protected CompletableFuture<List<BlockHeader>> executePeerTask( |
||||
final Optional<EthPeer> assignedPeer) { |
||||
final AbstractGetHeadersFromPeerTask getHeadersTask = |
||||
GetHeadersFromPeerByNumberTask.forSingleNumber( |
||||
protocolSchedule, ethContext, pivotBlockNumber, ethTasksTimer); |
||||
assignedPeer.ifPresent(getHeadersTask::assignPeer); |
||||
return executeSubTask(getHeadersTask::run) |
||||
.thenApply( |
||||
peerResult -> { |
||||
if (!peerResult.getResult().isEmpty()) { |
||||
result.get().complete(peerResult.getResult()); |
||||
} |
||||
return peerResult.getResult(); |
||||
}); |
||||
} |
||||
|
||||
public CompletableFuture<BlockHeader> getHeader() { |
||||
return run().thenApply(singletonList -> singletonList.get(0)); |
||||
} |
||||
} |
@ -0,0 +1,156 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; |
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.NO_PEERS_AVAILABLE; |
||||
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_TIMER; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.ProtocolContext; |
||||
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; |
||||
import tech.pegasys.pantheon.ethereum.eth.sync.SyncMode; |
||||
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; |
||||
import tech.pegasys.pantheon.metrics.LabelledMetric; |
||||
import tech.pegasys.pantheon.metrics.OperationTimer; |
||||
|
||||
import java.util.OptionalLong; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import org.assertj.core.api.ThrowableAssert.ThrowingCallable; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
public class FastSyncActionsTest { |
||||
|
||||
private final SynchronizerConfiguration syncConfig = |
||||
new SynchronizerConfiguration.Builder() |
||||
.syncMode(SyncMode.FAST) |
||||
.fastSyncPivotDistance(1000) |
||||
.build(); |
||||
|
||||
private ProtocolSchedule<Void> protocolSchedule; |
||||
private ProtocolContext<Void> protocolContext; |
||||
|
||||
private final LabelledMetric<OperationTimer> ethTasksTimer = NO_OP_LABELLED_TIMER; |
||||
private final AtomicInteger timeoutCount = new AtomicInteger(0); |
||||
private FastSyncActions<Void> fastSyncActions; |
||||
private EthProtocolManager ethProtocolManager; |
||||
private MutableBlockchain blockchain; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
final BlockchainSetupUtil<Void> blockchainSetupUtil = BlockchainSetupUtil.forTesting(); |
||||
blockchainSetupUtil.importAllBlocks(); |
||||
blockchain = blockchainSetupUtil.getBlockchain(); |
||||
protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); |
||||
protocolContext = blockchainSetupUtil.getProtocolContext(); |
||||
ethProtocolManager = |
||||
EthProtocolManagerTestUtil.create( |
||||
blockchain, |
||||
blockchainSetupUtil.getWorldArchive(), |
||||
() -> timeoutCount.getAndDecrement() > 0); |
||||
fastSyncActions = |
||||
new FastSyncActions<>( |
||||
syncConfig, |
||||
protocolSchedule, |
||||
protocolContext, |
||||
ethProtocolManager.ethContext(), |
||||
ethTasksTimer); |
||||
} |
||||
|
||||
@Test |
||||
public void waitForPeersShouldSucceedIfEnoughPeersAreFound() { |
||||
for (int i = 0; i < syncConfig.getFastSyncMinimumPeerCount(); i++) { |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
||||
} |
||||
final CompletableFuture<Void> result = fastSyncActions.waitForSuitablePeers(); |
||||
assertThat(result).isCompleted(); |
||||
} |
||||
|
||||
@Test |
||||
public void waitForPeersShouldReportSuccessWhenTimeLimitReachedAndAPeerIsAvailable() { |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
||||
timeoutCount.set(Integer.MAX_VALUE); |
||||
assertThat(fastSyncActions.waitForSuitablePeers()).isCompleted(); |
||||
} |
||||
|
||||
@Test |
||||
public void waitForPeersShouldContinueWaitingUntilAtLeastOnePeerIsAvailable() { |
||||
timeoutCount.set(1); |
||||
final CompletableFuture<Void> result = fastSyncActions.waitForSuitablePeers(); |
||||
assertThat(result).isNotCompleted(); |
||||
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
||||
assertThat(result).isCompleted(); |
||||
} |
||||
|
||||
@Test |
||||
public void selectPivotBlockShouldSelectBlockPivotDistanceFromBestPeer() { |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000); |
||||
|
||||
final FastSyncState result = fastSyncActions.selectPivotBlock(); |
||||
final FastSyncState expected = new FastSyncState(OptionalLong.of(4000)); |
||||
assertThat(result).isEqualTo(expected); |
||||
} |
||||
|
||||
@Test |
||||
public void selectPivotBlockShouldFailIfNoPeersAreAvailable() { |
||||
assertThrowsFastSyncException(NO_PEERS_AVAILABLE, fastSyncActions::selectPivotBlock); |
||||
} |
||||
|
||||
@Test |
||||
public void selectPivotBlockShouldFailIfBestPeerChainIsShorterThanPivotDistance() { |
||||
EthProtocolManagerTestUtil.createPeer( |
||||
ethProtocolManager, syncConfig.fastSyncPivotDistance() - 1); |
||||
|
||||
assertThrowsFastSyncException(CHAIN_TOO_SHORT, fastSyncActions::selectPivotBlock); |
||||
} |
||||
|
||||
@Test |
||||
public void selectPivotBlockShouldFailIfBestPeerChainIsEqualToPivotDistance() { |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, syncConfig.fastSyncPivotDistance()); |
||||
|
||||
assertThrowsFastSyncException(CHAIN_TOO_SHORT, fastSyncActions::selectPivotBlock); |
||||
} |
||||
|
||||
@Test |
||||
public void downloadPivotBlockHeaderShouldRetrievePivotBlockHeader() { |
||||
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001); |
||||
final CompletableFuture<FastSyncState> result = |
||||
fastSyncActions.downloadPivotBlockHeader(new FastSyncState(OptionalLong.of(1))); |
||||
assertThat(result).isNotCompleted(); |
||||
|
||||
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); |
||||
peer.respond(responder); |
||||
|
||||
assertThat(result) |
||||
.isCompletedWithValue(new FastSyncState(OptionalLong.of(1), blockchain.getBlockHeader(1))); |
||||
} |
||||
|
||||
private void assertThrowsFastSyncException( |
||||
final FastSyncError expectedError, final ThrowingCallable callable) { |
||||
assertThatThrownBy(callable) |
||||
.isInstanceOf(FastSyncException.class) |
||||
.extracting(exception -> ((FastSyncException) exception).getError()) |
||||
.isEqualTo(expectedError); |
||||
} |
||||
} |
@ -0,0 +1,106 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
import static java.util.concurrent.CompletableFuture.completedFuture; |
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.verifyNoMoreInteractions; |
||||
import static org.mockito.Mockito.when; |
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; |
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.FAST_SYNC_UNAVAILABLE; |
||||
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.UNEXPECTED_ERROR; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; |
||||
|
||||
import java.util.Optional; |
||||
import java.util.OptionalLong; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import org.junit.Test; |
||||
|
||||
public class FastSyncDownloaderTest { |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
private final FastSyncActions<Void> fastSyncActions = mock(FastSyncActions.class); |
||||
|
||||
private final FastSyncDownloader<Void> downloader = new FastSyncDownloader<>(fastSyncActions); |
||||
|
||||
@Test |
||||
public void shouldCompleteFastSyncSuccessfully() { |
||||
final FastSyncState selectPivotBlockState = new FastSyncState(OptionalLong.of(50)); |
||||
final FastSyncState downloadPivotBlockHeaderState = |
||||
new FastSyncState( |
||||
OptionalLong.of(50), |
||||
Optional.of(new BlockHeaderTestFixture().number(50).buildHeader())); |
||||
when(fastSyncActions.waitForSuitablePeers()).thenReturn(completedFuture(null)); |
||||
when(fastSyncActions.selectPivotBlock()).thenReturn(selectPivotBlockState); |
||||
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) |
||||
.thenReturn(completedFuture(downloadPivotBlockHeaderState)); |
||||
|
||||
final CompletableFuture<FastSyncState> result = downloader.start(); |
||||
|
||||
verify(fastSyncActions).waitForSuitablePeers(); |
||||
verify(fastSyncActions).selectPivotBlock(); |
||||
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); |
||||
verifyNoMoreInteractions(fastSyncActions); |
||||
assertCompletedExceptionally(result, FAST_SYNC_UNAVAILABLE); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldAbortIfWaitForSuitablePeersFails() { |
||||
when(fastSyncActions.waitForSuitablePeers()) |
||||
.thenReturn(completedExceptionally(new FastSyncException(UNEXPECTED_ERROR))); |
||||
|
||||
final CompletableFuture<FastSyncState> result = downloader.start(); |
||||
|
||||
assertCompletedExceptionally(result, UNEXPECTED_ERROR); |
||||
|
||||
verify(fastSyncActions).waitForSuitablePeers(); |
||||
verifyNoMoreInteractions(fastSyncActions); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldAbortIfSelectPivotBlockFails() { |
||||
when(fastSyncActions.waitForSuitablePeers()).thenReturn(completedFuture(null)); |
||||
when(fastSyncActions.selectPivotBlock()).thenThrow(new FastSyncException(CHAIN_TOO_SHORT)); |
||||
|
||||
final CompletableFuture<FastSyncState> result = downloader.start(); |
||||
|
||||
assertCompletedExceptionally(result, CHAIN_TOO_SHORT); |
||||
|
||||
verify(fastSyncActions).waitForSuitablePeers(); |
||||
verify(fastSyncActions).selectPivotBlock(); |
||||
verifyNoMoreInteractions(fastSyncActions); |
||||
} |
||||
|
||||
private <T> CompletableFuture<T> completedExceptionally(final Throwable error) { |
||||
final CompletableFuture<T> result = new CompletableFuture<>(); |
||||
result.completeExceptionally(error); |
||||
return result; |
||||
} |
||||
|
||||
private <T> void assertCompletedExceptionally( |
||||
final CompletableFuture<T> future, final FastSyncError expectedError) { |
||||
assertThat(future).isCompletedExceptionally(); |
||||
future.exceptionally( |
||||
actualError -> { |
||||
assertThat(actualError) |
||||
.isInstanceOf(FastSyncException.class) |
||||
.extracting(ex -> ((FastSyncException) ex).getError()) |
||||
.isEqualTo(expectedError); |
||||
return null; |
||||
}); |
||||
} |
||||
} |
@ -0,0 +1,181 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.when; |
||||
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_TIMER; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.ProtocolContext; |
||||
import tech.pegasys.pantheon.ethereum.chain.Blockchain; |
||||
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; |
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; |
||||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; |
||||
import tech.pegasys.pantheon.metrics.LabelledMetric; |
||||
import tech.pegasys.pantheon.metrics.OperationTimer; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.util.Optional; |
||||
import java.util.OptionalLong; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
public class PivotBlockRetrieverTest { |
||||
|
||||
private static final long PIVOT_BLOCK_NUMBER = 10; |
||||
|
||||
private ProtocolContext<Void> protocolContext; |
||||
|
||||
private final LabelledMetric<OperationTimer> ethTasksTimer = NO_OP_LABELLED_TIMER; |
||||
private final AtomicBoolean timeout = new AtomicBoolean(false); |
||||
private EthProtocolManager ethProtocolManager; |
||||
private MutableBlockchain blockchain; |
||||
private PivotBlockRetriever<Void> pivotBlockRetriever; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
final BlockchainSetupUtil<Void> blockchainSetupUtil = BlockchainSetupUtil.forTesting(); |
||||
blockchainSetupUtil.importAllBlocks(); |
||||
blockchain = blockchainSetupUtil.getBlockchain(); |
||||
final ProtocolSchedule<Void> protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); |
||||
protocolContext = blockchainSetupUtil.getProtocolContext(); |
||||
ethProtocolManager = |
||||
EthProtocolManagerTestUtil.create( |
||||
blockchain, blockchainSetupUtil.getWorldArchive(), timeout::get); |
||||
pivotBlockRetriever = |
||||
new PivotBlockRetriever<>( |
||||
protocolSchedule, ethProtocolManager.ethContext(), ethTasksTimer, PIVOT_BLOCK_NUMBER); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldSucceedWhenAllPeersAgree() { |
||||
final Responder responder = |
||||
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); |
||||
final RespondingEthPeer respondingPeerA = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
final RespondingEthPeer respondingPeerB = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
final RespondingEthPeer respondingPeerC = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
|
||||
final CompletableFuture<FastSyncState> future = pivotBlockRetriever.downloadPivotBlockHeader(); |
||||
while (!future.isDone()) { |
||||
respondingPeerA.respond(responder); |
||||
respondingPeerB.respond(responder); |
||||
respondingPeerC.respond(responder); |
||||
} |
||||
|
||||
assertThat(future) |
||||
.isCompletedWithValue( |
||||
new FastSyncState( |
||||
OptionalLong.of(PIVOT_BLOCK_NUMBER), |
||||
blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER))); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldIgnorePeersThatDoNotHaveThePivotBlock() { |
||||
final Responder responder = |
||||
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); |
||||
final RespondingEthPeer respondingPeerA = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); |
||||
|
||||
final CompletableFuture<FastSyncState> future = pivotBlockRetriever.downloadPivotBlockHeader(); |
||||
while (!future.isDone()) { |
||||
respondingPeerA.respondWhile(responder, () -> !future.isDone()); |
||||
} |
||||
|
||||
assertThat(future) |
||||
.isCompletedWithValue( |
||||
new FastSyncState( |
||||
OptionalLong.of(PIVOT_BLOCK_NUMBER), |
||||
blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER))); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldSucceedWhenMajorityOfPeersAgree() { |
||||
final Responder responder = |
||||
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); |
||||
final Responder fakeResponder = responderForFakeBlock(); |
||||
|
||||
final RespondingEthPeer respondingPeerA = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
final RespondingEthPeer respondingPeerB = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
final RespondingEthPeer respondingPeerC = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
|
||||
final CompletableFuture<FastSyncState> future = pivotBlockRetriever.downloadPivotBlockHeader(); |
||||
while (!future.isDone()) { |
||||
respondingPeerA.respond(responder); |
||||
respondingPeerB.respond(fakeResponder); |
||||
respondingPeerC.respond(responder); |
||||
} |
||||
|
||||
assertThat(future) |
||||
.isCompletedWithValue( |
||||
new FastSyncState( |
||||
OptionalLong.of(PIVOT_BLOCK_NUMBER), |
||||
blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER))); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldFailWhenPeersReturnDifferentHeaders() { |
||||
final Responder responderA = |
||||
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); |
||||
final RespondingEthPeer respondingPeerA = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
|
||||
final Responder responderB = responderForFakeBlock(); |
||||
final RespondingEthPeer respondingPeerB = |
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); |
||||
|
||||
// Execute task and wait for response
|
||||
final AtomicReference<Throwable> actualError = new AtomicReference<>(); |
||||
final CompletableFuture<FastSyncState> future = pivotBlockRetriever.downloadPivotBlockHeader(); |
||||
while (!future.isDone()) { |
||||
respondingPeerA.respond(responderA); |
||||
respondingPeerB.respond(responderB); |
||||
} |
||||
future.whenComplete((result, error) -> actualError.set(error)); |
||||
|
||||
assertThat(future).isCompletedExceptionally(); |
||||
assertThat(actualError.get()) |
||||
.isInstanceOf(FastSyncException.class) |
||||
.extracting(e -> ((FastSyncException) e).getError()) |
||||
.isEqualTo(FastSyncError.PIVOT_BLOCK_HEADER_MISMATCH); |
||||
} |
||||
|
||||
private Responder responderForFakeBlock() { |
||||
final Blockchain mockBlockchain = mock(Blockchain.class); |
||||
when(mockBlockchain.getBlockHeader(PIVOT_BLOCK_NUMBER)) |
||||
.thenReturn( |
||||
Optional.of( |
||||
new BlockHeaderTestFixture() |
||||
.number(PIVOT_BLOCK_NUMBER) |
||||
.extraData(BytesValue.of(1)) |
||||
.buildHeader())); |
||||
return RespondingEthPeer.blockchainResponder(mockBlockchain); |
||||
} |
||||
} |
@ -0,0 +1,46 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.eth.sync.tasks; |
||||
|
||||
import static java.util.Collections.singletonList; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; |
||||
|
||||
import java.util.List; |
||||
|
||||
import org.junit.Ignore; |
||||
import org.junit.Test; |
||||
|
||||
public class RetryingGetHeaderFromPeerByNumberTaskTest |
||||
extends RetryingMessageTaskTest<List<BlockHeader>> { |
||||
|
||||
private static final long PIVOT_BLOCK_NUMBER = 10; |
||||
|
||||
@Override |
||||
protected List<BlockHeader> generateDataToBeRequested() { |
||||
return singletonList(blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER).get()); |
||||
} |
||||
|
||||
@Override |
||||
protected EthTask<List<BlockHeader>> createTask(final List<BlockHeader> requestedData) { |
||||
return RetryingGetHeaderFromPeerByNumberTask.forPivotBlock( |
||||
protocolSchedule, ethContext, ethTasksTimer, PIVOT_BLOCK_NUMBER, maxRetries); |
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
@Ignore("It's not possible to return a partial result as we only ever request one header.") |
||||
public void failsWhenPeerReturnsPartialResultThenStops() {} |
||||
} |
Loading…
Reference in new issue