From 63331cd55db96b13a09fbe3bbed4ef3b09d542d6 Mon Sep 17 00:00:00 2001 From: Justin Florentine Date: Wed, 17 Aug 2022 15:48:53 -0400 Subject: [PATCH] Block prop on first final (#4265) * start filtering peers after 1 finalized instead of 2 * stops counting finalized, and starts filtering on first finalized * DefaultSynchronizer now listens to Forkhoice messages so it can stop block propagation at finalization, as opposed to TTD (previous behavior) Signed-off-by: Justin Florentine --- .../controller/BesuControllerBuilder.java | 49 ++++++++++++++++--- .../ethereum/eth/manager/MergePeerFilter.java | 17 ++++--- .../eth/sync/BlockPropagationManager.java | 16 ++++-- .../eth/sync/DefaultSynchronizer.java | 16 +++++- .../AbstractBlockPropagationManagerTest.java | 23 +++++---- 5 files changed, 92 insertions(+), 29 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index ae7490b613..c9da9fe9f2 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -411,19 +411,14 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides final PivotBlockSelector pivotBlockSelector = createPivotSelector(protocolContext); final Synchronizer synchronizer = - new DefaultSynchronizer( - syncConfig, + createSynchronizer( protocolSchedule, - protocolContext, worldStateStorage, - ethProtocolManager.getBlockBroadcaster(), + protocolContext, maybePruner, ethContext, syncState, - dataDirectory, - clock, - metricsSystem, - getFullSyncTerminationCondition(protocolContext.getBlockchain()), + ethProtocolManager, pivotBlockSelector); final MiningCoordinator miningCoordinator = @@ -469,6 +464,44 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides additionalPluginServices); } + private Synchronizer createSynchronizer( + final ProtocolSchedule protocolSchedule, + final WorldStateStorage worldStateStorage, + final ProtocolContext protocolContext, + final Optional maybePruner, + final EthContext ethContext, + final SyncState syncState, + final EthProtocolManager ethProtocolManager, + final PivotBlockSelector pivotBlockSelector) { + + final GenesisConfigOptions maybeForTTD = configOptionsSupplier.get(); + + DefaultSynchronizer toUse = + new DefaultSynchronizer( + syncConfig, + protocolSchedule, + protocolContext, + worldStateStorage, + ethProtocolManager.getBlockBroadcaster(), + maybePruner, + ethContext, + syncState, + dataDirectory, + clock, + metricsSystem, + getFullSyncTerminationCondition(protocolContext.getBlockchain()), + pivotBlockSelector); + if (maybeForTTD.getTerminalTotalDifficulty().isPresent()) { + LOG.info( + "TTD present, creating DefaultSynchronizer that stops propagating after finalization"); + protocolContext + .getConsensusContext(MergeContext.class) + .addNewForkchoiceMessageListener(toUse); + } + + return toUse; + } + private PivotBlockSelector createPivotSelector(final ProtocolContext protocolContext) { final PivotSelectorFromPeers pivotSelectorFromPeers = new PivotSelectorFromPeers(syncConfig); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/MergePeerFilter.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/MergePeerFilter.java index a807785f94..bedf48aeaf 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/MergePeerFilter.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/MergePeerFilter.java @@ -26,7 +26,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.StampedLock; import org.slf4j.Logger; @@ -36,8 +36,7 @@ public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageList private Optional powTerminalDifficulty = Optional.of(Difficulty.MAX_VALUE); private final StampedLock powTerminalDifficultyLock = new StampedLock(); - private Hash lastFinalized = Hash.ZERO; - private final AtomicLong numFinalizedSeen = new AtomicLong(0); + private final AtomicBoolean finalized = new AtomicBoolean(false); private static final Logger LOG = LoggerFactory.getLogger(MergePeerFilter.class); public boolean disconnectIfPoW(final StatusMessage status, final EthPeer peer) { @@ -70,7 +69,7 @@ public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageList } private boolean isFinalized() { - return this.numFinalizedSeen.get() > 1; + return this.finalized.get(); } @Override @@ -79,10 +78,12 @@ public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageList final Optional maybeFinalizedBlockHash, final Hash safeBlockHash) { if (maybeFinalizedBlockHash.isPresent() - && !maybeFinalizedBlockHash.get().equals(this.lastFinalized)) { - this.lastFinalized = maybeFinalizedBlockHash.get(); - this.numFinalizedSeen.getAndIncrement(); - LOG.debug("have seen {} finalized blocks", this.numFinalizedSeen); + && !maybeFinalizedBlockHash + .get() + .equals( + Hash.ZERO)) { // forkchoices send finalized as 0 after ttd, but before an epoch is + // finalized + this.finalized.set(true); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java index f57a448695..eec2c4b45a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java @@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync; import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; +import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.BadBlockManager; @@ -64,7 +65,7 @@ import org.apache.tuweni.bytes.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BlockPropagationManager { +public class BlockPropagationManager implements ForkchoiceMessageListener { private static final Logger LOG = LoggerFactory.getLogger(BlockPropagationManager.class); private final SynchronizerConfiguration config; private final ProtocolSchedule protocolSchedule; @@ -582,8 +583,7 @@ public class BlockPropagationManager { private void reactToTTDReachedEvent(final boolean ttdReached) { if (started.get() && ttdReached) { - LOG.info("Block propagation was running, then ttd reached, stopping"); - stop(); + LOG.info("Block propagation was running, then ttd reached"); } else if (!started.get()) { start(); } @@ -602,4 +602,14 @@ public class BlockPropagationManager { + pendingBlocksManager + '}'; } + + @Override + public void onNewForkchoiceMessage( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash) { + if (maybeFinalizedBlockHash.isPresent() && !maybeFinalizedBlockHash.get().equals(Hash.ZERO)) { + stop(); + } + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 34d31764bc..091feca3d5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -16,6 +16,8 @@ package org.hyperledger.besu.ethereum.eth.sync; import static com.google.common.base.Preconditions.checkNotNull; +import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.manager.EthContext; @@ -45,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultSynchronizer implements Synchronizer { +public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListener { private static final Logger LOG = LoggerFactory.getLogger(DefaultSynchronizer.class); @@ -302,4 +304,16 @@ public class DefaultSynchronizer implements Synchronizer { running.set(false); return null; } + + @Override + public void onNewForkchoiceMessage( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash) { + if (this.blockPropagationManager.isPresent()) { + this.blockPropagationManager + .get() + .onNewForkchoiceMessage(headBlockHash, maybeFinalizedBlockHash, safeBlockHash); + } + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index 2aec4c302b..ac6aa6550a 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ConsensusContext; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.BadBlockManager; @@ -61,6 +62,7 @@ import org.hyperledger.besu.testutil.TestClock; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -87,6 +89,7 @@ public abstract class AbstractBlockPropagationManagerTest { SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build())); protected SyncState syncState; protected final MetricsSystem metricsSystem = new NoOpMetricsSystem(); + private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337"); protected void setup(final DataStorageFormat dataStorageFormat) { blockchainUtil = BlockchainSetupUtil.forTesting(dataStorageFormat); @@ -842,25 +845,27 @@ public abstract class AbstractBlockPropagationManagerTest { } @Test - public void shouldStopWhenTTDReached() { + public void shouldStopWhenFinalized() { blockPropagationManager.start(); - syncState.setReachedTerminalDifficulty(true); + // syncState.setReachedTerminalDifficulty(true); + blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null); assertThat(blockPropagationManager.isRunning()).isFalse(); assertThat(ethProtocolManager.ethContext().getEthMessages().messageCodesHandled()) .doesNotContain(EthPV62.NEW_BLOCK_HASHES, EthPV62.NEW_BLOCK); } @Test - public void shouldRestartWhenTTDReachedReturnsFalse() { + public void shouldRestartWhenTTDReachedReturnsFalseAfterFinalizing() { blockPropagationManager.start(); syncState.setReachedTerminalDifficulty(true); + blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null); assertThat(blockPropagationManager.isRunning()).isFalse(); syncState.setReachedTerminalDifficulty(false); assertThat(blockPropagationManager.isRunning()).isTrue(); } @Test - public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReached() { + public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReachedAndFinal() { blockchainUtil.importFirstBlocks(2); final Block nextBlock = blockchainUtil.getBlock(2); @@ -878,7 +883,7 @@ public abstract class AbstractBlockPropagationManagerTest { final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); syncState.setReachedTerminalDifficulty(true); - + blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null); // Broadcast message EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement); peer.respondWhile(responder, peer::hasOutstandingRequests); @@ -888,7 +893,7 @@ public abstract class AbstractBlockPropagationManagerTest { } @Test - public void shouldNotListenToNewBlockAnnouncementsWhenTTDReached() { + public void shouldNotListenToNewBlockAnnouncementsWhenTTDReachedAndFinal() { blockchainUtil.importFirstBlocks(2); final Block nextBlock = blockchainUtil.getBlock(2); @@ -904,7 +909,7 @@ public abstract class AbstractBlockPropagationManagerTest { final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); syncState.setReachedTerminalDifficulty(true); - + blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null); // Broadcast message EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement); peer.respondWhile(responder, peer::hasOutstandingRequests); @@ -914,13 +919,13 @@ public abstract class AbstractBlockPropagationManagerTest { } @Test - public void shouldNotListenToBlockAddedEventsWhenTTDReached() { + public void shouldNotListenToBlockAddedEventsWhenTTDReachedAndFinal() { blockchainUtil.importFirstBlocks(2); blockPropagationManager.start(); syncState.setReachedTerminalDifficulty(true); - + blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null); blockchainUtil.importBlockAtIndex(2); assertThat(blockPropagationManager.isRunning()).isFalse();