Migrating mining coordinator duplicate event (#3098)

Unsubscribe the active delegate mining coordinator from BlockAddedEvents

The MigratingMiningCoordinator acts as the observer on its behalf

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
pull/3151/head
Simon Dudley 3 years ago committed by GitHub
parent d7c58ec8f5
commit 9147b67fad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/MigratingMiningCoordinator.java
  2. 5
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java
  3. 84
      consensus/common/src/test/java/org/hyperledger/besu/consensus/common/MigratingMiningCoordinatorTest.java
  4. 2
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockAddedObserver.java

@ -54,7 +54,14 @@ public class MigratingMiningCoordinator implements MiningCoordinator, BlockAdded
@Override
public void start() {
blockAddedObserverId = blockchain.observeBlockAdded(this);
startActiveMiningCoordinator();
}
private void startActiveMiningCoordinator() {
activeMiningCoordinator.start();
if (activeMiningCoordinator instanceof BlockAddedObserver) {
((BlockAddedObserver) activeMiningCoordinator).removeObserver();
}
}
@Override
@ -118,13 +125,13 @@ public class MigratingMiningCoordinator implements MiningCoordinator, BlockAdded
miningCoordinatorSchedule.getFork(currentBlock + 1).getValue();
if (activeMiningCoordinator != nextMiningCoordinator) {
LOG.trace(
"Switching mining coordinator at block {} from {} to {}",
"Migrating mining coordinator at block {} from {} to {}",
currentBlock,
activeMiningCoordinator.getClass().getSimpleName(),
nextMiningCoordinator.getClass().getSimpleName());
activeMiningCoordinator.stop();
nextMiningCoordinator.start();
activeMiningCoordinator = nextMiningCoordinator;
startActiveMiningCoordinator();
}
if (activeMiningCoordinator instanceof BlockAddedObserver) {
((BlockAddedObserver) activeMiningCoordinator).onBlockAdded(event);

@ -156,4 +156,9 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv
eventQueue.add(new NewChainHead(event.getBlock().getHeader()));
}
}
@Override
public void removeObserver() {
blockchain.removeObserver(blockAddedObserverId);
}
}

