Remove SyncState from SyncTargetManager (#1188)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 20ebbf3337
commit f897761951
  1. 23
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainDownloader.java
  2. 23
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java
  3. 1
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java
  4. 8
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncTargetManager.java
  5. 6
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java
  6. 10
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManager.java
  7. 20
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java
  8. 9
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java
  9. 9
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java

@ -96,7 +96,8 @@ public class ChainDownloader<C> {
// Find target, pull checkpoint headers, import, repeat
currentTask =
waitForPeers()
.thenCompose(r -> syncTargetManager.findSyncTarget())
.thenCompose(r -> syncTargetManager.findSyncTarget(syncState.syncTarget()))
.thenApply(this::updateSyncState)
.thenCompose(this::pullCheckpointHeaders)
.thenCompose(this::importBlocks)
.thenCompose(r -> checkSyncTarget())
@ -128,6 +129,20 @@ public class ChainDownloader<C> {
});
}
private SyncTarget updateSyncState(final SyncTarget newTarget) {
if (isSameAsCurrentTarget(newTarget)) {
return syncState.syncTarget().get();
}
return syncState.setSyncTarget(newTarget.peer(), newTarget.commonAncestor());
}
private Boolean isSameAsCurrentTarget(final SyncTarget newTarget) {
return syncState
.syncTarget()
.map(currentTarget -> currentTarget.equals(newTarget))
.orElse(false);
}
private CompletableFuture<List<BlockHeader>> pullCheckpointHeaders(final SyncTarget syncTarget) {
return syncTargetManager.isSyncTargetDisconnected()
? CompletableFuture.completedFuture(emptyList())
@ -151,7 +166,7 @@ public class ChainDownloader<C> {
clearSyncTarget(syncTarget);
return CompletableFuture.completedFuture(null);
}
if (finishedSyncingToCurrentTarget()) {
if (finishedSyncingToCurrentTarget(syncTarget)) {
LOG.info("Finished syncing to target: {}.", syncTarget);
clearSyncTarget(syncTarget);
// Wait a bit before checking for a new sync target
@ -164,8 +179,8 @@ public class ChainDownloader<C> {
return CompletableFuture.completedFuture(null);
}
private boolean finishedSyncingToCurrentTarget() {
return !syncTargetManager.syncTargetCanProvideMoreBlocks()
private boolean finishedSyncingToCurrentTarget(final SyncTarget syncTarget) {
return !syncTargetManager.syncTargetCanProvideMoreBlocks(syncTarget)
|| checkpointHeaderManager.checkpointsHaveTimedOut()
|| chainSegmentsHaveTimedOut();
}

@ -18,7 +18,6 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
@ -35,8 +34,6 @@ public abstract class SyncTargetManager<C> {
private static final Logger LOG = LogManager.getLogger();
protected final SyncState syncState;
private volatile long syncTargetDisconnectListenerId;
private volatile boolean syncTargetDisconnected = false;
private final SynchronizerConfiguration config;
@ -50,19 +47,17 @@ public abstract class SyncTargetManager<C> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.syncState = syncState;
this.metricsSystem = metricsSystem;
}
public CompletableFuture<SyncTarget> findSyncTarget() {
return syncState
.syncTarget()
public CompletableFuture<SyncTarget> findSyncTarget(
final Optional<SyncTarget> currentSyncTarget) {
return currentSyncTarget
.map(CompletableFuture::completedFuture) // Return an existing sync target if present
.orElseGet(this::selectNewSyncTarget);
}
@ -93,7 +88,7 @@ public abstract class SyncTargetManager<C> {
if (target == null) {
return waitForPeerAndThenSetSyncTarget();
}
final SyncTarget syncTarget = syncState.setSyncTarget(bestPeer, target);
final SyncTarget syncTarget = new SyncTarget(bestPeer, target);
LOG.info(
"Found common ancestor with peer {} at block {}",
bestPeer,
@ -120,7 +115,7 @@ public abstract class SyncTargetManager<C> {
protected abstract CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget();
private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> selectNewSyncTarget());
}
private CompletableFuture<?> waitForNewPeer() {
@ -149,12 +144,8 @@ public abstract class SyncTargetManager<C> {
public abstract boolean isSyncTargetReached(final EthPeer peer);
public boolean syncTargetCanProvideMoreBlocks() {
if (!syncState.syncTarget().isPresent()) {
LOG.warn("SyncTarget should be set, but is not.");
return false;
}
final EthPeer currentSyncingPeer = syncState.syncTarget().get().peer();
public boolean syncTargetCanProvideMoreBlocks(final SyncTarget syncTarget) {
final EthPeer currentSyncingPeer = syncTarget.peer();
return !isSyncTargetDisconnected() && !isSyncTargetReached(currentSyncingPeer);
}
}

@ -66,7 +66,6 @@ public class FastSyncChainDownloader<C> {
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
pivotBlockHeader),
new FastSyncCheckpointHeaderManager<>(

@ -16,12 +16,12 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.PivotBlockRetriever.MAX_PIVOT_BLOCK_RETRIES;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.SyncTargetManager;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
@ -49,10 +49,9 @@ class FastSyncTargetManager<C> extends SyncTargetManager<C> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem,
final BlockHeader pivotBlockHeader) {
super(config, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
super(config, protocolSchedule, protocolContext, ethContext, metricsSystem);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
@ -121,6 +120,7 @@ class FastSyncTargetManager<C> extends SyncTargetManager<C> {
@Override
public boolean isSyncTargetReached(final EthPeer peer) {
return syncState.chainHeadNumber() >= pivotBlockHeader.getNumber();
final MutableBlockchain blockchain = protocolContext.getBlockchain();
return blockchain.getChainHeadBlockNumber() >= pivotBlockHeader.getNumber();
}
}

@ -61,7 +61,7 @@ public class FullSyncDownloader<C> {
ethContext,
syncState,
new FullSyncTargetManager<>(
config, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem),
config, protocolSchedule, protocolContext, ethContext, metricsSystem),
new CheckpointHeaderManager<>(
config, protocolContext, ethContext, syncState, protocolSchedule, metricsSystem),
this::importBlocksForCheckpoints,
@ -111,6 +111,8 @@ public class FullSyncDownloader<C> {
public TrailingPeerRequirements calculateTrailingPeerRequirements() {
return syncState.isInSync()
? TrailingPeerRequirements.UNRESTRICTED
: new TrailingPeerRequirements(syncState.chainHeadNumber(), config.getMaxTrailingPeers());
: new TrailingPeerRequirements(
protocolContext.getBlockchain().getChainHeadBlockNumber(),
config.getMaxTrailingPeers());
}
}

@ -15,6 +15,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
@ -22,7 +23,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.sync.SyncTargetManager;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
@ -47,9 +47,8 @@ class FullSyncTargetManager<C> extends SyncTargetManager<C> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
super(config, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
super(config, protocolSchedule, protocolContext, ethContext, metricsSystem);
this.config = config;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
@ -93,9 +92,10 @@ class FullSyncTargetManager<C> extends SyncTargetManager<C> {
public boolean isSyncTargetReached(final EthPeer peer) {
final long peerHeight = peer.chainState().getEstimatedHeight();
final UInt256 peerTd = peer.chainState().getBestBlock().getTotalDifficulty();
final MutableBlockchain blockchain = protocolContext.getBlockchain();
return peerTd.compareTo(syncState.chainHeadTotalDifficulty()) <= 0
&& peerHeight <= syncState.chainHeadNumber();
return peerTd.compareTo(blockchain.getChainHead().getTotalDifficulty()) <= 0
&& peerHeight <= blockchain.getChainHeadBlockNumber();
}
@Override

@ -19,7 +19,6 @@ import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Optional;
@ -38,7 +37,7 @@ public class SyncState {
public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
this.blockchain = blockchain;
this.ethPeers = ethPeers;
this.startingBlock = chainHeadNumber();
this.startingBlock = this.blockchain.getChainHeadBlockNumber();
blockchain.observeBlockAdded(
(event, chain) -> {
if (event.isNewCanonicalHead()) {
@ -62,21 +61,15 @@ public class SyncState {
}
public SyncStatus syncStatus() {
return new SyncStatus(startingBlock(), chainHeadNumber(), bestChainHeight());
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
return new SyncStatus(
startingBlock(), chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber));
}
public long startingBlock() {
return startingBlock;
}
public long chainHeadNumber() {
return blockchain.getChainHeadBlockNumber();
}
public UInt256 chainHeadTotalDifficulty() {
return blockchain.getChainHead().getTotalDifficulty();
}
public Optional<SyncTarget> syncTarget() {
return syncTarget;
}
@ -118,11 +111,6 @@ public class SyncState {
target.addPeerChainEstimatedHeightListener(estimatedHeight -> checkInSync());
}
public long bestChainHeight() {
final long localChainHeight = blockchain.getChainHeadBlockNumber();
return bestChainHeight(localChainHeight);
}
public long bestChainHeight(final long localChainHeight) {
return Math.max(
localChainHeight,

@ -382,7 +382,8 @@ public class FullSyncDownloaderTest {
peerB
.getEthPeer()
.chainState()
.updateForAnnouncedBlock(gen.header(), syncState.chainHeadTotalDifficulty().plus(300));
.updateForAnnouncedBlock(
gen.header(), localBlockchain.getChainHead().getTotalDifficulty().plus(300));
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
@ -426,11 +427,13 @@ public class FullSyncDownloaderTest {
bestPeer
.getEthPeer()
.chainState()
.updateForAnnouncedBlock(gen.header(1000), syncState.chainHeadTotalDifficulty().plus(201));
.updateForAnnouncedBlock(
gen.header(1000), localBlockchain.getChainHead().getTotalDifficulty().plus(201));
otherPeer
.getEthPeer()
.chainState()
.updateForAnnouncedBlock(gen.header(1000), syncState.chainHeadTotalDifficulty().plus(300));
.updateForAnnouncedBlock(
gen.header(1000), localBlockchain.getChainHead().getTotalDifficulty().plus(300));
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();

@ -27,13 +27,13 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.junit.Before;
@ -64,8 +64,6 @@ public class FullSyncTargetManagerTest {
EthProtocolManagerTestUtil.create(
localBlockchain, localWorldState, new EthScheduler(1, 1, 1, new NoOpMetricsSystem()));
final EthContext ethContext = ethProtocolManager.ethContext();
final SyncState syncState =
new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());
localBlockchainSetup.importFirstBlocks(5);
otherBlockchainSetup.importFirstBlocks(20);
syncTargetManager =
@ -74,7 +72,6 @@ public class FullSyncTargetManagerTest {
protocolSchedule,
protocolContext,
ethContext,
syncState,
new NoOpMetricsSystem());
}
@ -85,7 +82,7 @@ public class FullSyncTargetManagerTest {
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 20);
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());
bestPeer.respond(responder);
@ -100,7 +97,7 @@ public class FullSyncTargetManagerTest {
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 20);
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());
bestPeer.respond(responder);

Loading…
Cancel
Save