[NC-2138] Extract out generic parts of Downloader (#659)

Separate the management of sync target and actual import from the rest of the Downloader logic in preparation for introducing a fast sync chain downloader.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent de9e7f40fd
commit 3249bbfbd3
  1. 182
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainDownloader.java
  2. 1
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java
  3. 123
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java
  4. 105
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java
  5. 107
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManager.java
  6. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncTarget.java
  7. 19
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java
  8. 19
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java

@ -1,5 +1,5 @@
/*
* Copyright 2018 ConsenSys AG.
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@ -17,18 +17,12 @@ import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetHeadersFromPeerByHashTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
@ -36,7 +30,6 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.Discon
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.time.Duration;
import java.util.ArrayList;
@ -50,43 +43,47 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Lists;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class FullSyncDownloader<C> {
public class ChainDownloader<C> {
private static final Logger LOG = LogManager.getLogger();
private final SynchronizerConfiguration config;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final SyncTargetManager<C> syncTargetManager;
private final BlockImportTaskFactory blockImportTaskFactory;
private final ProtocolSchedule<C> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final Deque<BlockHeader> checkpointHeaders = new ConcurrentLinkedDeque<>();
private int checkpointTimeouts = 0;
private int chainSegmentTimeouts = 0;
private volatile boolean syncTargetDisconnected = false;
private final AtomicBoolean started = new AtomicBoolean(false);
private long syncTargetDisconnectListenerId;
protected CompletableFuture<?> currentTask;
private CompletableFuture<?> currentTask;
FullSyncDownloader(
public ChainDownloader(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final LabelledMetric<OperationTimer> ethTasksTimer,
final SyncTargetManager<C> syncTargetManager,
final BlockImportTaskFactory blockImportTaskFactory) {
this.protocolSchedule = protocolSchedule;
this.ethTasksTimer = ethTasksTimer;
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.syncState = syncState;
this.syncTargetManager = syncTargetManager;
this.blockImportTaskFactory = blockImportTaskFactory;
}
public void start() {
@ -98,11 +95,16 @@ public class FullSyncDownloader<C> {
}
}
@VisibleForTesting
public CompletableFuture<?> getCurrentTask() {
return currentTask;
}
private CompletableFuture<?> executeDownload() {
// Find target, pull checkpoint headers, import, repeat
currentTask =
waitForPeers()
.thenCompose(r -> findSyncTarget())
.thenCompose(r -> syncTargetManager.findSyncTarget())
.thenCompose(this::pullCheckpointHeaders)
.thenCompose(r -> importBlocks())
.thenCompose(r -> checkSyncTarget())
@ -132,75 +134,15 @@ public class FullSyncDownloader<C> {
return WaitForPeersTask.create(ethContext, 1, ethTasksTimer).run();
}
private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5));
}
private CompletableFuture<SyncTarget> findSyncTarget() {
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
if (maybeSyncTarget.isPresent()) {
// Nothing to do
return CompletableFuture.completedFuture(maybeSyncTarget.get());
}
final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeer();
if (!maybeBestPeer.isPresent()) {
LOG.info("No sync target, wait for peers.");
return waitForPeerAndThenSetSyncTarget();
} else {
final EthPeer bestPeer = maybeBestPeer.get();
final long peerHeight = bestPeer.chainState().getEstimatedHeight();
final UInt256 peerTd = bestPeer.chainState().getBestBlock().getTotalDifficulty();
if (peerTd.compareTo(syncState.chainHeadTotalDifficulty()) <= 0
&& peerHeight <= syncState.chainHeadNumber()) {
// We're caught up to our best peer, try again when a new peer connects
LOG.debug("Caught up to best peer: " + bestPeer.chainState().getEstimatedHeight());
return waitForPeerAndThenSetSyncTarget();
}
return DetermineCommonAncestorTask.create(
protocolSchedule,
protocolContext,
ethContext,
bestPeer,
config.downloaderHeaderRequestSize(),
ethTasksTimer)
.run()
.handle((r, t) -> r)
.thenCompose(
(target) -> {
if (target == null) {
return waitForPeerAndThenSetSyncTarget();
}
final SyncTarget syncTarget = syncState.setSyncTarget(bestPeer, target);
LOG.info(
"Found common ancestor with peer {} at block {}", bestPeer, target.getNumber());
syncTargetDisconnectListenerId =
bestPeer.subscribeDisconnect(this::onSyncTargetPeerDisconnect);
return CompletableFuture.completedFuture(syncTarget);
});
}
}
private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
}
private void onSyncTargetPeerDisconnect(final EthPeer ethPeer) {
LOG.info("Sync target disconnected: {}", ethPeer);
syncTargetDisconnected = true;
}
private CompletableFuture<Void> checkSyncTarget() {
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
if (!maybeSyncTarget.isPresent()) {
// Nothing to do
// No sync target, so nothing to check.
return CompletableFuture.completedFuture(null);
}
final SyncTarget syncTarget = maybeSyncTarget.get();
if (shouldSwitchSyncTarget(syncTarget)) {
if (syncTargetManager.shouldSwitchSyncTarget(syncTarget)) {
LOG.info("Better sync target found, clear current sync target: {}.", syncTarget);
clearSyncTarget(syncTarget);
return CompletableFuture.completedFuture(null);
@ -218,41 +160,10 @@ public class FullSyncDownloader<C> {
return CompletableFuture.completedFuture(null);
}
private boolean shouldSwitchSyncTarget(final SyncTarget currentTarget) {
final EthPeer currentPeer = currentTarget.peer();
final ChainState currentPeerChainState = currentPeer.chainState();
final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeer();
return maybeBestPeer
.map(
bestPeer -> {
if (EthPeers.BEST_CHAIN.compare(bestPeer, currentPeer) <= 0) {
// Our current target is better or equal to the best peer
return false;
}
// Require some threshold to be exceeded before switching targets to keep some
// stability
// when multiple peers are in range of each other
final ChainState bestPeerChainState = bestPeer.chainState();
final long heightDifference =
bestPeerChainState.getEstimatedHeight()
- currentPeerChainState.getEstimatedHeight();
if (heightDifference == 0 && bestPeerChainState.getEstimatedHeight() == 0) {
// Only check td if we don't have a height metric
final UInt256 tdDifference =
bestPeerChainState
.getBestBlock()
.getTotalDifficulty()
.minus(currentPeerChainState.getBestBlock().getTotalDifficulty());
return tdDifference.compareTo(config.downloaderChangeTargetThresholdByTd()) > 0;
}
return heightDifference > config.downloaderChangeTargetThresholdByHeight();
})
.orElse(false);
}
private boolean finishedSyncingToCurrentTarget() {
return syncTargetDisconnected || checkpointsHaveTimedOut() || chainSegmentsHaveTimedOut();
return syncTargetManager.isSyncTargetDisconnected()
|| checkpointsHaveTimedOut()
|| chainSegmentsHaveTimedOut();
}
private boolean checkpointsHaveTimedOut() {
@ -274,13 +185,12 @@ public class FullSyncDownloader<C> {
chainSegmentTimeouts = 0;
checkpointTimeouts = 0;
checkpointHeaders.clear();
syncTarget.peer().unsubscribeDisconnect(syncTargetDisconnectListenerId);
syncTargetDisconnected = false;
syncTargetManager.clearSyncTarget(syncTarget);
syncState.clearSyncTarget();
}
private boolean shouldDownloadMoreCheckpoints() {
return !syncTargetDisconnected
return !syncTargetManager.isSyncTargetDisconnected()
&& checkpointHeaders.size() < config.downloaderHeaderRequestSize()
&& checkpointTimeouts < config.downloaderCheckpointTimeoutsPermitted();
}
@ -290,8 +200,10 @@ public class FullSyncDownloader<C> {
return CompletableFuture.completedFuture(null);
}
final BlockHeader lastHeader =
checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor();
// Try to pull more checkpoint headers
return checkpointHeadersTask(syncTarget)
return checkpointHeadersTask(lastHeader, syncTarget)
.run()
.handle(
(r, t) -> {
@ -321,9 +233,7 @@ public class FullSyncDownloader<C> {
}
private EthTask<PeerTaskResult<List<BlockHeader>>> checkpointHeadersTask(
final SyncTarget syncTarget) {
final BlockHeader lastHeader =
checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor();
final BlockHeader lastHeader, final SyncTarget syncTarget) {
LOG.debug("Requesting checkpoint headers from {}", lastHeader.getNumber());
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
@ -342,29 +252,8 @@ public class FullSyncDownloader<C> {
return CompletableFuture.completedFuture(Collections.emptyList());
}
final CompletableFuture<List<Block>> importedBlocks;
if (checkpointHeaders.size() < 2) {
// Download blocks without constraining the end block
final ImportBlocksTask<C> importTask =
ImportBlocksTask.fromHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointHeaders.getFirst(),
config.downloaderChainSegmentSize(),
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
config.downloaderParallelism(),
ethTasksTimer,
Lists.newArrayList(checkpointHeaders));
importedBlocks = importTask.run();
}
final CompletableFuture<List<Block>> importedBlocks =
blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders);
return importedBlocks.whenComplete(
(r, t) -> {
@ -418,4 +307,9 @@ public class FullSyncDownloader<C> {
syncState.setCommonAncestor(lastImportedCheckpointHeader);
return imported.size() > 1;
}
public interface BlockImportTaskFactory {
CompletableFuture<List<Block>> importBlocksForCheckpoints(
final Deque<BlockHeader> checkpointHeaders);
}
}

@ -22,6 +22,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncActions;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;

@ -0,0 +1,123 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public abstract class SyncTargetManager<C> {
private static final Logger LOG = LogManager.getLogger();
private volatile long syncTargetDisconnectListenerId;
private volatile boolean syncTargetDisconnected = false;
private final SynchronizerConfiguration config;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final LabelledMetric<OperationTimer> ethTasksTimer;
public SyncTargetManager(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.syncState = syncState;
this.ethTasksTimer = ethTasksTimer;
}
public CompletableFuture<SyncTarget> findSyncTarget() {
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
if (maybeSyncTarget.isPresent()) {
// Nothing to do
return CompletableFuture.completedFuture(maybeSyncTarget.get());
}
final Optional<EthPeer> maybeBestPeer = selectBestAvailableSyncTarget();
if (maybeBestPeer.isPresent()) {
final EthPeer bestPeer = maybeBestPeer.get();
return DetermineCommonAncestorTask.create(
protocolSchedule,
protocolContext,
ethContext,
bestPeer,
config.downloaderHeaderRequestSize(),
ethTasksTimer)
.run()
.handle((r, t) -> r)
.thenCompose(
(target) -> {
if (target == null) {
return waitForPeerAndThenSetSyncTarget();
}
final SyncTarget syncTarget = syncState.setSyncTarget(bestPeer, target);
LOG.info(
"Found common ancestor with peer {} at block {}", bestPeer, target.getNumber());
syncTargetDisconnectListenerId =
bestPeer.subscribeDisconnect(this::onSyncTargetPeerDisconnect);
return CompletableFuture.completedFuture(syncTarget);
});
} else {
return waitForPeerAndThenSetSyncTarget();
}
}
protected abstract Optional<EthPeer> selectBestAvailableSyncTarget();
private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
}
private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5));
}
private void onSyncTargetPeerDisconnect(final EthPeer ethPeer) {
LOG.info("Sync target disconnected: {}", ethPeer);
syncTargetDisconnected = true;
}
public boolean isSyncTargetDisconnected() {
return syncTargetDisconnected;
}
public void clearSyncTarget(final SyncTarget syncTarget) {
syncTarget.peer().unsubscribeDisconnect(syncTargetDisconnectListenerId);
syncTargetDisconnected = false;
}
public abstract boolean shouldSwitchSyncTarget(final SyncTarget currentTarget);
}

@ -0,0 +1,105 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
public class FullSyncDownloader<C> {
private final ChainDownloader<C> chainDownloader;
private final SynchronizerConfiguration config;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;
public FullSyncDownloader(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
chainDownloader =
new ChainDownloader<>(
config,
protocolSchedule,
protocolContext,
ethContext,
syncState,
ethTasksTimer,
new FullSyncTargetManager<>(
config, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer),
this::importBlocksForCheckpoints);
}
public void start() {
chainDownloader.start();
}
@VisibleForTesting
CompletableFuture<?> getCurrentTask() {
return chainDownloader.getCurrentTask();
}
private CompletableFuture<List<Block>> importBlocksForCheckpoints(
final Deque<BlockHeader> checkpointHeaders) {
final CompletableFuture<List<Block>> importedBlocks;
if (checkpointHeaders.size() < 2) {
// Download blocks without constraining the end block
final ImportBlocksTask<C> importTask =
ImportBlocksTask.fromHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointHeaders.getFirst(),
config.downloaderChainSegmentSize(),
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
config.downloaderParallelism(),
ethTasksTimer,
Lists.newArrayList(checkpointHeaders));
importedBlocks = importTask.run();
}
return importedBlocks;
}
}

@ -0,0 +1,107 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.sync.SyncTargetManager;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
class FullSyncTargetManager<C> extends SyncTargetManager<C> {
private static final Logger LOG = LogManager.getLogger();
private final SynchronizerConfiguration config;
private final EthContext ethContext;
private final SyncState syncState;
FullSyncTargetManager(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(config, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);
this.config = config;
this.ethContext = ethContext;
this.syncState = syncState;
}
@Override
protected Optional<EthPeer> selectBestAvailableSyncTarget() {
final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeer();
if (!maybeBestPeer.isPresent()) {
LOG.info("No sync target, wait for peers.");
return Optional.empty();
} else {
final EthPeer bestPeer = maybeBestPeer.get();
final long peerHeight = bestPeer.chainState().getEstimatedHeight();
final UInt256 peerTd = bestPeer.chainState().getBestBlock().getTotalDifficulty();
if (peerTd.compareTo(syncState.chainHeadTotalDifficulty()) <= 0
&& peerHeight <= syncState.chainHeadNumber()) {
// We're caught up to our best peer, try again when a new peer connects
LOG.debug("Caught up to best peer: " + bestPeer.chainState().getEstimatedHeight());
return Optional.empty();
}
return maybeBestPeer;
}
}
@Override
public boolean shouldSwitchSyncTarget(final SyncTarget currentTarget) {
final EthPeer currentPeer = currentTarget.peer();
final ChainState currentPeerChainState = currentPeer.chainState();
final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeer();
return maybeBestPeer
.map(
bestPeer -> {
if (EthPeers.BEST_CHAIN.compare(bestPeer, currentPeer) <= 0) {
// Our current target is better or equal to the best peer
return false;
}
// Require some threshold to be exceeded before switching targets to keep some
// stability
// when multiple peers are in range of each other
final ChainState bestPeerChainState = bestPeer.chainState();
final long heightDifference =
bestPeerChainState.getEstimatedHeight()
- currentPeerChainState.getEstimatedHeight();
if (heightDifference == 0 && bestPeerChainState.getEstimatedHeight() == 0) {
// Only check td if we don't have a height metric
final UInt256 tdDifference =
bestPeerChainState
.getBestBlock()
.getTotalDifficulty()
.minus(currentPeerChainState.getBestBlock().getTotalDifficulty());
return tdDifference.compareTo(config.downloaderChangeTargetThresholdByTd()) > 0;
}
return heightDifference > config.downloaderChangeTargetThresholdByHeight();
})
.orElse(false);
}
}

@ -24,7 +24,7 @@ public class SyncTarget {
private final EthPeer peer;
private BlockHeader commonAncestor;
SyncTarget(final EthPeer peer, final BlockHeader commonAncestor) {
public SyncTarget(final EthPeer peer, final BlockHeader commonAncestor) {
this.peer = peer;
this.commonAncestor = commonAncestor;
}

@ -111,6 +111,25 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask endingAtHash(
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final Hash lastHash,
final long lastBlockNumber,
final int segmentLength,
final int skip,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
lastHash,
lastBlockNumber,
segmentLength,
skip,
true,
ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask forSingleHash(
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,

@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@ -36,6 +36,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
@ -333,8 +334,8 @@ public class FullSyncDownloaderTest {
peerB.getEthPeer().chainState().update(gen.hash(), 100);
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.currentTask;
while (downloader.currentTask == firstTask) {
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, peerA, peerB);
}
@ -373,8 +374,8 @@ public class FullSyncDownloaderTest {
otherPeer.getEthPeer().chainState().update(gen.hash(), 100);
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.currentTask;
while (downloader.currentTask == firstTask) {
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, bestPeer, otherPeer);
}
@ -416,8 +417,8 @@ public class FullSyncDownloaderTest {
.update(gen.header(), syncState.chainHeadTotalDifficulty().plus(300));
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.currentTask;
while (downloader.currentTask == firstTask) {
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, peerA, peerB);
}
@ -467,8 +468,8 @@ public class FullSyncDownloaderTest {
.update(gen.header(1000), syncState.chainHeadTotalDifficulty().plus(300));
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.currentTask;
while (downloader.currentTask == firstTask) {
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, bestPeer, otherPeer);
}
Loading…
Cancel
Save