Split IbftProcessor into looping and event processing (#612)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
tmohay 6 years ago committed by GitHub
parent 300556c5fd
commit f0f8c54501
  1. 61
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/EventMultiplexer.java
  2. 52
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java
  3. 5
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinator.java
  4. 15
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java
  5. 1
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java
  6. 5
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java

@ -0,0 +1,61 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class EventMultiplexer {
private static final Logger LOG = LogManager.getLogger();
private final IbftController ibftController;
public EventMultiplexer(final IbftController ibftController) {
this.ibftController = ibftController;
}
public void handleIbftEvent(final IbftEvent ibftEvent) {
try {
switch (ibftEvent.getType()) {
case MESSAGE:
final IbftReceivedMessageEvent rxEvent = (IbftReceivedMessageEvent) ibftEvent;
ibftController.handleMessageEvent(rxEvent);
break;
case ROUND_EXPIRY:
final RoundExpiry roundExpiryEvent = (RoundExpiry) ibftEvent;
ibftController.handleRoundExpiry(roundExpiryEvent);
break;
case NEW_CHAIN_HEAD:
final NewChainHead newChainHead = (NewChainHead) ibftEvent;
ibftController.handleNewBlockEvent(newChainHead);
break;
case BLOCK_TIMER_EXPIRY:
final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) ibftEvent;
ibftController.handleBlockTimerExpiry(blockTimerExpiry);
break;
default:
throw new RuntimeException("Illegal event in queue.");
}
} catch (final Exception e) {
LOG.error("State machine threw exception while processing event {" + ibftEvent + "}", e);
}
}
}

