diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainDownloader.java similarity index 63% rename from ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloader.java rename to ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainDownloader.java index ac728d3b7e..4770d56d10 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainDownloader.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 ConsenSys AG. + * Copyright 2019 ConsenSys AG. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -17,18 +17,12 @@ import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; -import tech.pegasys.pantheon.ethereum.eth.manager.ChainState; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DetermineCommonAncestorTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetHeadersFromPeerByHashTask; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -36,7 +30,6 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.Discon import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.ExceptionUtils; -import tech.pegasys.pantheon.util.uint.UInt256; import java.time.Duration; import java.util.ArrayList; @@ -50,43 +43,47 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.collect.Lists; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class FullSyncDownloader { +public class ChainDownloader { private static final Logger LOG = LogManager.getLogger(); private final SynchronizerConfiguration config; - private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; private final EthContext ethContext; private final SyncState syncState; + private final SyncTargetManager syncTargetManager; + private final BlockImportTaskFactory blockImportTaskFactory; + private final ProtocolSchedule protocolSchedule; private final LabelledMetric ethTasksTimer; private final Deque checkpointHeaders = new ConcurrentLinkedDeque<>(); private int checkpointTimeouts = 0; private int chainSegmentTimeouts = 0; - private volatile boolean syncTargetDisconnected = false; private final AtomicBoolean started = new AtomicBoolean(false); - private long syncTargetDisconnectListenerId; - protected CompletableFuture currentTask; + private CompletableFuture currentTask; - FullSyncDownloader( + public ChainDownloader( final SynchronizerConfiguration config, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, final SyncState syncState, - final LabelledMetric ethTasksTimer) { + final LabelledMetric ethTasksTimer, + final SyncTargetManager syncTargetManager, + final BlockImportTaskFactory blockImportTaskFactory) { + this.protocolSchedule = protocolSchedule; this.ethTasksTimer = ethTasksTimer; this.config = config; - this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; this.syncState = syncState; + this.syncTargetManager = syncTargetManager; + this.blockImportTaskFactory = blockImportTaskFactory; } public void start() { @@ -98,11 +95,16 @@ public class FullSyncDownloader { } } + @VisibleForTesting + public CompletableFuture getCurrentTask() { + return currentTask; + } + private CompletableFuture executeDownload() { // Find target, pull checkpoint headers, import, repeat currentTask = waitForPeers() - .thenCompose(r -> findSyncTarget()) + .thenCompose(r -> syncTargetManager.findSyncTarget()) .thenCompose(this::pullCheckpointHeaders) .thenCompose(r -> importBlocks()) .thenCompose(r -> checkSyncTarget()) @@ -132,75 +134,15 @@ public class FullSyncDownloader { return WaitForPeersTask.create(ethContext, 1, ethTasksTimer).run(); } - private CompletableFuture waitForNewPeer() { - return ethContext - .getScheduler() - .timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5)); - } - - private CompletableFuture findSyncTarget() { - final Optional maybeSyncTarget = syncState.syncTarget(); - if (maybeSyncTarget.isPresent()) { - // Nothing to do - return CompletableFuture.completedFuture(maybeSyncTarget.get()); - } - - final Optional maybeBestPeer = ethContext.getEthPeers().bestPeer(); - if (!maybeBestPeer.isPresent()) { - LOG.info("No sync target, wait for peers."); - return waitForPeerAndThenSetSyncTarget(); - } else { - final EthPeer bestPeer = maybeBestPeer.get(); - final long peerHeight = bestPeer.chainState().getEstimatedHeight(); - final UInt256 peerTd = bestPeer.chainState().getBestBlock().getTotalDifficulty(); - if (peerTd.compareTo(syncState.chainHeadTotalDifficulty()) <= 0 - && peerHeight <= syncState.chainHeadNumber()) { - // We're caught up to our best peer, try again when a new peer connects - LOG.debug("Caught up to best peer: " + bestPeer.chainState().getEstimatedHeight()); - return waitForPeerAndThenSetSyncTarget(); - } - return DetermineCommonAncestorTask.create( - protocolSchedule, - protocolContext, - ethContext, - bestPeer, - config.downloaderHeaderRequestSize(), - ethTasksTimer) - .run() - .handle((r, t) -> r) - .thenCompose( - (target) -> { - if (target == null) { - return waitForPeerAndThenSetSyncTarget(); - } - final SyncTarget syncTarget = syncState.setSyncTarget(bestPeer, target); - LOG.info( - "Found common ancestor with peer {} at block {}", bestPeer, target.getNumber()); - syncTargetDisconnectListenerId = - bestPeer.subscribeDisconnect(this::onSyncTargetPeerDisconnect); - return CompletableFuture.completedFuture(syncTarget); - }); - } - } - - private CompletableFuture waitForPeerAndThenSetSyncTarget() { - return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget()); - } - - private void onSyncTargetPeerDisconnect(final EthPeer ethPeer) { - LOG.info("Sync target disconnected: {}", ethPeer); - syncTargetDisconnected = true; - } - private CompletableFuture checkSyncTarget() { final Optional maybeSyncTarget = syncState.syncTarget(); if (!maybeSyncTarget.isPresent()) { - // Nothing to do + // No sync target, so nothing to check. return CompletableFuture.completedFuture(null); } final SyncTarget syncTarget = maybeSyncTarget.get(); - if (shouldSwitchSyncTarget(syncTarget)) { + if (syncTargetManager.shouldSwitchSyncTarget(syncTarget)) { LOG.info("Better sync target found, clear current sync target: {}.", syncTarget); clearSyncTarget(syncTarget); return CompletableFuture.completedFuture(null); @@ -218,41 +160,10 @@ public class FullSyncDownloader { return CompletableFuture.completedFuture(null); } - private boolean shouldSwitchSyncTarget(final SyncTarget currentTarget) { - final EthPeer currentPeer = currentTarget.peer(); - final ChainState currentPeerChainState = currentPeer.chainState(); - final Optional maybeBestPeer = ethContext.getEthPeers().bestPeer(); - - return maybeBestPeer - .map( - bestPeer -> { - if (EthPeers.BEST_CHAIN.compare(bestPeer, currentPeer) <= 0) { - // Our current target is better or equal to the best peer - return false; - } - // Require some threshold to be exceeded before switching targets to keep some - // stability - // when multiple peers are in range of each other - final ChainState bestPeerChainState = bestPeer.chainState(); - final long heightDifference = - bestPeerChainState.getEstimatedHeight() - - currentPeerChainState.getEstimatedHeight(); - if (heightDifference == 0 && bestPeerChainState.getEstimatedHeight() == 0) { - // Only check td if we don't have a height metric - final UInt256 tdDifference = - bestPeerChainState - .getBestBlock() - .getTotalDifficulty() - .minus(currentPeerChainState.getBestBlock().getTotalDifficulty()); - return tdDifference.compareTo(config.downloaderChangeTargetThresholdByTd()) > 0; - } - return heightDifference > config.downloaderChangeTargetThresholdByHeight(); - }) - .orElse(false); - } - private boolean finishedSyncingToCurrentTarget() { - return syncTargetDisconnected || checkpointsHaveTimedOut() || chainSegmentsHaveTimedOut(); + return syncTargetManager.isSyncTargetDisconnected() + || checkpointsHaveTimedOut() + || chainSegmentsHaveTimedOut(); } private boolean checkpointsHaveTimedOut() { @@ -274,13 +185,12 @@ public class FullSyncDownloader { chainSegmentTimeouts = 0; checkpointTimeouts = 0; checkpointHeaders.clear(); - syncTarget.peer().unsubscribeDisconnect(syncTargetDisconnectListenerId); - syncTargetDisconnected = false; + syncTargetManager.clearSyncTarget(syncTarget); syncState.clearSyncTarget(); } private boolean shouldDownloadMoreCheckpoints() { - return !syncTargetDisconnected + return !syncTargetManager.isSyncTargetDisconnected() && checkpointHeaders.size() < config.downloaderHeaderRequestSize() && checkpointTimeouts < config.downloaderCheckpointTimeoutsPermitted(); } @@ -290,8 +200,10 @@ public class FullSyncDownloader { return CompletableFuture.completedFuture(null); } + final BlockHeader lastHeader = + checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor(); // Try to pull more checkpoint headers - return checkpointHeadersTask(syncTarget) + return checkpointHeadersTask(lastHeader, syncTarget) .run() .handle( (r, t) -> { @@ -321,9 +233,7 @@ public class FullSyncDownloader { } private EthTask>> checkpointHeadersTask( - final SyncTarget syncTarget) { - final BlockHeader lastHeader = - checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor(); + final BlockHeader lastHeader, final SyncTarget syncTarget) { LOG.debug("Requesting checkpoint headers from {}", lastHeader.getNumber()); return GetHeadersFromPeerByHashTask.startingAtHash( protocolSchedule, @@ -342,29 +252,8 @@ public class FullSyncDownloader { return CompletableFuture.completedFuture(Collections.emptyList()); } - final CompletableFuture> importedBlocks; - if (checkpointHeaders.size() < 2) { - // Download blocks without constraining the end block - final ImportBlocksTask importTask = - ImportBlocksTask.fromHeader( - protocolSchedule, - protocolContext, - ethContext, - checkpointHeaders.getFirst(), - config.downloaderChainSegmentSize(), - ethTasksTimer); - importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult); - } else { - final PipelinedImportChainSegmentTask importTask = - PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, - protocolContext, - ethContext, - config.downloaderParallelism(), - ethTasksTimer, - Lists.newArrayList(checkpointHeaders)); - importedBlocks = importTask.run(); - } + final CompletableFuture> importedBlocks = + blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders); return importedBlocks.whenComplete( (r, t) -> { @@ -418,4 +307,9 @@ public class FullSyncDownloader { syncState.setCommonAncestor(lastImportedCheckpointHeader); return imported.size() > 1; } + + public interface BlockImportTaskFactory { + CompletableFuture> importBlocksForCheckpoints( + final Deque checkpointHeaders); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 8bfaaeda21..0bcfcafbb9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -22,6 +22,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncActions; import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader; import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException; import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState; +import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader; import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java new file mode 100644 index 0000000000..87fe4bb4d6 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java @@ -0,0 +1,123 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DetermineCommonAncestorTask; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class SyncTargetManager { + + private static final Logger LOG = LogManager.getLogger(); + private volatile long syncTargetDisconnectListenerId; + private volatile boolean syncTargetDisconnected = false; + private final SynchronizerConfiguration config; + private final ProtocolSchedule protocolSchedule; + private final ProtocolContext protocolContext; + private final EthContext ethContext; + private final SyncState syncState; + private final LabelledMetric ethTasksTimer; + + public SyncTargetManager( + final SynchronizerConfiguration config, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final SyncState syncState, + final LabelledMetric ethTasksTimer) { + this.config = config; + this.protocolSchedule = protocolSchedule; + this.protocolContext = protocolContext; + this.ethContext = ethContext; + this.syncState = syncState; + this.ethTasksTimer = ethTasksTimer; + } + + public CompletableFuture findSyncTarget() { + final Optional maybeSyncTarget = syncState.syncTarget(); + if (maybeSyncTarget.isPresent()) { + // Nothing to do + return CompletableFuture.completedFuture(maybeSyncTarget.get()); + } + + final Optional maybeBestPeer = selectBestAvailableSyncTarget(); + if (maybeBestPeer.isPresent()) { + final EthPeer bestPeer = maybeBestPeer.get(); + return DetermineCommonAncestorTask.create( + protocolSchedule, + protocolContext, + ethContext, + bestPeer, + config.downloaderHeaderRequestSize(), + ethTasksTimer) + .run() + .handle((r, t) -> r) + .thenCompose( + (target) -> { + if (target == null) { + return waitForPeerAndThenSetSyncTarget(); + } + final SyncTarget syncTarget = syncState.setSyncTarget(bestPeer, target); + LOG.info( + "Found common ancestor with peer {} at block {}", bestPeer, target.getNumber()); + syncTargetDisconnectListenerId = + bestPeer.subscribeDisconnect(this::onSyncTargetPeerDisconnect); + return CompletableFuture.completedFuture(syncTarget); + }); + } else { + return waitForPeerAndThenSetSyncTarget(); + } + } + + protected abstract Optional selectBestAvailableSyncTarget(); + + private CompletableFuture waitForPeerAndThenSetSyncTarget() { + return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget()); + } + + private CompletableFuture waitForNewPeer() { + return ethContext + .getScheduler() + .timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5)); + } + + private void onSyncTargetPeerDisconnect(final EthPeer ethPeer) { + LOG.info("Sync target disconnected: {}", ethPeer); + syncTargetDisconnected = true; + } + + public boolean isSyncTargetDisconnected() { + return syncTargetDisconnected; + } + + public void clearSyncTarget(final SyncTarget syncTarget) { + syncTarget.peer().unsubscribeDisconnect(syncTargetDisconnectListenerId); + syncTargetDisconnected = false; + } + + public abstract boolean shouldSwitchSyncTarget(final SyncTarget currentTarget); +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java new file mode 100644 index 0000000000..6643dada98 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java @@ -0,0 +1,105 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fullsync; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.core.Block; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader; +import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.Deque; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + +public class FullSyncDownloader { + private final ChainDownloader chainDownloader; + private final SynchronizerConfiguration config; + private final ProtocolSchedule protocolSchedule; + private final ProtocolContext protocolContext; + private final EthContext ethContext; + private final LabelledMetric ethTasksTimer; + + public FullSyncDownloader( + final SynchronizerConfiguration config, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final SyncState syncState, + final LabelledMetric ethTasksTimer) { + this.config = config; + this.protocolSchedule = protocolSchedule; + this.protocolContext = protocolContext; + this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; + chainDownloader = + new ChainDownloader<>( + config, + protocolSchedule, + protocolContext, + ethContext, + syncState, + ethTasksTimer, + new FullSyncTargetManager<>( + config, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer), + this::importBlocksForCheckpoints); + } + + public void start() { + chainDownloader.start(); + } + + @VisibleForTesting + CompletableFuture getCurrentTask() { + return chainDownloader.getCurrentTask(); + } + + private CompletableFuture> importBlocksForCheckpoints( + final Deque checkpointHeaders) { + final CompletableFuture> importedBlocks; + if (checkpointHeaders.size() < 2) { + // Download blocks without constraining the end block + final ImportBlocksTask importTask = + ImportBlocksTask.fromHeader( + protocolSchedule, + protocolContext, + ethContext, + checkpointHeaders.getFirst(), + config.downloaderChainSegmentSize(), + ethTasksTimer); + importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult); + } else { + final PipelinedImportChainSegmentTask importTask = + PipelinedImportChainSegmentTask.forCheckpoints( + protocolSchedule, + protocolContext, + ethContext, + config.downloaderParallelism(), + ethTasksTimer, + Lists.newArrayList(checkpointHeaders)); + importedBlocks = importTask.run(); + } + return importedBlocks; + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManager.java new file mode 100644 index 0000000000..2daa8d5172 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManager.java @@ -0,0 +1,107 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fullsync; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.eth.manager.ChainState; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; +import tech.pegasys.pantheon.ethereum.eth.sync.SyncTargetManager; +import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.util.uint.UInt256; + +import java.util.Optional; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +class FullSyncTargetManager extends SyncTargetManager { + + private static final Logger LOG = LogManager.getLogger(); + private final SynchronizerConfiguration config; + private final EthContext ethContext; + private final SyncState syncState; + + FullSyncTargetManager( + final SynchronizerConfiguration config, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final SyncState syncState, + final LabelledMetric ethTasksTimer) { + super(config, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer); + this.config = config; + this.ethContext = ethContext; + this.syncState = syncState; + } + + @Override + protected Optional selectBestAvailableSyncTarget() { + final Optional maybeBestPeer = ethContext.getEthPeers().bestPeer(); + if (!maybeBestPeer.isPresent()) { + LOG.info("No sync target, wait for peers."); + return Optional.empty(); + } else { + final EthPeer bestPeer = maybeBestPeer.get(); + final long peerHeight = bestPeer.chainState().getEstimatedHeight(); + final UInt256 peerTd = bestPeer.chainState().getBestBlock().getTotalDifficulty(); + if (peerTd.compareTo(syncState.chainHeadTotalDifficulty()) <= 0 + && peerHeight <= syncState.chainHeadNumber()) { + // We're caught up to our best peer, try again when a new peer connects + LOG.debug("Caught up to best peer: " + bestPeer.chainState().getEstimatedHeight()); + return Optional.empty(); + } + return maybeBestPeer; + } + } + + @Override + public boolean shouldSwitchSyncTarget(final SyncTarget currentTarget) { + final EthPeer currentPeer = currentTarget.peer(); + final ChainState currentPeerChainState = currentPeer.chainState(); + final Optional maybeBestPeer = ethContext.getEthPeers().bestPeer(); + + return maybeBestPeer + .map( + bestPeer -> { + if (EthPeers.BEST_CHAIN.compare(bestPeer, currentPeer) <= 0) { + // Our current target is better or equal to the best peer + return false; + } + // Require some threshold to be exceeded before switching targets to keep some + // stability + // when multiple peers are in range of each other + final ChainState bestPeerChainState = bestPeer.chainState(); + final long heightDifference = + bestPeerChainState.getEstimatedHeight() + - currentPeerChainState.getEstimatedHeight(); + if (heightDifference == 0 && bestPeerChainState.getEstimatedHeight() == 0) { + // Only check td if we don't have a height metric + final UInt256 tdDifference = + bestPeerChainState + .getBestBlock() + .getTotalDifficulty() + .minus(currentPeerChainState.getBestBlock().getTotalDifficulty()); + return tdDifference.compareTo(config.downloaderChangeTargetThresholdByTd()) > 0; + } + return heightDifference > config.downloaderChangeTargetThresholdByHeight(); + }) + .orElse(false); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncTarget.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncTarget.java index 3850907b85..6ee1ffb4ad 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncTarget.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncTarget.java @@ -24,7 +24,7 @@ public class SyncTarget { private final EthPeer peer; private BlockHeader commonAncestor; - SyncTarget(final EthPeer peer, final BlockHeader commonAncestor) { + public SyncTarget(final EthPeer peer, final BlockHeader commonAncestor) { this.peer = peer; this.commonAncestor = commonAncestor; } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java index f45306fc50..a6cdffd6fb 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java @@ -111,6 +111,25 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask ethTasksTimer); } + public static AbstractGetHeadersFromPeerTask endingAtHash( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final Hash lastHash, + final long lastBlockNumber, + final int segmentLength, + final int skip, + final LabelledMetric ethTasksTimer) { + return new GetHeadersFromPeerByHashTask( + protocolSchedule, + ethContext, + lastHash, + lastBlockNumber, + segmentLength, + skip, + true, + ethTasksTimer); + } + public static AbstractGetHeadersFromPeerTask forSingleHash( final ProtocolSchedule protocolSchedule, final EthContext ethContext, diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java similarity index 97% rename from ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloaderTest.java rename to ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java index c596626a92..1087605178 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.ethereum.eth.sync; +package tech.pegasys.pantheon.ethereum.eth.sync.fullsync; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -36,6 +36,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage; +import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; @@ -333,8 +334,8 @@ public class FullSyncDownloaderTest { peerB.getEthPeer().chainState().update(gen.hash(), 100); // Process through first task cycle - final CompletableFuture firstTask = downloader.currentTask; - while (downloader.currentTask == firstTask) { + final CompletableFuture firstTask = downloader.getCurrentTask(); + while (downloader.getCurrentTask() == firstTask) { RespondingEthPeer.respondOnce(responder, peerA, peerB); } @@ -373,8 +374,8 @@ public class FullSyncDownloaderTest { otherPeer.getEthPeer().chainState().update(gen.hash(), 100); // Process through first task cycle - final CompletableFuture firstTask = downloader.currentTask; - while (downloader.currentTask == firstTask) { + final CompletableFuture firstTask = downloader.getCurrentTask(); + while (downloader.getCurrentTask() == firstTask) { RespondingEthPeer.respondOnce(responder, bestPeer, otherPeer); } @@ -416,8 +417,8 @@ public class FullSyncDownloaderTest { .update(gen.header(), syncState.chainHeadTotalDifficulty().plus(300)); // Process through first task cycle - final CompletableFuture firstTask = downloader.currentTask; - while (downloader.currentTask == firstTask) { + final CompletableFuture firstTask = downloader.getCurrentTask(); + while (downloader.getCurrentTask() == firstTask) { RespondingEthPeer.respondOnce(responder, peerA, peerB); } @@ -467,8 +468,8 @@ public class FullSyncDownloaderTest { .update(gen.header(1000), syncState.chainHeadTotalDifficulty().plus(300)); // Process through first task cycle - final CompletableFuture firstTask = downloader.currentTask; - while (downloader.currentTask == firstTask) { + final CompletableFuture firstTask = downloader.getCurrentTask(); + while (downloader.getCurrentTask() == firstTask) { RespondingEthPeer.respondOnce(responder, bestPeer, otherPeer); }