Support BFT mining coordinator being temporarily stopped while syncing (#7657)

* Support BFT mining coordinator being temporarily stopped while syncing happens

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Apply same change to IbftBesuControllerBuilder

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Add changelog entry

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Add event queue start/stop test

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Add BFT mining coordinator tests

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Typo

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Update consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java

Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
Signed-off-by: Matt Whitehead <matthew1001@hotmail.com>

* Update consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java

Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
Signed-off-by: Matt Whitehead <matthew1001@hotmail.com>

---------

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>
Signed-off-by: Matt Whitehead <matthew1001@hotmail.com>
Signed-off-by: Matt Whitehead <matthew.whitehead@kaleido.io>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
pull/7752/head
Matt Whitehead 1 month ago committed by GitHub
parent e4c1b5991c
commit 03a0cfad4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 11
      besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java
  3. 11
      besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java
  4. 5
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java
  5. 2
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java
  6. 9
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java
  7. 13
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java
  8. 34
      consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java
  9. 23
      consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java

@ -30,6 +30,7 @@
- Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623) - Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623)
- Fix an unhandled PeerTable exception [#7733](https://github.com/hyperledger/besu/issues/7733) - Fix an unhandled PeerTable exception [#7733](https://github.com/hyperledger/besu/issues/7733)
- Fix RocksDBException: Busy leading to MerkleTrieException: Unable to load trie node value [#7745](https://github.com/hyperledger/besu/pull/7745) - Fix RocksDBException: Busy leading to MerkleTrieException: Unable to load trie node value [#7745](https://github.com/hyperledger/besu/pull/7745)
- If a BFT validator node is syncing, pause block production until sync has completed [#7657](https://github.com/hyperledger/besu/pull/7657)
## 24.9.1 ## 24.9.1

@ -253,9 +253,18 @@ public class IbftBesuControllerBuilder extends BftBesuControllerBuilder {
.getValue() .getValue()
.getBlockPeriodSeconds())); .getBlockPeriodSeconds()));
if (syncState.isInitialSyncPhaseDone()) { syncState.subscribeSyncStatus(
syncStatus -> {
if (syncState.syncTarget().isPresent()) {
// We're syncing so stop doing other stuff
LOG.info("Stopping IBFT mining coordinator while we are syncing");
ibftMiningCoordinator.stop();
} else {
LOG.info("Starting IBFT mining coordinator following sync");
ibftMiningCoordinator.enable(); ibftMiningCoordinator.enable();
ibftMiningCoordinator.start();
} }
});
syncState.subscribeCompletionReached( syncState.subscribeCompletionReached(
new BesuEvents.InitialSyncCompletionListener() { new BesuEvents.InitialSyncCompletionListener() {

@ -301,9 +301,18 @@ public class QbftBesuControllerBuilder extends BftBesuControllerBuilder {
.getEmptyBlockPeriodSeconds()); .getEmptyBlockPeriodSeconds());
}); });
if (syncState.isInitialSyncPhaseDone()) { syncState.subscribeSyncStatus(
syncStatus -> {
if (syncState.syncTarget().isPresent()) {
// We're syncing so stop doing other stuff
LOG.info("Stopping QBFT mining coordinator while we are syncing");
miningCoordinator.stop();
} else {
LOG.info("Starting QBFT mining coordinator following sync");
miningCoordinator.enable(); miningCoordinator.enable();
miningCoordinator.start();
} }
});
syncState.subscribeCompletionReached( syncState.subscribeCompletionReached(
new BesuEvents.InitialSyncCompletionListener() { new BesuEvents.InitialSyncCompletionListener() {

@ -48,6 +48,11 @@ public class BftEventQueue {
started.set(true); started.set(true);
} }
/** Stop the event queue. No events will be queued for processing until it is started. */
public void stop() {
started.set(false);
}
private boolean isStarted() { private boolean isStarted() {
return started.get(); return started.get();
} }

@ -78,7 +78,7 @@ public class BftExecutors {
/** Start. */ /** Start. */
public synchronized void start() { public synchronized void start() {
if (state != State.IDLE) { if (state != State.IDLE && state != State.STOPPED) {
// Nothing to do // Nothing to do
return; return;
} }

@ -44,8 +44,13 @@ public class BftProcessor implements Runnable {
this.eventMultiplexer = eventMultiplexer; this.eventMultiplexer = eventMultiplexer;
} }
/** Indicate to the processor that it can be started */
public synchronized void start() {
shutdown = false;
}
/** Indicate to the processor that it should gracefully stop at its next opportunity */ /** Indicate to the processor that it should gracefully stop at its next opportunity */
public void stop() { public synchronized void stop() {
shutdown = true; shutdown = true;
} }
@ -67,6 +72,8 @@ public class BftProcessor implements Runnable {
while (!shutdown) { while (!shutdown) {
nextEvent().ifPresent(eventMultiplexer::handleBftEvent); nextEvent().ifPresent(eventMultiplexer::handleBftEvent);
} }
incomingQueue.stop();
} catch (final Throwable t) { } catch (final Throwable t) {
LOG.error("BFT Mining thread has suffered a fatal error, mining has been halted", t); LOG.error("BFT Mining thread has suffered a fatal error, mining has been halted", t);
} }

@ -93,7 +93,9 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv
@Override @Override
public void start() { public void start() {
if (state.compareAndSet(State.IDLE, State.RUNNING)) { if (state.compareAndSet(State.IDLE, State.RUNNING)
|| state.compareAndSet(State.STOPPED, State.RUNNING)) {
bftProcessor.start();
bftExecutors.start(); bftExecutors.start();
blockAddedObserverId = blockchain.observeBlockAdded(this); blockAddedObserverId = blockchain.observeBlockAdded(this);
eventHandler.start(); eventHandler.start();
@ -110,7 +112,7 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv
try { try {
bftProcessor.awaitStop(); bftProcessor.awaitStop();
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
LOG.debug("Interrupted while waiting for IbftProcessor to stop.", e); LOG.debug("Interrupted while waiting for BftProcessor to stop.", e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
bftExecutors.stop(); bftExecutors.stop();
@ -135,12 +137,17 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv
@Override @Override
public boolean disable() { public boolean disable() {
if (state.get() == State.PAUSED
|| state.compareAndSet(State.IDLE, State.PAUSED)
|| state.compareAndSet(State.RUNNING, State.PAUSED)) {
return true;
}
return false; return false;
} }
@Override @Override
public boolean isMining() { public boolean isMining() {
return true; return state.get() == State.RUNNING;
} }
@Override @Override

@ -134,6 +134,40 @@ public class BftEventQueueTest {
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
} }
@Test
public void supportsStopAndRestart() throws InterruptedException {
final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE);
queue.start();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
final DummyBftEvent dummyMessageEvent = new DummyBftEvent();
final DummyRoundExpiryBftEvent dummyRoundTimerEvent = new DummyRoundExpiryBftEvent();
final DummyNewChainHeadBftEvent dummyNewChainHeadEvent = new DummyNewChainHeadBftEvent();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
queue.stop();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
queue.start();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
}
@Test @Test
public void alwaysAddBlockTimerExpiryEvents() throws InterruptedException { public void alwaysAddBlockTimerExpiryEvents() throws InterruptedException {
final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE); final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE);

@ -19,6 +19,7 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import org.hyperledger.besu.consensus.common.bft.BftEventQueue; import org.hyperledger.besu.consensus.common.bft.BftEventQueue;
import org.hyperledger.besu.consensus.common.bft.BftExecutors; import org.hyperledger.besu.consensus.common.bft.BftExecutors;
@ -82,6 +83,28 @@ public class BftMiningCoordinatorTest {
verify(bftProcessor).stop(); verify(bftProcessor).stop();
} }
@Test
public void restartsMiningAfterStop() {
assertThat(bftMiningCoordinator.isMining()).isFalse();
bftMiningCoordinator.stop();
verify(bftProcessor, never()).stop();
bftMiningCoordinator.enable();
bftMiningCoordinator.start();
assertThat(bftMiningCoordinator.isMining()).isTrue();
bftMiningCoordinator.stop();
assertThat(bftMiningCoordinator.isMining()).isFalse();
verify(bftProcessor).stop();
bftMiningCoordinator.start();
assertThat(bftMiningCoordinator.isMining()).isTrue();
// BFT processor should be started once for every time the mining
// coordinator is restarted
verify(bftProcessor, times(2)).start();
}
@Test @Test
public void getsMinTransactionGasPrice() { public void getsMinTransactionGasPrice() {
final Wei minGasPrice = Wei.of(10); final Wei minGasPrice = Wei.of(10);

Loading…
Cancel
Save