diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/EventMultiplexer.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/EventMultiplexer.java new file mode 100644 index 0000000000..5f43f959d0 --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/EventMultiplexer.java @@ -0,0 +1,61 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; +import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent; +import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent; +import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead; +import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; +import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class EventMultiplexer { + + private static final Logger LOG = LogManager.getLogger(); + + private final IbftController ibftController; + + public EventMultiplexer(final IbftController ibftController) { + this.ibftController = ibftController; + } + + public void handleIbftEvent(final IbftEvent ibftEvent) { + try { + switch (ibftEvent.getType()) { + case MESSAGE: + final IbftReceivedMessageEvent rxEvent = (IbftReceivedMessageEvent) ibftEvent; + ibftController.handleMessageEvent(rxEvent); + break; + case ROUND_EXPIRY: + final RoundExpiry roundExpiryEvent = (RoundExpiry) ibftEvent; + ibftController.handleRoundExpiry(roundExpiryEvent); + break; + case NEW_CHAIN_HEAD: + final NewChainHead newChainHead = (NewChainHead) ibftEvent; + ibftController.handleNewBlockEvent(newChainHead); + break; + case BLOCK_TIMER_EXPIRY: + final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) ibftEvent; + ibftController.handleBlockTimerExpiry(blockTimerExpiry); + break; + default: + throw new RuntimeException("Illegal event in queue."); + } + } catch (final Exception e) { + LOG.error("State machine threw exception while processing event {" + ibftEvent + "}", e); + } + } +} diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java index cfb0361393..6dc9c37576 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java @@ -12,12 +12,7 @@ */ package tech.pegasys.pantheon.consensus.ibft; -import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent; -import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent; -import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead; -import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; -import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController; import java.util.Optional; import java.util.concurrent.Executors; @@ -30,39 +25,37 @@ import org.apache.logging.log4j.Logger; /** Execution context for draining queued ibft events and applying them to a maintained state */ public class IbftProcessor implements Runnable { + private static final Logger LOG = LogManager.getLogger(); private final IbftEventQueue incomingQueue; private final ScheduledExecutorService roundTimerExecutor; private volatile boolean shutdown = false; - private final IbftController ibftController; + private final EventMultiplexer eventMultiplexer; /** * Construct a new IbftProcessor * * @param incomingQueue The event queue from which to drain new events - * @param ibftController an object capable of handling any/all IBFT events + * @param eventMultiplexer an object capable of handling any/all IBFT events */ - public IbftProcessor(final IbftEventQueue incomingQueue, final IbftController ibftController) { + public IbftProcessor( + final IbftEventQueue incomingQueue, final EventMultiplexer eventMultiplexer) { // Spawning the round timer with a single thread as we should never have more than 1 timer in // flight at a time - this(incomingQueue, ibftController, Executors.newSingleThreadScheduledExecutor()); + this(incomingQueue, eventMultiplexer, Executors.newSingleThreadScheduledExecutor()); } @VisibleForTesting IbftProcessor( final IbftEventQueue incomingQueue, - final IbftController ibftController, + final EventMultiplexer eventMultiplexer, final ScheduledExecutorService roundTimerExecutor) { this.incomingQueue = incomingQueue; - this.ibftController = ibftController; + this.eventMultiplexer = eventMultiplexer; this.roundTimerExecutor = roundTimerExecutor; } - public void start() { - ibftController.start(); - } - /** Indicate to the processor that it should gracefully stop at its next opportunity */ public void stop() { shutdown = true; @@ -71,39 +64,12 @@ public class IbftProcessor implements Runnable { @Override public void run() { while (!shutdown) { - nextIbftEvent().ifPresent(this::handleIbftEvent); + nextIbftEvent().ifPresent(event -> eventMultiplexer.handleIbftEvent(event)); } // Clean up the executor service the round timer has been utilising roundTimerExecutor.shutdownNow(); } - private void handleIbftEvent(final IbftEvent ibftEvent) { - try { - switch (ibftEvent.getType()) { - case MESSAGE: - final IbftReceivedMessageEvent rxEvent = (IbftReceivedMessageEvent) ibftEvent; - ibftController.handleMessageEvent(rxEvent); - break; - case ROUND_EXPIRY: - final RoundExpiry roundExpiryEvent = (RoundExpiry) ibftEvent; - ibftController.handleRoundExpiry(roundExpiryEvent); - break; - case NEW_CHAIN_HEAD: - final NewChainHead newChainHead = (NewChainHead) ibftEvent; - ibftController.handleNewBlockEvent(newChainHead); - break; - case BLOCK_TIMER_EXPIRY: - final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) ibftEvent; - ibftController.handleBlockTimerExpiry(blockTimerExpiry); - break; - default: - throw new RuntimeException("Illegal event in queue."); - } - } catch (final Exception e) { - LOG.error("State machine threw exception while processing event {" + ibftEvent + "}", e); - } - } - private Optional nextIbftEvent() { try { return Optional.ofNullable(incomingQueue.poll(500, TimeUnit.MILLISECONDS)); diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinator.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinator.java index 54adc989e6..2f75979ec0 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinator.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinator.java @@ -48,10 +48,7 @@ public class IbftMiningCoordinator implements MiningCoordinator, BlockAddedObser } @Override - public void enable() { - ibftProcessor.start(); - // IbftProcessor is implicitly running (but maybe should have a discard" all) - } + public void enable() {} @Override public void disable() { diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java index 5287f2da06..94e2b1a2e0 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; -import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -40,12 +39,12 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class IbftProcessorTest { private ScheduledExecutorService mockExecutorService; - private IbftController mockIbftController; + private EventMultiplexer mockeEventMultiplexer; @Before public void initialise() { mockExecutorService = mock(ScheduledExecutorService.class); - mockIbftController = mock(IbftController.class); + mockeEventMultiplexer = mock(EventMultiplexer.class); } @Test @@ -53,7 +52,7 @@ public class IbftProcessorTest { final IbftEventQueue mockQueue = mock(IbftEventQueue.class); Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null); final IbftProcessor processor = - new IbftProcessor(mockQueue, mockIbftController, mockExecutorService); + new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); @@ -82,7 +81,7 @@ public class IbftProcessorTest { @Test public void cleanupExecutorsAfterShutdownNow() throws InterruptedException { final IbftProcessor processor = - new IbftProcessor(new IbftEventQueue(), mockIbftController, mockExecutorService); + new IbftProcessor(new IbftEventQueue(), mockeEventMultiplexer, mockExecutorService); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); @@ -112,7 +111,7 @@ public class IbftProcessorTest { Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException()); final IbftProcessor processor = - new IbftProcessor(mockQueue, mockIbftController, mockExecutorService); + new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); @@ -145,7 +144,7 @@ public class IbftProcessorTest { public void drainEventsIntoStateMachine() throws InterruptedException { final IbftEventQueue queue = new IbftEventQueue(); final IbftProcessor processor = - new IbftProcessor(queue, mockIbftController, mockExecutorService); + new IbftProcessor(queue, mockeEventMultiplexer, mockExecutorService); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); @@ -161,6 +160,6 @@ public class IbftProcessorTest { processor.stop(); processorExecutor.shutdown(); - verify(mockIbftController, times(2)).handleRoundExpiry(eq(roundExpiryEvent)); + verify(mockeEventMultiplexer, times(2)).handleIbftEvent(eq(roundExpiryEvent)); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java index ef371e51fb..563d7d6a7f 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java @@ -59,7 +59,6 @@ public class IbftMiningCoordinatorTest { @Test public void enablesMining() { ibftMiningCoordinator.enable(); - verify(ibftProcessor).start(); } @Test diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index d604bf1443..90057534c6 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -22,6 +22,7 @@ import tech.pegasys.pantheon.consensus.common.VoteProposer; import tech.pegasys.pantheon.consensus.common.VoteTally; import tech.pegasys.pantheon.consensus.common.VoteTallyUpdater; import tech.pegasys.pantheon.consensus.ibft.BlockTimer; +import tech.pegasys.pantheon.consensus.ibft.EventMultiplexer; import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface; import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; @@ -236,8 +237,10 @@ public class IbftPantheonController implements PantheonController { new IbftRoundFactory( finalState, protocolContext, protocolSchedule, minedBlockObservers), messageValidatorFactory)); + ibftController.start(); - final IbftProcessor ibftProcessor = new IbftProcessor(ibftEventQueue, ibftController); + final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController); + final IbftProcessor ibftProcessor = new IbftProcessor(ibftEventQueue, eventMultiplexer); final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); processorExecutor.submit(ibftProcessor);