From 03a0cfad4b0b931ce85177517f92a278f5a10b46 Mon Sep 17 00:00:00 2001 From: Matt Whitehead Date: Wed, 9 Oct 2024 12:07:20 +0100 Subject: [PATCH] Support BFT mining coordinator being temporarily stopped while syncing (#7657) * Support BFT mining coordinator being temporarily stopped while syncing happens Signed-off-by: Matthew Whitehead * Apply same change to IbftBesuControllerBuilder Signed-off-by: Matthew Whitehead * Add changelog entry Signed-off-by: Matthew Whitehead * Add event queue start/stop test Signed-off-by: Matthew Whitehead * Add BFT mining coordinator tests Signed-off-by: Matthew Whitehead * Typo Signed-off-by: Matthew Whitehead * Update consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java Co-authored-by: Sally MacFarlane Signed-off-by: Matt Whitehead * Update consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java Co-authored-by: Sally MacFarlane Signed-off-by: Matt Whitehead --------- Signed-off-by: Matthew Whitehead Signed-off-by: Matt Whitehead Signed-off-by: Matt Whitehead Co-authored-by: Sally MacFarlane --- CHANGELOG.md | 1 + .../controller/IbftBesuControllerBuilder.java | 15 ++++++-- .../controller/QbftBesuControllerBuilder.java | 15 ++++++-- .../consensus/common/bft/BftEventQueue.java | 5 +++ .../consensus/common/bft/BftExecutors.java | 2 +- .../consensus/common/bft/BftProcessor.java | 9 ++++- .../blockcreation/BftMiningCoordinator.java | 13 +++++-- .../common/bft/BftEventQueueTest.java | 34 +++++++++++++++++++ .../BftMiningCoordinatorTest.java | 23 +++++++++++++ 9 files changed, 106 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9e5beb331..762ae66b16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623) - Fix an unhandled PeerTable exception [#7733](https://github.com/hyperledger/besu/issues/7733) - Fix RocksDBException: Busy leading to MerkleTrieException: Unable to load trie node value [#7745](https://github.com/hyperledger/besu/pull/7745) +- If a BFT validator node is syncing, pause block production until sync has completed [#7657](https://github.com/hyperledger/besu/pull/7657) ## 24.9.1 diff --git a/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java index 58412029fc..738dcfc596 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java @@ -253,9 +253,18 @@ public class IbftBesuControllerBuilder extends BftBesuControllerBuilder { .getValue() .getBlockPeriodSeconds())); - if (syncState.isInitialSyncPhaseDone()) { - ibftMiningCoordinator.enable(); - } + syncState.subscribeSyncStatus( + syncStatus -> { + if (syncState.syncTarget().isPresent()) { + // We're syncing so stop doing other stuff + LOG.info("Stopping IBFT mining coordinator while we are syncing"); + ibftMiningCoordinator.stop(); + } else { + LOG.info("Starting IBFT mining coordinator following sync"); + ibftMiningCoordinator.enable(); + ibftMiningCoordinator.start(); + } + }); syncState.subscribeCompletionReached( new BesuEvents.InitialSyncCompletionListener() { diff --git a/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java index 60eac17996..498435e4af 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java @@ -301,9 +301,18 @@ public class QbftBesuControllerBuilder extends BftBesuControllerBuilder { .getEmptyBlockPeriodSeconds()); }); - if (syncState.isInitialSyncPhaseDone()) { - miningCoordinator.enable(); - } + syncState.subscribeSyncStatus( + syncStatus -> { + if (syncState.syncTarget().isPresent()) { + // We're syncing so stop doing other stuff + LOG.info("Stopping QBFT mining coordinator while we are syncing"); + miningCoordinator.stop(); + } else { + LOG.info("Starting QBFT mining coordinator following sync"); + miningCoordinator.enable(); + miningCoordinator.start(); + } + }); syncState.subscribeCompletionReached( new BesuEvents.InitialSyncCompletionListener() { diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java index 4221fd1fac..8f6190c4cf 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java @@ -48,6 +48,11 @@ public class BftEventQueue { started.set(true); } + /** Stop the event queue. No events will be queued for processing until it is started. */ + public void stop() { + started.set(false); + } + private boolean isStarted() { return started.get(); } diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java index 30038add3d..709d65faaa 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java @@ -78,7 +78,7 @@ public class BftExecutors { /** Start. */ public synchronized void start() { - if (state != State.IDLE) { + if (state != State.IDLE && state != State.STOPPED) { // Nothing to do return; } diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java index 93df77ea77..81be83b469 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java @@ -44,8 +44,13 @@ public class BftProcessor implements Runnable { this.eventMultiplexer = eventMultiplexer; } + /** Indicate to the processor that it can be started */ + public synchronized void start() { + shutdown = false; + } + /** Indicate to the processor that it should gracefully stop at its next opportunity */ - public void stop() { + public synchronized void stop() { shutdown = true; } @@ -67,6 +72,8 @@ public class BftProcessor implements Runnable { while (!shutdown) { nextEvent().ifPresent(eventMultiplexer::handleBftEvent); } + + incomingQueue.stop(); } catch (final Throwable t) { LOG.error("BFT Mining thread has suffered a fatal error, mining has been halted", t); } diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java index 83cd61beb3..795a064b6b 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java @@ -93,7 +93,9 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv @Override public void start() { - if (state.compareAndSet(State.IDLE, State.RUNNING)) { + if (state.compareAndSet(State.IDLE, State.RUNNING) + || state.compareAndSet(State.STOPPED, State.RUNNING)) { + bftProcessor.start(); bftExecutors.start(); blockAddedObserverId = blockchain.observeBlockAdded(this); eventHandler.start(); @@ -110,7 +112,7 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv try { bftProcessor.awaitStop(); } catch (final InterruptedException e) { - LOG.debug("Interrupted while waiting for IbftProcessor to stop.", e); + LOG.debug("Interrupted while waiting for BftProcessor to stop.", e); Thread.currentThread().interrupt(); } bftExecutors.stop(); @@ -135,12 +137,17 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv @Override public boolean disable() { + if (state.get() == State.PAUSED + || state.compareAndSet(State.IDLE, State.PAUSED) + || state.compareAndSet(State.RUNNING, State.PAUSED)) { + return true; + } return false; } @Override public boolean isMining() { - return true; + return state.get() == State.RUNNING; } @Override diff --git a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java index b39bef3a43..6fbf701bc1 100644 --- a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java +++ b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java @@ -134,6 +134,40 @@ public class BftEventQueueTest { assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); } + @Test + public void supportsStopAndRestart() throws InterruptedException { + final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE); + queue.start(); + + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + final DummyBftEvent dummyMessageEvent = new DummyBftEvent(); + final DummyRoundExpiryBftEvent dummyRoundTimerEvent = new DummyRoundExpiryBftEvent(); + final DummyNewChainHeadBftEvent dummyNewChainHeadEvent = new DummyNewChainHeadBftEvent(); + + queue.add(dummyMessageEvent); + queue.add(dummyRoundTimerEvent); + queue.add(dummyNewChainHeadEvent); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + + queue.stop(); + queue.add(dummyMessageEvent); + queue.add(dummyRoundTimerEvent); + queue.add(dummyNewChainHeadEvent); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + + queue.start(); + queue.add(dummyMessageEvent); + queue.add(dummyRoundTimerEvent); + queue.add(dummyNewChainHeadEvent); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + } + @Test public void alwaysAddBlockTimerExpiryEvents() throws InterruptedException { final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE); diff --git a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java index cb0dffba9b..328f9fd7b7 100644 --- a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java +++ b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; import org.hyperledger.besu.consensus.common.bft.BftEventQueue; import org.hyperledger.besu.consensus.common.bft.BftExecutors; @@ -82,6 +83,28 @@ public class BftMiningCoordinatorTest { verify(bftProcessor).stop(); } + @Test + public void restartsMiningAfterStop() { + assertThat(bftMiningCoordinator.isMining()).isFalse(); + bftMiningCoordinator.stop(); + verify(bftProcessor, never()).stop(); + + bftMiningCoordinator.enable(); + bftMiningCoordinator.start(); + assertThat(bftMiningCoordinator.isMining()).isTrue(); + + bftMiningCoordinator.stop(); + assertThat(bftMiningCoordinator.isMining()).isFalse(); + verify(bftProcessor).stop(); + + bftMiningCoordinator.start(); + assertThat(bftMiningCoordinator.isMining()).isTrue(); + + // BFT processor should be started once for every time the mining + // coordinator is restarted + verify(bftProcessor, times(2)).start(); + } + @Test public void getsMinTransactionGasPrice() { final Wei minGasPrice = Wei.of(10);