@ -12,12 +12,7 @@
*/ */
package tech.pegasys.pantheon.consensus.ibft; package tech.pegasys.pantheon.consensus.ibft;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -30,39 +25,37 @@ import org.apache.logging.log4j.Logger;
/** Execution context for draining queued ibft events and applying them to a maintained state */ /** Execution context for draining queued ibft events and applying them to a maintained state */
public class IbftProcessor implements Runnable { public class IbftProcessor implements Runnable {
private static final Logger LOG = LogManager.getLogger(); private static final Logger LOG = LogManager.getLogger();
private final IbftEventQueue incomingQueue; private final IbftEventQueue incomingQueue;
private final ScheduledExecutorService roundTimerExecutor; private final ScheduledExecutorService roundTimerExecutor;
private volatile boolean shutdown = false; private volatile boolean shutdown = false;
private final IbftController ibftController; private final EventMultiplexer eventMultiplexer;
/** /**
* Construct a new IbftProcessor * Construct a new IbftProcessor
* *
* @param incomingQueue The event queue from which to drain new events * @param incomingQueue The event queue from which to drain new events
* @param ibftController an object capable of handling any/all IBFT events * @param eventMultiplexer an object capable of handling any/all IBFT events
*/ */
public IbftProcessor(final IbftEventQueue incomingQueue, final IbftController ibftController) { public IbftProcessor(
final IbftEventQueue incomingQueue, final EventMultiplexer eventMultiplexer) {
// Spawning the round timer with a single thread as we should never have more than 1 timer in // Spawning the round timer with a single thread as we should never have more than 1 timer in
// flight at a time // flight at a time
this(incomingQueue, ibftController, Executors.newSingleThreadScheduledExecutor()); this(incomingQueue, eventMultiplexer, Executors.newSingleThreadScheduledExecutor());
} }
@VisibleForTesting @VisibleForTesting
IbftProcessor( IbftProcessor(
final IbftEventQueue incomingQueue, final IbftEventQueue incomingQueue,
final IbftController ibftController, final EventMultiplexer eventMultiplexer,
final ScheduledExecutorService roundTimerExecutor) { final ScheduledExecutorService roundTimerExecutor) {
this.incomingQueue = incomingQueue; this.incomingQueue = incomingQueue;
this.ibftController = ibftController; this.eventMultiplexer = eventMultiplexer;
this.roundTimerExecutor = roundTimerExecutor; this.roundTimerExecutor = roundTimerExecutor;
} }
public void start() {
ibftController.start();
}
/** Indicate to the processor that it should gracefully stop at its next opportunity */ /** Indicate to the processor that it should gracefully stop at its next opportunity */
public void stop() { public void stop() {
shutdown = true; shutdown = true;
@ -71,39 +64,12 @@ public class IbftProcessor implements Runnable {
@Override @Override
public void run() { public void run() {
while (!shutdown) { while (!shutdown) {
nextIbftEvent().ifPresent(this::handleIbftEvent); nextIbftEvent().ifPresent(event -> eventMultiplexer.handleIbftEvent(event));
} }
// Clean up the executor service the round timer has been utilising // Clean up the executor service the round timer has been utilising
roundTimerExecutor.shutdownNow(); roundTimerExecutor.shutdownNow();
} }
private void handleIbftEvent(final IbftEvent ibftEvent) {
try {
switch (ibftEvent.getType()) {
case MESSAGE:
final IbftReceivedMessageEvent rxEvent = (IbftReceivedMessageEvent) ibftEvent;
ibftController.handleMessageEvent(rxEvent);
break;
case ROUND_EXPIRY:
final RoundExpiry roundExpiryEvent = (RoundExpiry) ibftEvent;
ibftController.handleRoundExpiry(roundExpiryEvent);
break;
case NEW_CHAIN_HEAD:
final NewChainHead newChainHead = (NewChainHead) ibftEvent;
ibftController.handleNewBlockEvent(newChainHead);
break;
case BLOCK_TIMER_EXPIRY:
final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) ibftEvent;
ibftController.handleBlockTimerExpiry(blockTimerExpiry);
break;
default:
throw new RuntimeException("Illegal event in queue.");
}
} catch (final Exception e) {
LOG.error("State machine threw exception while processing event {" + ibftEvent + "}", e);
}
}
private Optional<IbftEvent> nextIbftEvent() { private Optional<IbftEvent> nextIbftEvent() {
try { try {
return Optional.ofNullable(incomingQueue.poll(500, TimeUnit.MILLISECONDS)); return Optional.ofNullable(incomingQueue.poll(500, TimeUnit.MILLISECONDS));

@ -48,10 +48,7 @@ public class IbftMiningCoordinator implements MiningCoordinator, BlockAddedObser
} }
@Override @Override
public void enable() { public void enable() {}
ibftProcessor.start();
// IbftProcessor is implicitly running (but maybe should have a discard" all)
}
@Override @Override
public void disable() { public void disable() {

@ -23,7 +23,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -40,12 +39,12 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.StrictStubs.class) @RunWith(MockitoJUnitRunner.StrictStubs.class)
public class IbftProcessorTest { public class IbftProcessorTest {
private ScheduledExecutorService mockExecutorService; private ScheduledExecutorService mockExecutorService;
private IbftController mockIbftController; private EventMultiplexer mockeEventMultiplexer;
@Before @Before
public void initialise() { public void initialise() {
mockExecutorService = mock(ScheduledExecutorService.class); mockExecutorService = mock(ScheduledExecutorService.class);
mockIbftController = mock(IbftController.class); mockeEventMultiplexer = mock(EventMultiplexer.class);
} }
@Test @Test
@ -53,7 +52,7 @@ public class IbftProcessorTest {
final IbftEventQueue mockQueue = mock(IbftEventQueue.class); final IbftEventQueue mockQueue = mock(IbftEventQueue.class);
Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null); Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null);
final IbftProcessor processor = final IbftProcessor processor =
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService); new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService);
// Start the IbftProcessor // Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -82,7 +81,7 @@ public class IbftProcessorTest {
@Test @Test
public void cleanupExecutorsAfterShutdownNow() throws InterruptedException { public void cleanupExecutorsAfterShutdownNow() throws InterruptedException {
final IbftProcessor processor = final IbftProcessor processor =
new IbftProcessor(new IbftEventQueue(), mockIbftController, mockExecutorService); new IbftProcessor(new IbftEventQueue(), mockeEventMultiplexer, mockExecutorService);
// Start the IbftProcessor // Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -112,7 +111,7 @@ public class IbftProcessorTest {
Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException()); Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException());
final IbftProcessor processor = final IbftProcessor processor =
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService); new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService);
// Start the IbftProcessor // Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -145,7 +144,7 @@ public class IbftProcessorTest {
public void drainEventsIntoStateMachine() throws InterruptedException { public void drainEventsIntoStateMachine() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue(); final IbftEventQueue queue = new IbftEventQueue();
final IbftProcessor processor = final IbftProcessor processor =
new IbftProcessor(queue, mockIbftController, mockExecutorService); new IbftProcessor(queue, mockeEventMultiplexer, mockExecutorService);
// Start the IbftProcessor // Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -161,6 +160,6 @@ public class IbftProcessorTest {
processor.stop(); processor.stop();
processorExecutor.shutdown(); processorExecutor.shutdown();
verify(mockIbftController, times(2)).handleRoundExpiry(eq(roundExpiryEvent)); verify(mockeEventMultiplexer, times(2)).handleIbftEvent(eq(roundExpiryEvent));
} }
} }

@ -59,7 +59,6 @@ public class IbftMiningCoordinatorTest {
@Test @Test
public void enablesMining() { public void enablesMining() {
ibftMiningCoordinator.enable(); ibftMiningCoordinator.enable();
verify(ibftProcessor).start();
} }
@Test @Test

@ -22,6 +22,7 @@ import tech.pegasys.pantheon.consensus.common.VoteProposer;
import tech.pegasys.pantheon.consensus.common.VoteTally; import tech.pegasys.pantheon.consensus.common.VoteTally;
import tech.pegasys.pantheon.consensus.common.VoteTallyUpdater; import tech.pegasys.pantheon.consensus.common.VoteTallyUpdater;
import tech.pegasys.pantheon.consensus.ibft.BlockTimer; import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.EventMultiplexer;
import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface; import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface;
import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
@ -236,8 +237,10 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
new IbftRoundFactory( new IbftRoundFactory(
finalState, protocolContext, protocolSchedule, minedBlockObservers), finalState, protocolContext, protocolSchedule, minedBlockObservers),
messageValidatorFactory)); messageValidatorFactory));
ibftController.start();
final IbftProcessor ibftProcessor = new IbftProcessor(ibftEventQueue, ibftController); final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
final IbftProcessor ibftProcessor = new IbftProcessor(ibftEventQueue, eventMultiplexer);
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
processorExecutor.submit(ibftProcessor); processorExecutor.submit(ibftProcessor);

Loading…
Cancel
Save