[PIE-2026] Separate in-sync from sync-status listeners (#100)

Signed-off-by: Meredith Baxter <meredith.baxter@consensys.net>
Signed-off-by: Lucas Saldanha <lucas.saldanha@consensys.net>
pull/109/head
mbaxter 5 years ago committed by Lucas Saldanha
parent fb265f510a
commit 127b92a1ad
  1. 4
      besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java
  2. 2
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java
  3. 2
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java
  4. 2
      ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinator.java
  5. 43
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java
  6. 13
      ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java
  7. 46
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainHeadEstimate.java
  8. 29
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainState.java
  9. 38
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainStateSnapshot.java
  10. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java
  11. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  12. 107
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/InSyncTracker.java
  13. 111
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java
  14. 559
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java
  15. 34
      ethereum/permissioning/src/main/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java
  16. 79
      ethereum/permissioning/src/test/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java

@ -75,12 +75,12 @@ public class BesuEventsImpl implements BesuEvents {
@Override
public long addSyncStatusListener(final SyncStatusListener syncStatusListener) {
return syncState.addSyncStatusListener(syncStatusListener);
return syncState.subscribeSyncStatus(syncStatusListener);
}
@Override
public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.removeSyncStatusListener(listenerIdentifier);
syncState.unsubscribeSyncStatus(listenerIdentifier);
}
private static PropagatedBlockContext blockPropagatedContext(

@ -31,7 +31,7 @@ public class SyncingSubscriptionService {
public SyncingSubscriptionService(
final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) {
this.subscriptionManager = subscriptionManager;
synchronizer.observeSyncStatus(this::sendSyncingToMatchingSubscriptions);
synchronizer.subscribeSyncStatus(this::sendSyncingToMatchingSubscriptions);
}
private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {

@ -54,7 +54,7 @@ public class SyncingSubscriptionServiceTest {
public void before() {
final ArgumentCaptor<SyncStatusListener> captor =
ArgumentCaptor.forClass(SyncStatusListener.class);
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L);
when(synchronizer.subscribeSyncStatus(captor.capture())).thenReturn(1L);
new SyncingSubscriptionService(subscriptionManager, synchronizer);
syncStatusListener = captor.getValue();
}

@ -55,7 +55,7 @@ public abstract class AbstractMiningCoordinator<
this.blockchain = blockchain;
this.syncState = syncState;
this.blockchain.observeBlockAdded(this);
syncState.addInSyncListener(this::inSyncChanged);
syncState.subscribeInSync(this::inSyncChanged);
}
@Override

@ -22,6 +22,9 @@ import java.util.Optional;
/** Provides an interface to block synchronization processes. */
public interface Synchronizer {
// Default tolerance used to determine whether or not this node is "in sync"
long DEFAULT_IN_SYNC_TOLERANCE = 5;
void start();
void stop();
@ -32,7 +35,43 @@ public interface Synchronizer {
*/
Optional<SyncStatus> getSyncStatus();
long observeSyncStatus(final BesuEvents.SyncStatusListener listener);
long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener);
boolean unsubscribeSyncStatus(long observerId);
/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code DEFAULT_IN_SYNC_TOLERANCE} behind the
* highest estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @return A subscription id that can be used to unsubscribe from these events
*/
long subscribeInSync(final InSyncListener listener);
/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code syncTolerance} behind the highest
* estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @param syncTolerance The tolerance used to determine whether this node is in-sync. A value of
* zero means that the node is considered in-sync only when the local chain height is greater
* than or equal to the best estimated remote chain height.
* @return A subscription id that can be used to unsubscribe from these events
*/
long subscribeInSync(final InSyncListener listener, final long syncTolerance);
/**
* Unsubscribe from in sync events.
*
* @param listenerId The id returned when subscribing
* @return {@code true} if a subscription was cancelled
*/
boolean unsubscribeInSync(final long listenerId);
boolean removeObserver(long observerId);
@FunctionalInterface
interface InSyncListener {
void onInSyncStatusChange(boolean newSyncStatus);
}
}

@ -203,11 +203,14 @@ public class BlockDataGenerator {
}
public Block genesisBlock() {
final BlockOptions options =
new BlockOptions()
.setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER)
.setStateRoot(Hash.EMPTY_TRIE_HASH)
.setParentHash(Hash.ZERO);
return genesisBlock(new BlockOptions());
}
public Block genesisBlock(final BlockOptions options) {
options
.setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER)
.setStateRoot(Hash.EMPTY_TRIE_HASH)
.setParentHash(Hash.ZERO);
return block(options);
}

@ -0,0 +1,46 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.util.uint.UInt256;
public interface ChainHeadEstimate {
UInt256 getEstimatedTotalDifficulty();
long getEstimatedHeight();
/**
* Returns true if this chain state represents a chain that is "better" than the chain represented
* by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or
* heavier than the supplied {@code chainToCheck}.
*
* @param chainToCheck The chain being compared.
* @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}.
*/
default boolean chainIsBetterThan(final ChainHead chainToCheck) {
return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck);
}
default boolean hasHigherDifficultyThan(final ChainHead chainToCheck) {
return getEstimatedTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0;
}
default boolean hasLongerChainThan(final ChainHead chainToCheck) {
return getEstimatedHeight() > chainToCheck.getHeight();
}
}

@ -14,7 +14,6 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.util.Subscribers;
@ -22,7 +21,7 @@ import org.hyperledger.besu.util.uint.UInt256;
import com.google.common.base.MoreObjects;
public class ChainState {
public class ChainState implements ChainHeadEstimate {
// The best block by total difficulty that we know about
private final BestBlock bestBlock = new BestBlock();
// The highest block that we've seen
@ -40,14 +39,20 @@ public class ChainState {
estimatedHeightListeners.unsubscribe(listenerId);
}
public ChainStateSnapshot getSnapshot() {
return new ChainStateSnapshot(getEstimatedTotalDifficulty(), getEstimatedHeight());
}
public boolean hasEstimatedHeight() {
return estimatedHeightKnown;
}
@Override
public long getEstimatedHeight() {
return estimatedHeight;
}
@Override
public UInt256 getEstimatedTotalDifficulty() {
return bestBlock.getTotalDifficulty();
}
@ -106,26 +111,6 @@ public class ChainState {
}
}
/**
* Returns true if this chain state represents a chain that is "better" than the chain represented
* by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or
* heavier than the supplied {@code chainToCheck}.
*
* @param chainToCheck The chain being compared.
* @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}.
*/
public boolean chainIsBetterThan(final ChainHead chainToCheck) {
return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck);
}
private boolean hasHigherDifficultyThan(final ChainHead chainToCheck) {
return bestBlock.getTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0;
}
private boolean hasLongerChainThan(final ChainHead chainToCheck) {
return estimatedHeight > chainToCheck.getHeight();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)

