Block prop on first final (#4265)

* start filtering peers after 1 finalized instead of 2
* stops counting finalized, and starts filtering on first finalized
* DefaultSynchronizer now listens to Forkhoice messages so it can stop block propagation at finalization, as opposed to TTD (previous behavior)

Signed-off-by: Justin Florentine <justin+github@florentine.us>
pull/4277/head
Justin Florentine 2 years ago committed by GitHub
parent 18078337b2
commit 63331cd55d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/MergePeerFilter.java
  3. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  4. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  5. 23
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java

@ -411,19 +411,14 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final PivotBlockSelector pivotBlockSelector = createPivotSelector(protocolContext);
final Synchronizer synchronizer =
new DefaultSynchronizer(
syncConfig,
createSynchronizer(
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.getBlockBroadcaster(),
protocolContext,
maybePruner,
ethContext,
syncState,
dataDirectory,
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()),
ethProtocolManager,
pivotBlockSelector);
final MiningCoordinator miningCoordinator =
@ -469,6 +464,44 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
additionalPluginServices);
}
private Synchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final WorldStateStorage worldStateStorage,
final ProtocolContext protocolContext,
final Optional<Pruner> maybePruner,
final EthContext ethContext,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
final GenesisConfigOptions maybeForTTD = configOptionsSupplier.get();
DefaultSynchronizer toUse =
new DefaultSynchronizer(
syncConfig,
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.getBlockBroadcaster(),
maybePruner,
ethContext,
syncState,
dataDirectory,
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()),
pivotBlockSelector);
if (maybeForTTD.getTerminalTotalDifficulty().isPresent()) {
LOG.info(
"TTD present, creating DefaultSynchronizer that stops propagating after finalization");
protocolContext
.getConsensusContext(MergeContext.class)
.addNewForkchoiceMessageListener(toUse);
}
return toUse;
}
private PivotBlockSelector createPivotSelector(final ProtocolContext protocolContext) {
final PivotSelectorFromPeers pivotSelectorFromPeers = new PivotSelectorFromPeers(syncConfig);

@ -26,7 +26,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock;
import org.slf4j.Logger;
@ -36,8 +36,7 @@ public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageList
private Optional<Difficulty> powTerminalDifficulty = Optional.of(Difficulty.MAX_VALUE);
private final StampedLock powTerminalDifficultyLock = new StampedLock();
private Hash lastFinalized = Hash.ZERO;
private final AtomicLong numFinalizedSeen = new AtomicLong(0);
private final AtomicBoolean finalized = new AtomicBoolean(false);
private static final Logger LOG = LoggerFactory.getLogger(MergePeerFilter.class);
public boolean disconnectIfPoW(final StatusMessage status, final EthPeer peer) {
@ -70,7 +69,7 @@ public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageList
}
private boolean isFinalized() {
return this.numFinalizedSeen.get() > 1;
return this.finalized.get();
}
@Override
@ -79,10 +78,12 @@ public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageList
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
if (maybeFinalizedBlockHash.isPresent()
&& !maybeFinalizedBlockHash.get().equals(this.lastFinalized)) {
this.lastFinalized = maybeFinalizedBlockHash.get();
this.numFinalizedSeen.getAndIncrement();
LOG.debug("have seen {} finalized blocks", this.numFinalizedSeen);
&& !maybeFinalizedBlockHash
.get()
.equals(
Hash.ZERO)) { // forkchoices send finalized as 0 after ttd, but before an epoch is
// finalized
this.finalized.set(true);
}
}

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
@ -64,7 +65,7 @@ import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BlockPropagationManager {
public class BlockPropagationManager implements ForkchoiceMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(BlockPropagationManager.class);
private final SynchronizerConfiguration config;
private final ProtocolSchedule protocolSchedule;
@ -582,8 +583,7 @@ public class BlockPropagationManager {
private void reactToTTDReachedEvent(final boolean ttdReached) {
if (started.get() && ttdReached) {
LOG.info("Block propagation was running, then ttd reached, stopping");
stop();
LOG.info("Block propagation was running, then ttd reached");
} else if (!started.get()) {
start();
}
@ -602,4 +602,14 @@ public class BlockPropagationManager {
+ pendingBlocksManager
+ '}';
}
@Override
public void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
if (maybeFinalizedBlockHash.isPresent() && !maybeFinalizedBlockHash.get().equals(Hash.ZERO)) {
stop();
}
}
}

@ -16,6 +16,8 @@ package org.hyperledger.besu.ethereum.eth.sync;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
@ -45,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultSynchronizer implements Synchronizer {
public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(DefaultSynchronizer.class);
@ -302,4 +304,16 @@ public class DefaultSynchronizer implements Synchronizer {
running.set(false);
return null;
}
@Override
public void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
if (this.blockPropagationManager.isPresent()) {
this.blockPropagationManager
.get()
.onNewForkchoiceMessage(headBlockHash, maybeFinalizedBlockHash, safeBlockHash);
}
}
}

@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
@ -61,6 +62,7 @@ import org.hyperledger.besu.testutil.TestClock;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
@ -87,6 +89,7 @@ public abstract class AbstractBlockPropagationManagerTest {
SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build()));
protected SyncState syncState;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337");
protected void setup(final DataStorageFormat dataStorageFormat) {
blockchainUtil = BlockchainSetupUtil.forTesting(dataStorageFormat);
@ -842,25 +845,27 @@ public abstract class AbstractBlockPropagationManagerTest {
}
@Test
public void shouldStopWhenTTDReached() {
public void shouldStopWhenFinalized() {
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
// syncState.setReachedTerminalDifficulty(true);
blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
assertThat(blockPropagationManager.isRunning()).isFalse();
assertThat(ethProtocolManager.ethContext().getEthMessages().messageCodesHandled())
.doesNotContain(EthPV62.NEW_BLOCK_HASHES, EthPV62.NEW_BLOCK);
}
@Test
public void shouldRestartWhenTTDReachedReturnsFalse() {
public void shouldRestartWhenTTDReachedReturnsFalseAfterFinalizing() {
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
assertThat(blockPropagationManager.isRunning()).isFalse();
syncState.setReachedTerminalDifficulty(false);
assertThat(blockPropagationManager.isRunning()).isTrue();
}
@Test
public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReached() {
public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReachedAndFinal() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
@ -878,7 +883,7 @@ public abstract class AbstractBlockPropagationManagerTest {
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());
syncState.setReachedTerminalDifficulty(true);
blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(responder, peer::hasOutstandingRequests);
@ -888,7 +893,7 @@ public abstract class AbstractBlockPropagationManagerTest {
}
@Test
public void shouldNotListenToNewBlockAnnouncementsWhenTTDReached() {
public void shouldNotListenToNewBlockAnnouncementsWhenTTDReachedAndFinal() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
@ -904,7 +909,7 @@ public abstract class AbstractBlockPropagationManagerTest {
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());
syncState.setReachedTerminalDifficulty(true);
blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(responder, peer::hasOutstandingRequests);
@ -914,13 +919,13 @@ public abstract class AbstractBlockPropagationManagerTest {
}
@Test
public void shouldNotListenToBlockAddedEventsWhenTTDReached() {
public void shouldNotListenToBlockAddedEventsWhenTTDReachedAndFinal() {
blockchainUtil.importFirstBlocks(2);
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
blockchainUtil.importBlockAtIndex(2);
assertThat(blockPropagationManager.isRunning()).isFalse();

Loading…
Cancel
Save