@ -16,12 +16,18 @@ package org.hyperledger.besu.consensus.common;
import static java.util.Collections.emptyList;
import static org.hyperledger.besu.ethereum.core.BlockHeader.GENESIS_BLOCK_NUMBER;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.consensus.common.bft.BftEventQueue;
import org.hyperledger.besu.consensus.common.bft.BftExecutors;
import org.hyperledger.besu.consensus.common.bft.BftProcessor;
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftBlockCreatorFactory;
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftMiningCoordinator;
import org.hyperledger.besu.consensus.common.bft.statemachine.BftEventHandler;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.blockcreation.NoopMiningCoordinator;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
@ -45,40 +51,68 @@ public class MigratingMiningCoordinatorTest {
@Mock private BftMiningCoordinator coordinator1;
@Mock private BftMiningCoordinator coordinator2;
@Mock private NoopMiningCoordinator noopCoordinator;
@Mock private Blockchain blockchain;
@Mock private BlockHeader blockHeader;
@Mock private BlockBody blockBody;
private Block block;
private BlockAddedEvent blockEvent;
private ForksSchedule<MiningCoordinator> miningCoordinatorSchedule;
private ForksSchedule<MiningCoordinator> coordinatorSchedule;
private static final long MIGRATION_BLOCK_NUMBER = 5L;
@Before
public void setup() {
coordinatorSchedule = createCoordinatorSchedule(coordinator1, coordinator2);
final Block block = new Block(blockHeader, blockBody);
blockEvent = BlockAddedEvent.createForHeadAdvancement(block, emptyList(), emptyList());
}
private ForksSchedule<MiningCoordinator> createCoordinatorSchedule(
final MiningCoordinator genesisCoordinator, final MiningCoordinator migrateToCoordinator) {
final ForkSpec<MiningCoordinator> genesisFork =
new ForkSpec<>(GENESIS_BLOCK_NUMBER, coordinator1);
new ForkSpec<>(GENESIS_BLOCK_NUMBER, genesisCoordinator);
final ForkSpec<MiningCoordinator> migrationFork =
new ForkSpec<>(MIGRATION_BLOCK_NUMBER, coordinator2);
miningCoordinatorSchedule = new ForksSchedule<>(List.of(genesisFork, migrationFork));
this.block = new Block(blockHeader, blockBody);
blockEvent = BlockAddedEvent.createForHeadAdvancement(this.block, emptyList(), emptyList());
new ForkSpec<>(MIGRATION_BLOCK_NUMBER, migrateToCoordinator);
return new ForksSchedule<>(List.of(genesisFork, migrationFork));
}
@Test
public void startShouldRegisterThisCoordinatorAsObserver() {
final MigratingMiningCoordinator coordinator =
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain);
new MigratingMiningCoordinator(coordinatorSchedule, blockchain);
coordinator.start();
verify(blockchain).observeBlockAdded(coordinator);
}
@Test
public void startShouldUnregisterDelegateCoordinatorAsObserver() {
final BftMiningCoordinator delegateCoordinator = createDelegateCoordinator();
when(blockchain.observeBlockAdded(delegateCoordinator)).thenReturn(1L);
final MigratingMiningCoordinator coordinator =
new MigratingMiningCoordinator(
createCoordinatorSchedule(delegateCoordinator, coordinator2), blockchain);
coordinator.start();
verify(blockchain).observeBlockAdded(coordinator);
verify(blockchain).observeBlockAdded(delegateCoordinator);
verify(blockchain).removeObserver(1L);
}
private BftMiningCoordinator createDelegateCoordinator() {
return new BftMiningCoordinator(
mock(BftExecutors.class),
mock(BftEventHandler.class),
mock(BftProcessor.class),
mock(BftBlockCreatorFactory.class),
blockchain,
mock(BftEventQueue.class));
}
@Test
public void stopShouldUnregisterThisCoordinatorAsObserver() {
final MigratingMiningCoordinator coordinator =
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain);
new MigratingMiningCoordinator(coordinatorSchedule, blockchain);
when(blockchain.observeBlockAdded(coordinator)).thenReturn(1L);
coordinator.start();
@ -87,11 +121,25 @@ public class MigratingMiningCoordinatorTest {
verify(blockchain).removeObserver(1L);
}
@Test
public void onBlockAddedShouldMigrateToNextDelegateAndRemoveItAsObserver() {
final BftMiningCoordinator delegateCoordinator = createDelegateCoordinator();
when(blockHeader.getNumber()).thenReturn(MIGRATION_BLOCK_NUMBER - 1);
when(blockchain.observeBlockAdded(delegateCoordinator)).thenReturn(2L);
new MigratingMiningCoordinator(
createCoordinatorSchedule(coordinator1, delegateCoordinator), blockchain)
.onBlockAdded(blockEvent);
verify(blockchain).observeBlockAdded(delegateCoordinator);
verify(blockchain).removeObserver(2L);
}
@Test
public void onBlockAddedShouldMigrateToNextMiningCoordinatorAndDelegate() {
when(blockHeader.getNumber()).thenReturn(MIGRATION_BLOCK_NUMBER - 1);
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain).onBlockAdded(blockEvent);
new MigratingMiningCoordinator(coordinatorSchedule, blockchain).onBlockAdded(blockEvent);
verify(coordinator1).stop();
verify(coordinator2).start();
@ -100,13 +148,13 @@ public class MigratingMiningCoordinatorTest {
@Test
public void onBlockAddedShouldNotDelegateWhenDelegateIsNoop() {
ForksSchedule<MiningCoordinator> noopCoordinatorSchedule =
new ForksSchedule<>(List.of(new ForkSpec<>(GENESIS_BLOCK_NUMBER, noopCoordinator)));
NoopMiningCoordinator mockNoopCoordinator = mock(NoopMiningCoordinator.class);
coordinatorSchedule = createCoordinatorSchedule(mockNoopCoordinator, coordinator2);
when(blockHeader.getNumber()).thenReturn(GENESIS_BLOCK_NUMBER);
new MigratingMiningCoordinator(noopCoordinatorSchedule, blockchain).onBlockAdded(blockEvent);
new MigratingMiningCoordinator(coordinatorSchedule, blockchain).onBlockAdded(blockEvent);
verifyNoInteractions(noopCoordinator);
verifyNoInteractions(mockNoopCoordinator);
}
@Test
@ -178,7 +226,7 @@ public class MigratingMiningCoordinatorTest {
final MiningCoordinator expectedInactiveCoordinator) {
when(blockchain.getChainHeadBlockNumber()).thenReturn(blockHeight);
methodUnderTest.accept(new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain));
methodUnderTest.accept(new MigratingMiningCoordinator(coordinatorSchedule, blockchain));
methodUnderTest.accept(verify(expectedActiveCoordinator));
verifyNoInteractions(expectedInactiveCoordinator);
@ -198,7 +246,7 @@ public class MigratingMiningCoordinatorTest {
throws InterruptedException {
when(blockchain.getChainHeadBlockNumber()).thenReturn(blockHeight);
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain).awaitStop();
new MigratingMiningCoordinator(coordinatorSchedule, blockchain).awaitStop();
verify(expectedActiveCoordinator).awaitStop();
verifyNoInteractions(expectedInactiveCoordinator);
@ -218,7 +266,7 @@ public class MigratingMiningCoordinatorTest {
when(blockchain.getChainHeadBlockNumber()).thenReturn(blockHeight);
when(blockHeader.getNumber()).thenReturn(blockHeight);
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain).onBlockAdded(blockEvent);
new MigratingMiningCoordinator(coordinatorSchedule, blockchain).onBlockAdded(blockEvent);
verify(expectedActiveCoordinator).onBlockAdded(blockEvent);
verifyNoInteractions(expectedInactiveCoordinator);

@ -17,4 +17,6 @@ package org.hyperledger.besu.ethereum.chain;
@FunctionalInterface
public interface BlockAddedObserver {
void onBlockAdded(BlockAddedEvent event);
default void removeObserver() {}
}

Loading…
Cancel
Save