@ -0,0 +1,38 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.util.uint.UInt256;
public class ChainStateSnapshot implements ChainHeadEstimate {
private final UInt256 totalDifficulty;
private final long chainHeight;
public ChainStateSnapshot(final UInt256 totalDifficulty, final long chainHeight) {
this.totalDifficulty = totalDifficulty;
this.chainHeight = chainHeight;
}
@Override
public UInt256 getEstimatedTotalDifficulty() {
return totalDifficulty;
}
@Override
public long getEstimatedHeight() {
return chainHeight;
}
}

@ -326,10 +326,16 @@ public class EthPeer {
return knownBlocks.contains(hash);
}
/** @return This peer's current chain state. */
public ChainState chainState() {
return chainHeadState;
}
/** @return A read-only snapshot of this peer's current {@code chainState} } */
public ChainHeadEstimate chainStateSnapshot() {
return chainHeadState.getSnapshot();
}
public void registerHeight(final Hash blockHash, final long height) {
chainHeadState.update(blockHash, height);
}

@ -34,7 +34,6 @@ import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;
import org.hyperledger.besu.util.Subscribers;
import java.nio.file.Path;
import java.time.Clock;
@ -51,7 +50,6 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final Optional<Pruner> maybePruner;
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private final BlockPropagationManager<C> blockPropagationManager;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;
private final FullSyncDownloader<C> fullSyncDownloader;
@ -126,7 +124,6 @@ public class DefaultSynchronizer<C> implements Synchronizer {
public void start() {
if (running.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
syncState.addSyncStatusListener(this::syncStatusCallback);
blockPropagationManager.start();
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult);
@ -187,17 +184,28 @@ public class DefaultSynchronizer<C> implements Synchronizer {
}
@Override
public long observeSyncStatus(final SyncStatusListener listener) {
public long subscribeSyncStatus(final SyncStatusListener listener) {
checkNotNull(listener);
return syncStatusListeners.subscribe(listener);
return syncState.subscribeSyncStatus(listener);
}
@Override
public boolean removeObserver(final long observerId) {
return syncStatusListeners.unsubscribe(observerId);
public boolean unsubscribeSyncStatus(final long subscriberId) {
return syncState.unsubscribeSyncStatus(subscriberId);
}
private void syncStatusCallback(final SyncStatus status) {
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status));
@Override
public long subscribeInSync(final InSyncListener listener) {
return syncState.subscribeInSync(listener);
}
@Override
public long subscribeInSync(final InSyncListener listener, final long syncTolerance) {
return syncState.subscribeInSync(listener, syncTolerance);
}
@Override
public boolean unsubscribeInSync(final long listenerId) {
return syncState.unsubscribeSyncStatus(listenerId);
}
}

@ -0,0 +1,107 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.state;
import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener;
import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate;
import java.util.Optional;
import java.util.function.Consumer;
/** Tracks the sync status of this node within the specified {@code syncTolerance}. */
class InSyncTracker {
private InSyncState state = InSyncState.UNKNOWN;
// If the local chain is no more than {@code syncTolerance} behind the estimated highest chain,
// then the tracker considers this local node to be in sync
private final long syncTolerance;
private final InSyncListener listener;
private InSyncTracker(final InSyncListener listener, final long syncTolerance) {
this.listener = listener;
this.syncTolerance = syncTolerance;
}
public static InSyncTracker create(final InSyncListener listener, final long syncTolerance) {
return new InSyncTracker(listener, syncTolerance);
}
public static boolean isInSync(
final ChainHead localChain, final ChainHeadEstimate remoteChain, final long syncTolerance) {
final boolean inSyncByHeight =
remoteChain.getEstimatedHeight() - localChain.getHeight() <= syncTolerance;
return inSyncByHeight || !remoteChain.chainIsBetterThan(localChain);
}
synchronized void checkState(
final ChainHead localChain,
final Optional<ChainHeadEstimate> syncTargetChain,
final Optional<ChainHeadEstimate> bestPeerChain) {
final boolean currentSyncStatus =
currentSyncStatus(localChain, syncTargetChain, bestPeerChain).orElse(true);
final InSyncState newState = InSyncState.fromInSync(currentSyncStatus);
if (state != newState) {
// Sync status has changed, notify listener
state = newState;
state.ifKnown(listener::onInSyncStatusChange);
}
}
private Optional<Boolean> currentSyncStatus(
final ChainHead localChain,
final Optional<ChainHeadEstimate> syncTargetChain,
final Optional<ChainHeadEstimate> bestPeerChain) {
final Optional<Boolean> inSyncWithSyncTarget =
syncTargetChain.map(remote -> isInSync(localChain, remote));
final Optional<Boolean> inSyncWithBestPeer =
bestPeerChain.map(remote -> isInSync(localChain, remote));
// If we're out of sync with either peer, we're out of sync
if (inSyncWithSyncTarget.isPresent() && !inSyncWithSyncTarget.get()) {
return Optional.of(false);
}
if (inSyncWithBestPeer.isPresent() && !inSyncWithBestPeer.get()) {
return Optional.of(false);
}
// Otherwise, if either peer is in sync, we're in sync
return inSyncWithSyncTarget.or(() -> inSyncWithBestPeer);
}
private boolean isInSync(final ChainHead localChain, final ChainHeadEstimate remoteChain) {
return isInSync(localChain, remoteChain, syncTolerance);
}
private enum InSyncState {
UNKNOWN(Optional.empty()),
IN_SYNC(Optional.of(true)),
OUT_OF_SYNC(Optional.of(false));
private final Optional<Boolean> inSync;
InSyncState(final Optional<Boolean> inSync) {
this.inSync = inSync;
}
static InSyncState fromInSync(final boolean inSync) {
return inSync ? IN_SYNC : OUT_OF_SYNC;
}
public void ifKnown(final Consumer<Boolean> handler) {
inSync.ifPresent(handler::accept);
}
}
}

@ -18,23 +18,30 @@ import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.SyncStatus;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener;
import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.util.Subscribers;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
public class SyncState {
private static final long SYNC_TOLERANCE = 5;
private final Blockchain blockchain;
private final EthPeers ethPeers;
private final Subscribers<InSyncListener> inSyncListeners = Subscribers.create();
private final AtomicLong inSyncSubscriberId = new AtomicLong();
private final Map<Long, InSyncTracker> inSyncTrackers = new ConcurrentHashMap<>();
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private volatile long chainHeightListenerId;
private volatile Optional<SyncTarget> syncTarget = Optional.empty();
@ -79,16 +86,47 @@ public class SyncState {
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}
public void addInSyncListener(final InSyncListener observer) {
inSyncListeners.subscribe(observer);
/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code SYNC_TOLERANCE} behind the highest
* estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @return An {@code Unsubscriber} that can be used to stop listening for these events
*/
public long subscribeInSync(final InSyncListener listener) {
return subscribeInSync(listener, Synchronizer.DEFAULT_IN_SYNC_TOLERANCE);
}
/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code syncTolerance} behind the highest
* estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @param syncTolerance The tolerance used to determine whether this node is in-sync. A value of
* zero means that the node is considered in-sync only when the local chain height is greater
* than or equal to the best estimated remote chain height.
* @return An {@code Unsubscriber} that can be used to stop listening for these events
*/
public long subscribeInSync(final InSyncListener listener, final long syncTolerance) {
final InSyncTracker inSyncTracker = InSyncTracker.create(listener, syncTolerance);
final long id = inSyncSubscriberId.incrementAndGet();
inSyncTrackers.put(id, inSyncTracker);
return id;
}
public long addSyncStatusListener(final SyncStatusListener observer) {
return syncStatusListeners.subscribe(observer);
public boolean unsubscribeInSync(final long subscriberId) {
return inSyncTrackers.remove(subscriberId) != null;
}
public void removeSyncStatusListener(final long listenerId) {
syncStatusListeners.unsubscribe(listenerId);
public long subscribeSyncStatus(final SyncStatusListener listener) {
return syncStatusListeners.subscribe(listener);
}
public boolean unsubscribeSyncStatus(final long listenerId) {
return syncStatusListeners.unsubscribe(listenerId);
}
public SyncStatus syncStatus() {
@ -107,29 +145,44 @@ public class SyncState {
}
public boolean isInSync() {
return isInSync(SYNC_TOLERANCE);
return isInSync(Synchronizer.DEFAULT_IN_SYNC_TOLERANCE);
}
public boolean isInSync(final long syncTolerance) {
return isInSync(
getLocalChainHead(), getSyncTargetChainHead(), getBestPeerChainHead(), syncTolerance);
}
private boolean isInSync(
final ChainHead localChain,
final Optional<ChainHeadEstimate> syncTargetChain,
final Optional<ChainHeadEstimate> bestPeerChain,
final long syncTolerance) {
// Sync target may be temporarily empty while we switch sync targets during a sync, so
// check both the sync target and our best peer to determine if we're in sync or not
return isInSyncWithTarget(syncTolerance) && isInSyncWithBestPeer(syncTolerance);
return isInSync(localChain, syncTargetChain, syncTolerance)
&& isInSync(localChain, bestPeerChain, syncTolerance);
}
private boolean isInSyncWithTarget(final long syncTolerance) {
return syncTarget
.map(t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= syncTolerance)
private boolean isInSync(
final ChainHead localChain,
final Optional<ChainHeadEstimate> remoteChain,
final long syncTolerance) {
return remoteChain
.map(remoteState -> InSyncTracker.isInSync(localChain, remoteState, syncTolerance))
.orElse(true);
}
private boolean isInSyncWithBestPeer(final long syncTolerance) {
final ChainHead chainHead = blockchain.getChainHead();
return ethPeers
.bestPeerWithHeightEstimate()
.filter(peer -> peer.chainState().chainIsBetterThan(chainHead))
.map(EthPeer::chainState)
.map(chainState -> chainState.getEstimatedHeight() - chainHead.getHeight() <= syncTolerance)
.orElse(true);
private ChainHead getLocalChainHead() {
return blockchain.getChainHead();
}
private Optional<ChainHeadEstimate> getSyncTargetChainHead() {
return syncTarget.map(SyncTarget::peer).map(EthPeer::chainStateSnapshot);
}
private Optional<ChainHeadEstimate> getBestPeerChainHead() {
return ethPeers.bestPeerWithHeightEstimate().map(EthPeer::chainStateSnapshot);
}
public void disconnectSyncTarget(final DisconnectReason reason) {
@ -166,18 +219,20 @@ public class SyncState {
}
private synchronized void checkInSync() {
final boolean currentInSync = isInSync();
ChainHead localChain = getLocalChainHead();
Optional<ChainHeadEstimate> syncTargetChain = getSyncTargetChainHead();
Optional<ChainHeadEstimate> bestPeerChain = getBestPeerChainHead();
final boolean currentInSync = isInSync(localChain, syncTargetChain, bestPeerChain, 0);
if (lastInSync.compareAndSet(!currentInSync, currentInSync)) {
if (!currentInSync) {
// when we fall out of sync change our starting block
startingBlock = blockchain.getChainHeadBlockNumber();
startingBlock = localChain.getHeight();
}
inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync));
}
}
@FunctionalInterface
public interface InSyncListener {
void onSyncStatusChanged(boolean newSyncStatus);
inSyncTrackers
.values()
.forEach(
(syncTracker) -> syncTracker.checkState(localChain, syncTargetChain, bestPeerChain));
}
}

@ -16,32 +16,39 @@ package org.hyperledger.besu.ethereum.eth.sync.state;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.InMemoryStorageProvider;
import org.hyperledger.besu.ethereum.core.SyncStatus;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate;
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.util.uint.UInt256;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
@ -50,52 +57,61 @@ import org.mockito.ArgumentCaptor;
public class SyncStateTest {
private static final long OUR_CHAIN_HEAD_NUMBER = 500;
private static final UInt256 OUR_CHAIN_DIFFICULTY = UInt256.of(500);
private static final long TARGET_CHAIN_DELTA = 100;
private static final UInt256 standardDifficultyPerBlock = UInt256.of(1L);
private static final long OUR_CHAIN_HEAD_NUMBER = 20;
private static final UInt256 OUR_CHAIN_DIFFICULTY =
standardDifficultyPerBlock.times(OUR_CHAIN_HEAD_NUMBER);
private static final long TARGET_CHAIN_DELTA = 20;
private static final long TARGET_CHAIN_HEIGHT = OUR_CHAIN_HEAD_NUMBER + TARGET_CHAIN_DELTA;
private static final UInt256 TARGET_DIFFICULTY = OUR_CHAIN_DIFFICULTY.plus(TARGET_CHAIN_DELTA);
private static final UInt256 TARGET_DIFFICULTY =
standardDifficultyPerBlock.times(TARGET_CHAIN_HEIGHT);
private final Blockchain blockchain = mock(Blockchain.class);
private final EthPeers ethPeers = mock(EthPeers.class);
private final SyncState.InSyncListener inSyncListener = mock(SyncState.InSyncListener.class);
private final InSyncListener inSyncListener = mock(InSyncListener.class);
private final InSyncListener inSyncListenerExact = mock(InSyncListener.class);
private final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
private final EthPeer syncTargetPeer = mock(EthPeer.class);
private final ChainState syncTargetPeerChainState = spy(new ChainState());
private final EthPeer otherPeer = mock(EthPeer.class);
private final ChainState otherPeerChainState = spy(new ChainState());
private final BlockDataGenerator gen = new BlockDataGenerator(1);
private final Block genesisBlock =
gen.genesisBlock(new BlockOptions().setDifficulty(UInt256.ZERO));
private final MutableBlockchain blockchain =
InMemoryStorageProvider.createInMemoryBlockchain(genesisBlock);
private EthProtocolManager ethProtocolManager;
private EthPeers ethPeers;
private RespondingEthPeer syncTargetPeer;
private RespondingEthPeer otherPeer;
private SyncState syncState;
private BlockAddedObserver blockAddedObserver;
@Before
public void setUp() {
final ArgumentCaptor<BlockAddedObserver> captor =
ArgumentCaptor.forClass(BlockAddedObserver.class);
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain, InMemoryStorageProvider.createInMemoryWorldStateArchive());
ethPeers = spy(ethProtocolManager.ethContext().getEthPeers());
syncTargetPeer = createPeer(TARGET_DIFFICULTY, TARGET_CHAIN_HEIGHT);
otherPeer = createPeer(UInt256.ZERO, 0);
final ChainHead ourChainHead =
new ChainHead(Hash.ZERO, OUR_CHAIN_DIFFICULTY, OUR_CHAIN_HEAD_NUMBER);
advanceLocalChain(OUR_CHAIN_HEAD_NUMBER);
when(blockchain.observeBlockAdded(captor.capture())).thenReturn(1L);
when(blockchain.getChainHeadBlockNumber()).thenReturn(OUR_CHAIN_HEAD_NUMBER);
when(blockchain.getChainHead()).thenReturn(ourChainHead);
when(syncTargetPeer.chainState()).thenReturn(syncTargetPeerChainState);
when(otherPeer.chainState()).thenReturn(otherPeerChainState);
syncState = new SyncState(blockchain, ethPeers);
blockAddedObserver = captor.getValue();
syncState.addInSyncListener(inSyncListener);
syncState.addSyncStatusListener(syncStatusListener);
syncState.subscribeInSync(inSyncListener);
syncState.subscribeInSync(inSyncListenerExact, 0);
syncState.subscribeSyncStatus(syncStatusListener);
}
@Test
public void isInSync_noPeers() {
otherPeer.disconnect(DisconnectReason.REQUESTED);
syncTargetPeer.disconnect(DisconnectReason.REQUESTED);
syncState.clearSyncTarget();
assertThat(syncState.isInSync()).isTrue();
}
@Test
public void isInSync_singlePeerWithWorseChainBetterHeight() {
updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, OUR_CHAIN_DIFFICULTY.minus(1L));
when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer));
doReturn(false).when(otherPeerChainState).chainIsBetterThan(any());
updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, OUR_CHAIN_DIFFICULTY.minus(1L));
final EthPeer peer = mockWorseChain(otherPeer.getEthPeer());
doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate();
assertThat(syncState.syncTarget()).isEmpty(); // Sanity check
assertThat(syncState.isInSync()).isTrue();
@ -105,9 +121,9 @@ public class SyncStateTest {
@Test
public void isInSync_singlePeerWithWorseChainWorseHeight() {
updateChainState(
otherPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L));
when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer));
doReturn(false).when(otherPeerChainState).chainIsBetterThan(any());
otherPeer.getEthPeer(), OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L));
final EthPeer peer = mockWorseChain(otherPeer.getEthPeer());
doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate();
assertThat(syncState.syncTarget()).isEmpty(); // Sanity check
assertThat(syncState.isInSync()).isTrue();
@ -116,9 +132,9 @@ public class SyncStateTest {
@Test
public void isInSync_singlePeerWithBetterChainWorseHeight() {
updateChainState(otherPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, TARGET_DIFFICULTY);
when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer));
doReturn(true).when(otherPeerChainState).chainIsBetterThan(any());
updateChainState(otherPeer.getEthPeer(), OUR_CHAIN_HEAD_NUMBER - 1L, TARGET_DIFFICULTY);
final EthPeer peer = mockBetterChain(otherPeer.getEthPeer());
doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate();
assertThat(syncState.syncTarget()).isEmpty(); // Sanity check
assertThat(syncState.isInSync()).isTrue();
@ -127,9 +143,9 @@ public class SyncStateTest {
@Test
public void isInSync_singlePeerWithBetterChainBetterHeight() {
updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer));
doReturn(true).when(otherPeerChainState).chainIsBetterThan(any());
updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
final EthPeer peer = mockBetterChain(otherPeer.getEthPeer());
doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate();
assertThat(syncState.syncTarget()).isEmpty(); // Sanity check
assertThat(syncState.isInSync()).isFalse();
@ -140,8 +156,8 @@ public class SyncStateTest {
@Test
public void isInSync_syncTargetWithBetterHeight() {
updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
otherPeer.disconnect(DisconnectReason.REQUESTED);
setupOutOfSyncState();
assertThat(syncState.syncTarget()).isPresent(); // Sanity check
assertThat(syncState.isInSync()).isFalse();
@ -152,8 +168,10 @@ public class SyncStateTest {
@Test
public void isInSync_syncTargetWithWorseHeight() {
updateChainState(syncTargetPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, TARGET_DIFFICULTY);
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
otherPeer.disconnect(DisconnectReason.REQUESTED);
final long heightDifference = 20L;
advanceLocalChain(TARGET_CHAIN_HEIGHT + heightDifference);
setupOutOfSyncState();
assertThat(syncState.syncTarget()).isPresent(); // Sanity check
assertThat(syncState.isInSync()).isTrue();
@ -162,11 +180,9 @@ public class SyncStateTest {
@Test
public void isInSync_outOfSyncWithTargetAndOutOfSyncWithBestPeer() {
updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer));
doReturn(true).when(otherPeerChainState).chainIsBetterThan(any());
setupOutOfSyncState();
updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
doReturn(Optional.of(otherPeer.getEthPeer())).when(ethPeers).bestPeerWithHeightEstimate();
assertThat(syncState.isInSync()).isFalse();
assertThat(syncState.isInSync(0)).isFalse();
@ -177,112 +193,431 @@ public class SyncStateTest {
@Test
public void isInSync_inSyncWithTargetOutOfSyncWithBestPeer() {
setupOutOfSyncState();
advanceLocalChain(TARGET_CHAIN_HEIGHT);
final long heightDifference = 20L;
updateChainState(
syncTargetPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L));
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer));
doReturn(true).when(otherPeerChainState).chainIsBetterThan(any());
otherPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + heightDifference,
TARGET_DIFFICULTY.plus(heightDifference));
doReturn(Optional.of(otherPeer.getEthPeer())).when(ethPeers).bestPeerWithHeightEstimate();
assertThat(syncState.isInSync()).isFalse();
assertThat(syncState.isInSync(0)).isFalse();
assertThat(syncState.isInSync(TARGET_CHAIN_DELTA - 1)).isFalse();
assertThat(syncState.isInSync(TARGET_CHAIN_DELTA)).isTrue();
assertThat(syncState.isInSync(TARGET_CHAIN_DELTA + 1)).isTrue();
assertThat(syncState.isInSync(heightDifference - 1)).isFalse();
assertThat(syncState.isInSync(heightDifference)).isTrue();
assertThat(syncState.isInSync(heightDifference + 1)).isTrue();
}
@Test
public void isInSync_inSyncWithTargetInSyncWithBestPeer() {
updateChainState(
syncTargetPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L));
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
updateChainState(
otherPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L));
when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer));
doReturn(false).when(otherPeerChainState).chainIsBetterThan(any());
setupOutOfSyncState();
advanceLocalChain(TARGET_CHAIN_HEIGHT);
updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
doReturn(Optional.of(otherPeer.getEthPeer())).when(ethPeers).bestPeerWithHeightEstimate();
assertThat(syncState.isInSync()).isTrue();
assertThat(syncState.isInSync(0)).isTrue();
}
@Test
public void shouldSwitchToInSyncWhenSyncTargetCleared() {
public void shouldSwitchToInSyncWhenNoBetterPeersAreAvailable() {
setupOutOfSyncState();
otherPeer.disconnect(DisconnectReason.REQUESTED);
syncTargetPeer.disconnect(DisconnectReason.REQUESTED);
syncState.clearSyncTarget();
verify(inSyncListener).onSyncStatusChanged(true);
verify(inSyncListener).onInSyncStatusChange(true);
verify(inSyncListenerExact).onInSyncStatusChange(true);
verifyNoMoreInteractions(inSyncListener);
verifyNoMoreInteractions(inSyncListenerExact);
}
@Test
public void shouldBecomeInSyncWhenOurBlockchainCatchesUp() {
setupOutOfSyncState();
when(blockchain.getChainHeadBlockNumber()).thenReturn(TARGET_CHAIN_HEIGHT);
blockAddedObserver.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(
new Block(
targetBlockHeader(),
new BlockBody(Collections.emptyList(), Collections.emptyList()))),
blockchain);
// Update to just within the default sync threshold
advanceLocalChain(TARGET_CHAIN_HEIGHT - Synchronizer.DEFAULT_IN_SYNC_TOLERANCE);
// We should register as in-sync with default tolerance, out-of-sync with exact tolerance
assertThat(syncState.isInSync()).isTrue();
assertThat(syncState.isInSync(0)).isFalse();
verify(inSyncListener).onInSyncStatusChange(true);
verify(inSyncListenerExact, never()).onInSyncStatusChange(true);
// Advance one more block
advanceLocalChain(TARGET_CHAIN_HEIGHT - Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1);
// We should register as in-sync with default tolerance, out-of-sync with exact tolerance
assertThat(syncState.isInSync()).isTrue();
assertThat(syncState.isInSync(0)).isFalse();
verifyNoMoreInteractions(inSyncListener);
verify(inSyncListenerExact, never()).onInSyncStatusChange(true);
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// We should register as in-sync
assertThat(syncState.isInSync()).isTrue();
verify(inSyncListener).onSyncStatusChanged(true);
assertThat(syncState.isInSync(0)).isTrue();
verifyNoMoreInteractions(inSyncListener);
verify(inSyncListenerExact).onInSyncStatusChange(true);
}
@Test
public void addInSyncListener_whileOutOfSync() {
setupOutOfSyncState();
// Add listener
InSyncListener newListener = mock(InSyncListener.class);
syncState.subscribeInSync(newListener);
verify(newListener, never()).onInSyncStatusChange(false);
verify(newListener, never()).onInSyncStatusChange(true);
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// Fall out of sync
updateChainState(
syncTargetPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L,
TARGET_DIFFICULTY.plus(10L));
final ArgumentCaptor<Boolean> inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(newListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture());
final List<Boolean> syncChanges = inSyncEventCaptor.getAllValues();
assertThat(syncChanges.get(0)).isEqualTo(false);
assertThat(syncChanges.get(1)).isEqualTo(true);
assertThat(syncChanges.get(2)).isEqualTo(false);
}
@Test
public void addInSyncListener_whileOutOfSync_withDistinctSyncTolerance() {
setupOutOfSyncState();
// Add listener
final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE * 2;
InSyncListener newListener = mock(InSyncListener.class);
syncState.subscribeInSync(newListener, syncTolerance);
verify(newListener, never()).onInSyncStatusChange(false);
verify(newListener, never()).onInSyncStatusChange(true);
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// Fall out of sync
updateChainState(
syncTargetPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + syncTolerance + 1L,
TARGET_DIFFICULTY.plus(10L));
final ArgumentCaptor<Boolean> inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(newListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture());
final List<Boolean> syncChanges = inSyncEventCaptor.getAllValues();
assertThat(syncChanges.get(0)).isEqualTo(false);
assertThat(syncChanges.get(1)).isEqualTo(true);
assertThat(syncChanges.get(2)).isEqualTo(false);
}
@Test
public void addInSyncListener_whileInSync() {
setupOutOfSyncState();
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// Add listener
InSyncListener newListener = mock(InSyncListener.class);
syncState.subscribeInSync(newListener);
verify(newListener, never()).onInSyncStatusChange(false);
verify(newListener, never()).onInSyncStatusChange(true);
// Fall out of sync
updateChainState(
syncTargetPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L,
TARGET_DIFFICULTY.plus(10L));
verify(newListener).onInSyncStatusChange(false);
verify(newListener, never()).onInSyncStatusChange(true);
// Catch up
advanceLocalChain(TARGET_CHAIN_HEIGHT + 1L);
verify(newListener).onInSyncStatusChange(false);
verify(newListener).onInSyncStatusChange(true);
}
@Test
public void addInSyncListener_whileInSync_withDistinctSyncTolerance() {
final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE * 2;
setupOutOfSyncState();
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// Add listener
InSyncListener newListener = mock(InSyncListener.class);
syncState.subscribeInSync(newListener, syncTolerance);
verify(newListener, never()).onInSyncStatusChange(false);
verify(newListener, never()).onInSyncStatusChange(true);
// Fall out of sync
updateChainState(
syncTargetPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + syncTolerance + 1L,
TARGET_DIFFICULTY.plus(10L));
verify(newListener).onInSyncStatusChange(false);
verify(newListener, never()).onInSyncStatusChange(true);
// Catch up
advanceLocalChain(TARGET_CHAIN_HEIGHT + 1L);
verify(newListener).onInSyncStatusChange(false);
verify(newListener).onInSyncStatusChange(true);
}
@Test
public void removeInSyncListener_doesntReceiveSubsequentEvents() {
final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L;
setupOutOfSyncState();
// Add listener
InSyncListener newListener = mock(InSyncListener.class);
final long subscriberId = syncState.subscribeInSync(newListener, syncTolerance);
verify(newListener, never()).onInSyncStatusChange(anyBoolean());
// Remove listener
syncState.unsubscribeInSync(subscriberId);
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// We should not register the in-sync event
verify(newListener, never()).onInSyncStatusChange(anyBoolean());
// Fall out of sync
updateChainState(
syncTargetPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + syncTolerance + 1L,
TARGET_DIFFICULTY.plus(10L));
// We should not register the sync event
verify(newListener, never()).onInSyncStatusChange(anyBoolean());
// Other listeners should keep running
verify(inSyncListenerExact, times(2)).onInSyncStatusChange(false);
verify(inSyncListenerExact).onInSyncStatusChange(true);
}
@Test
public void removeInSyncListener_addAdditionalListenerBeforeRemoving() {
final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L;
setupOutOfSyncState();
// Add listener
InSyncListener listenerToRemove = mock(InSyncListener.class);
InSyncListener otherListener = mock(InSyncListener.class);
final long subscriberId = syncState.subscribeInSync(listenerToRemove, syncTolerance);
syncState.subscribeInSync(otherListener, syncTolerance);
// Remove listener
syncState.unsubscribeInSync(subscriberId);
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// We should not register the in-sync event
verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean());
// Fall out of sync
updateChainState(
syncTargetPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + syncTolerance + 1L,
TARGET_DIFFICULTY.plus(10L));
// We should not register the in-sync event
verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean());
final ArgumentCaptor<Boolean> inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(otherListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture());
final List<Boolean> syncChanges = inSyncEventCaptor.getAllValues();
assertThat(syncChanges.get(0)).isEqualTo(false);
assertThat(syncChanges.get(1)).isEqualTo(true);
assertThat(syncChanges.get(2)).isEqualTo(false);
// Other listeners should keep running
verify(inSyncListenerExact).onInSyncStatusChange(true);
verify(inSyncListenerExact, times(2)).onInSyncStatusChange(false);
}
@Test
public void removeInSyncListener_addAdditionalListenerAfterRemoving() {
final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L;
setupOutOfSyncState();
// Add listener
InSyncListener listenerToRemove = mock(InSyncListener.class);
InSyncListener otherListener = mock(InSyncListener.class);
final long subscriberId = syncState.subscribeInSync(listenerToRemove, syncTolerance);
// Remove listener
syncState.unsubscribeInSync(subscriberId);
// Add new listener
syncState.subscribeInSync(otherListener, syncTolerance);
// Catch all the way up
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// We should not register the sync event
verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean());
// Fall out of sync
updateChainState(
syncTargetPeer.getEthPeer(),
TARGET_CHAIN_HEIGHT + syncTolerance + 1L,
TARGET_DIFFICULTY.plus(10L));
// We should not register the sync event
verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean());
final ArgumentCaptor<Boolean> inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(otherListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture());
final List<Boolean> syncChanges = inSyncEventCaptor.getAllValues();
assertThat(syncChanges.get(0)).isEqualTo(false);
assertThat(syncChanges.get(1)).isEqualTo(true);
assertThat(syncChanges.get(2)).isEqualTo(false);
// Other listeners should keep running
verify(inSyncListenerExact, times(2)).onInSyncStatusChange(false);
verify(inSyncListenerExact).onInSyncStatusChange(true);
}
@Test
public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
syncState.addSyncStatusListener(syncStatusListener);
syncState.subscribeSyncStatus(syncStatusListener);
blockAddedObserver.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(
new Block(
targetBlockHeader(),
new BlockBody(Collections.emptyList(), Collections.emptyList()))),
blockchain);
advanceLocalChain(OUR_CHAIN_HEAD_NUMBER + 1L);
verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus()));
}
@Test
public void shouldReportReorgEvents() {
when(blockchain.getChainHeadBlockNumber()).thenReturn(TARGET_CHAIN_HEIGHT);
blockAddedObserver.onBlockAdded(
BlockAddedEvent.createForChainReorg(
new Block(
targetBlockHeader(),
new BlockBody(Collections.emptyList(), Collections.emptyList())),
Collections.emptyList(),
Collections.emptyList()),
blockchain);
public void shouldHandleSyncThenReorg() {
// Sync up to the target
final int expectedSyncEvents = (int) TARGET_CHAIN_DELTA;
advanceLocalChain(TARGET_CHAIN_HEIGHT);
// Perform a shallow reorg
final int expectedReorgEvents = 2;
reorgLocalChain(TARGET_CHAIN_HEIGHT - 1, TARGET_CHAIN_HEIGHT, UInt256.of(2L));
assertThat(syncState.isInSync()).isTrue();
final ArgumentCaptor<SyncStatus> captor = ArgumentCaptor.forClass(SyncStatus.class);
verify(syncStatusListener, times(2)).onSyncStatusChanged(captor.capture());
assertThat(captor.getAllValues().get(0).inSync()).isFalse();
assertThat(captor.getAllValues().get(1).inSync()).isTrue();
verify(syncStatusListener, times(expectedSyncEvents + expectedReorgEvents))
.onSyncStatusChanged(captor.capture());
final List<SyncStatus> eventValues = captor.getAllValues();
// Check the initial set of events corresponding to block advancement while we're out of sync
for (int i = 0; i < eventValues.size(); i++) {
final SyncStatus syncStatus = eventValues.get(i);
if (i == eventValues.size() - 1) {
// Last event should be the in-sync reorg event
assertThat(syncStatus.inSync()).isTrue();
assertThat(syncStatus.getCurrentBlock()).isEqualTo(TARGET_CHAIN_HEIGHT);
// TODO - finalize the start, current, and highest block values should be
} else if (i == eventValues.size() - 2) {
// Second-to-last event should be the in-sync reorg event
assertThat(syncStatus.inSync()).isFalse();
assertThat(syncStatus.getCurrentBlock()).isEqualTo(TARGET_CHAIN_HEIGHT);
// TODO - finalize the start, current, and highest block values should be
} else if (i == eventValues.size() - 3) {
// Third-to-last event should be the event when the node finally reaches sync
assertThat(syncStatus.inSync()).isTrue();
assertThat(syncStatus.getCurrentBlock()).isEqualTo(TARGET_CHAIN_HEIGHT);
assertThat(syncStatus.getHighestBlock()).isEqualTo(TARGET_CHAIN_HEIGHT);
// TODO - verify desired startingBlock value
} else {
// All previous events should correspond to the initial sync
assertThat(syncStatus.inSync()).isFalse();
assertThat(syncStatus.getCurrentBlock()).isEqualTo(OUR_CHAIN_HEAD_NUMBER + i + 1);
assertThat(syncStatus.getHighestBlock()).isEqualTo(TARGET_CHAIN_HEIGHT);
// TODO - verify desired startingBlock value
}
}
}
private RespondingEthPeer createPeer(final UInt256 totalDifficulty, final long blockHeight) {
return EthProtocolManagerTestUtil.createPeer(ethProtocolManager, totalDifficulty, blockHeight);
}
private EthPeer mockWorseChain(final EthPeer peer) {
return mockChainIsBetterThan(peer, false);
}
private EthPeer mockBetterChain(final EthPeer peer) {
return mockChainIsBetterThan(peer, true);
}
private EthPeer mockChainIsBetterThan(final EthPeer peer, final boolean isBetter) {
final ChainState chainState = spy(peer.chainState());
final ChainHeadEstimate chainStateSnapshot = spy(peer.chainStateSnapshot());
doReturn(isBetter).when(chainState).chainIsBetterThan(any());
doReturn(isBetter).when(chainStateSnapshot).chainIsBetterThan(any());
final EthPeer mockedPeer = spy(peer);
doReturn(chainStateSnapshot).when(chainState).getSnapshot();
doReturn(chainStateSnapshot).when(mockedPeer).chainStateSnapshot();
doReturn(chainState).when(mockedPeer).chainState();
return mockedPeer;
}
private void setupOutOfSyncState() {
updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
assertThat(syncState.isInSync()).isFalse();
verify(inSyncListener).onSyncStatusChanged(false);
syncState.setSyncTarget(syncTargetPeer.getEthPeer(), blockchain.getGenesisBlock().getHeader());
verify(inSyncListener).onInSyncStatusChange(false);
verify(inSyncListenerExact).onInSyncStatusChange(false);
}
private void advanceLocalChain(final long newChainHeight) {
while (blockchain.getChainHeadBlockNumber() < newChainHeight) {
final BlockHeader parent = blockchain.getChainHeadHeader();
final Block block =
gen.block(
BlockOptions.create()
.setDifficulty(standardDifficultyPerBlock)
.setParentHash(parent.getHash())
.setBlockNumber(parent.getNumber() + 1L));
final List<TransactionReceipt> receipts = gen.receipts(block);
blockchain.appendBlock(block, receipts);
}
}
private void reorgLocalChain(
final long commonAncestor, final long newHeight, final UInt256 difficultyPerBlock) {
BlockHeader currentBlock = blockchain.getBlockHeader(commonAncestor).get();
while (currentBlock.getNumber() < newHeight) {
final Block block =
gen.block(
BlockOptions.create()
.setDifficulty(difficultyPerBlock)
.setParentHash(currentBlock.getHash())
.setBlockNumber(currentBlock.getNumber() + 1L));
final List<TransactionReceipt> receipts = gen.receipts(block);
blockchain.appendBlock(block, receipts);
currentBlock = block.getHeader();
}
}
/**
* Updates the chain state, such that the peer will end up with an estimated height of {@code
* blockHeight} and an estimated total difficulty of {@code totalDifficulty}
*
* @param chainState The chain state to update
* @param peer The peer whose chain should be updated
* @param blockHeight The target estimated block height
* @param totalDifficulty The total difficulty
*/
private void updateChainState(
final ChainState chainState, final long blockHeight, final UInt256 totalDifficulty) {
final EthPeer peer, final long blockHeight, final UInt256 totalDifficulty) {
// Chain state is updated based on the parent of the announced block
// So, increment block number by 1 and set block difficulty to zero
// in order to update to the values we want
@ -291,18 +626,10 @@ public class SyncStateTest {
.number(blockHeight + 1L)
.difficulty(UInt256.ZERO)
.buildHeader();
chainState.updateForAnnouncedBlock(header, totalDifficulty);
peer.chainState().updateForAnnouncedBlock(header, totalDifficulty);
// Sanity check this logic still holds
assertThat(chainState.getEstimatedHeight()).isEqualTo(blockHeight);
assertThat(chainState.getEstimatedTotalDifficulty()).isEqualTo(totalDifficulty);
}
private BlockHeader targetBlockHeader() {
return blockHeaderAt(TARGET_CHAIN_HEIGHT);
}
private BlockHeader blockHeaderAt(final long blockNumber) {
return new BlockHeaderTestFixture().number(blockNumber).buildHeader();
assertThat(peer.chainState().getEstimatedHeight()).isEqualTo(blockHeight);
assertThat(peer.chainState().getEstimatedTotalDifficulty()).isEqualTo(totalDifficulty);
}
}

@ -20,14 +20,13 @@ import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.permissioning.node.NodePermissioningProvider;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.net.URI;
import java.util.Collection;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class SyncStatusNodePermissioningProvider implements NodePermissioningProvider {
@ -37,8 +36,8 @@ public class SyncStatusNodePermissioningProvider implements NodePermissioningPro
private final Counter checkCounter;
private final Counter checkCounterPermitted;
private final Counter checkCounterUnpermitted;
private OptionalLong syncStatusObserverId;
private boolean hasReachedSync = false;
private final long inSyncSubscriberId;
private final AtomicBoolean hasReachedSync = new AtomicBoolean(false);
public SyncStatusNodePermissioningProvider(
final Synchronizer synchronizer,
@ -46,8 +45,7 @@ public class SyncStatusNodePermissioningProvider implements NodePermissioningPro
final MetricsSystem metricsSystem) {
checkNotNull(synchronizer);
this.synchronizer = synchronizer;
long id = this.synchronizer.observeSyncStatus(this::handleSyncStatusUpdate);
this.syncStatusObserverId = OptionalLong.of(id);
this.inSyncSubscriberId = this.synchronizer.subscribeInSync(this::handleInSyncEvent, 0);
this.fixedNodes =
fixedNodes.stream().map(EnodeURL::toURIWithoutDiscoveryPort).collect(Collectors.toSet());
@ -55,7 +53,7 @@ public class SyncStatusNodePermissioningProvider implements NodePermissioningPro
BesuMetricCategory.PERMISSIONING,
"sync_status_node_sync_reached",
"Whether the sync status permissioning provider has realised sync yet",
() -> hasReachedSync ? 1 : 0);
() -> hasReachedSync.get() ? 1 : 0);
this.checkCounter =
metricsSystem.createCounter(
BesuMetricCategory.PERMISSIONING,
@ -73,20 +71,10 @@ public class SyncStatusNodePermissioningProvider implements NodePermissioningPro
"Number of times the sync status permissioning provider has been checked and returned unpermitted");
}
private void handleSyncStatusUpdate(final SyncStatus syncStatus) {
if (syncStatus != null) {
long blocksBehind = syncStatus.getHighestBlock() - syncStatus.getCurrentBlock();
if (blocksBehind <= 0) {
synchronized (this) {
if (!hasReachedSync) {
syncStatusObserverId.ifPresent(
id -> {
synchronizer.removeObserver(id);
syncStatusObserverId = OptionalLong.empty();
});
hasReachedSync = true;
}
}
private void handleInSyncEvent(final boolean isInSync) {
if (isInSync) {
if (hasReachedSync.compareAndSet(false, true)) {
synchronizer.unsubscribeInSync(inSyncSubscriberId);
}
}
}
@ -104,7 +92,7 @@ public class SyncStatusNodePermissioningProvider implements NodePermissioningPro
*/
@Override
public boolean isPermitted(final EnodeURL sourceEnode, final EnodeURL destinationEnode) {
if (hasReachedSync) {
if (hasReachedSync.get()) {
return true;
} else {
checkCounter.inc();
@ -119,6 +107,6 @@ public class SyncStatusNodePermissioningProvider implements NodePermissioningPro
}
public boolean hasReachedSync() {
return hasReachedSync;
return hasReachedSync.get();
}
}

@ -21,11 +21,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.core.SyncStatus;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
@ -62,14 +61,16 @@ public class SyncStatusNodePermissioningProviderTest {
@Mock private Synchronizer synchronizer;
private Collection<EnodeURL> bootnodes = new ArrayList<>();
private SyncStatusNodePermissioningProvider provider;
private SyncStatusListener syncStatusListener;
private long syncStatusObserverId = 1L;
private InSyncListener inSyncListener;
@Before
public void before() {
final ArgumentCaptor<SyncStatusListener> captor =
ArgumentCaptor.forClass(SyncStatusListener.class);
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(syncStatusObserverId);
final ArgumentCaptor<InSyncListener> inSyncSubscriberCaptor =
ArgumentCaptor.forClass(InSyncListener.class);
final ArgumentCaptor<Long> syncToleranceCaptor = ArgumentCaptor.forClass(Long.class);
when(synchronizer.subscribeInSync(
inSyncSubscriberCaptor.capture(), syncToleranceCaptor.capture()))
.thenReturn(1L);
bootnodes.add(bootnode);
@SuppressWarnings("unchecked")
@ -92,7 +93,8 @@ public class SyncStatusNodePermissioningProviderTest {
"Number of times the sync status permissioning provider has been checked and returned unpermitted"))
.thenReturn(checkUnpermittedCounter);
this.provider = new SyncStatusNodePermissioningProvider(synchronizer, bootnodes, metricsSystem);
this.syncStatusListener = captor.getValue();
this.inSyncListener = inSyncSubscriberCaptor.getValue();
assertThat(syncToleranceCaptor.getValue()).isEqualTo(0);
verify(metricsSystem)
.createIntegerGauge(
eq(BesuMetricCategory.PERMISSIONING),
@ -101,12 +103,12 @@ public class SyncStatusNodePermissioningProviderTest {
syncGaugeCallbackCaptor.capture());
this.syncGauge = syncGaugeCallbackCaptor.getValue();
verify(synchronizer).observeSyncStatus(any());
verify(synchronizer).subscribeInSync(any(), eq(0L));
}
@Test
public void whenIsNotInSyncHasReachedSyncShouldReturnFalse() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2));
inSyncListener.onInSyncStatusChange(false);
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
@ -114,7 +116,7 @@ public class SyncStatusNodePermissioningProviderTest {
@Test
public void whenInSyncHasReachedSyncShouldReturnTrue() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 1));
inSyncListener.onInSyncStatusChange(true);
assertThat(provider.hasReachedSync()).isTrue();
assertThat(syncGauge.getAsInt()).isEqualTo(1);
@ -122,22 +124,21 @@ public class SyncStatusNodePermissioningProviderTest {
@Test
public void whenInSyncChangesFromTrueToFalseHasReachedSyncShouldReturnTrue() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2));
inSyncListener.onInSyncStatusChange(false);
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 2, 1));
inSyncListener.onInSyncStatusChange(true);
assertThat(provider.hasReachedSync()).isTrue();
assertThat(syncGauge.getAsInt()).isEqualTo(1);
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 2, 3));
inSyncListener.onInSyncStatusChange(false);
assertThat(provider.hasReachedSync()).isTrue();
assertThat(syncGauge.getAsInt()).isEqualTo(1);
}
@Test
public void whenHasNotSyncedNonBootnodeShouldNotBePermitted() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
@ -151,7 +152,6 @@ public class SyncStatusNodePermissioningProviderTest {
@Test
public void whenHasNotSyncedBootnodeIncomingConnectionShouldNotBePermitted() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
@ -165,7 +165,48 @@ public class SyncStatusNodePermissioningProviderTest {
@Test
public void whenHasNotSyncedBootnodeOutgoingConnectionShouldBePermitted() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2));
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
boolean isPermitted = provider.isPermitted(enode1, bootnode);
assertThat(isPermitted).isTrue();
verify(checkCounter, times(1)).inc();
verify(checkPermittedCounter, times(1)).inc();
verify(checkUnpermittedCounter, times(0)).inc();
}
@Test
public void whenOutOfSyncNonBootnodeShouldNotBePermitted() {
inSyncListener.onInSyncStatusChange(false);
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
boolean isPermitted = provider.isPermitted(enode1, enode2);
assertThat(isPermitted).isFalse();
verify(checkCounter, times(1)).inc();
verify(checkPermittedCounter, times(0)).inc();
verify(checkUnpermittedCounter, times(1)).inc();
}
@Test
public void whenOutOfSyncBootnodeIncomingConnectionShouldNotBePermitted() {
inSyncListener.onInSyncStatusChange(false);
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
boolean isPermitted = provider.isPermitted(bootnode, enode1);
assertThat(isPermitted).isFalse();
verify(checkCounter, times(1)).inc();
verify(checkPermittedCounter, times(0)).inc();
verify(checkUnpermittedCounter, times(1)).inc();
}
@Test
public void whenOutOfSyncBootnodeOutgoingConnectionShouldBePermitted() {
inSyncListener.onInSyncStatusChange(false);
assertThat(provider.hasReachedSync()).isFalse();
assertThat(syncGauge.getAsInt()).isEqualTo(0);
@ -179,7 +220,7 @@ public class SyncStatusNodePermissioningProviderTest {
@Test
public void whenHasSyncedIsPermittedShouldReturnTrue() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 1));
inSyncListener.onInSyncStatusChange(true);
assertThat(provider.hasReachedSync()).isTrue();
assertThat(syncGauge.getAsInt()).isEqualTo(1);
@ -193,7 +234,7 @@ public class SyncStatusNodePermissioningProviderTest {
@Test
public void syncStatusPermissioningCheckShouldIgnoreEnodeURLDiscoveryPort() {
syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2));
inSyncListener.onInSyncStatusChange(false);
assertThat(provider.hasReachedSync()).isFalse();
final EnodeURL bootnode =

Loading…
Cancel
Save