Ibft pantheon controller (#461)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Jason Frame 6 years ago committed by GitHub
parent 8913f46813
commit 1513e58d7d
  1. 82
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java
  2. 40
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftStateMachine.java
  3. 50
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinator.java
  4. 15
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java
  5. 108
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftMiningCoordinatorTest.java
  6. 8
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java
  7. 82
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java

@ -12,7 +12,12 @@
*/
package tech.pegasys.pantheon.consensus.ibft;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;
import java.util.Optional;
import java.util.concurrent.Executors;
@ -25,45 +30,37 @@ import org.apache.logging.log4j.Logger;
/** Execution context for draining queued ibft events and applying them to a maintained state */
public class IbftProcessor implements Runnable {
private static final Logger LOG = LogManager.getLogger();
private final IbftEventQueue incomingQueue;
private final ScheduledExecutorService roundTimerExecutor;
private final RoundTimer roundTimer;
private final IbftStateMachine stateMachine;
private volatile boolean shutdown = false;
private final IbftController ibftController;
/**
* Construct a new IbftProcessor
*
* @param incomingQueue The event queue from which to drain new events
* @param baseRoundExpirySeconds The expiry time in milliseconds of round 0
* @param stateMachine an IbftStateMachine ready to process events and maintain state
* @param ibftController an object capable of handling any/all IBFT events
*/
public IbftProcessor(
final IbftEventQueue incomingQueue,
final int baseRoundExpirySeconds,
final IbftStateMachine stateMachine) {
public IbftProcessor(final IbftEventQueue incomingQueue, final IbftController ibftController) {
// Spawning the round timer with a single thread as we should never have more than 1 timer in
// flight at a time
this(
incomingQueue,
baseRoundExpirySeconds,
stateMachine,
Executors.newSingleThreadScheduledExecutor());
this(incomingQueue, ibftController, Executors.newSingleThreadScheduledExecutor());
}
@VisibleForTesting
IbftProcessor(
final IbftEventQueue incomingQueue,
final int baseRoundExpirySeconds,
final IbftStateMachine stateMachine,
final IbftController ibftController,
final ScheduledExecutorService roundTimerExecutor) {
this.incomingQueue = incomingQueue;
this.ibftController = ibftController;
this.roundTimerExecutor = roundTimerExecutor;
this.roundTimer = new RoundTimer(incomingQueue, baseRoundExpirySeconds, roundTimerExecutor);
this.stateMachine = stateMachine;
}
public void start() {
ibftController.start();
}
/** Indicate to the processor that it should gracefully stop at its next opportunity */
@ -74,25 +71,46 @@ public class IbftProcessor implements Runnable {
@Override
public void run() {
while (!shutdown) {
Optional<IbftEvent> newEvent = Optional.empty();
try {
newEvent = Optional.ofNullable(incomingQueue.poll(2, TimeUnit.SECONDS));
} catch (final InterruptedException interrupt) {
// If the queue was interrupted propagate it and spin to check our shutdown status
Thread.currentThread().interrupt();
nextIbftEvent().ifPresent(this::handleIbftEvent);
}
// Clean up the executor service the round timer has been utilising
roundTimerExecutor.shutdownNow();
}
newEvent.ifPresent(
ibftEvent -> {
private void handleIbftEvent(final IbftEvent ibftEvent) {
try {
stateMachine.processEvent(ibftEvent, roundTimer);
switch (ibftEvent.getType()) {
case MESSAGE:
final IbftReceivedMessageEvent rxEvent = (IbftReceivedMessageEvent) ibftEvent;
ibftController.handleMessageEvent(rxEvent);
break;
case ROUND_EXPIRY:
final RoundExpiry roundExpiryEvent = (RoundExpiry) ibftEvent;
ibftController.handleRoundExpiry(roundExpiryEvent);
break;
case NEW_CHAIN_HEAD:
final NewChainHead newChainHead = (NewChainHead) ibftEvent;
ibftController.handleNewBlockEvent(newChainHead);
break;
case BLOCK_TIMER_EXPIRY:
final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) ibftEvent;
ibftController.handleBlockTimerExpiry(blockTimerExpiry);
break;
default:
throw new RuntimeException("Illegal event in queue.");
}
} catch (final Exception e) {
LOG.error(
"State machine threw exception while processing event {" + ibftEvent + "}", e);
LOG.error("State machine threw exception while processing event {" + ibftEvent + "}", e);
}
});
}
// Clean up the executor service the round timer has been utilising
roundTimerExecutor.shutdownNow();
private Optional<IbftEvent> nextIbftEvent() {
try {
return Optional.ofNullable(incomingQueue.poll(500, TimeUnit.MILLISECONDS));
} catch (final InterruptedException interrupt) {
// If the queue was interrupted propagate it and spin to check our shutdown status
Thread.currentThread().interrupt();
return Optional.empty();
}
}
}

@ -1,40 +0,0 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;
/** Stateful evaluator for ibft events */
public class IbftStateMachine {
private final IbftBlockCreatorFactory blockCreatorFactory;
public IbftStateMachine(final IbftBlockCreatorFactory blockCreatorFactory) {
this.blockCreatorFactory = blockCreatorFactory;
}
/**
* Attempt to consume the event and update the maintained state
*
* @param event the external action that has occurred
* @param roundTimer timer that will fire expiry events that are expected to be received back into
* this machine
* @return whether this event was consumed or requires reprocessing later once the state machine
* catches up
*/
public boolean processEvent(final IbftEvent event, final RoundTimer roundTimer) {
// TODO: don't just discard the event, do some logic
return true;
}
}

@ -12,23 +12,51 @@
*/
package tech.pegasys.pantheon.consensus.ibft.blockcreation;
import static org.apache.logging.log4j.LogManager.getLogger;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftProcessor;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.ethereum.blockcreation.MiningCoordinator;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedObserver;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.util.bytes.BytesValue;
public class IbftMiningCoordinator implements MiningCoordinator {
import org.apache.logging.log4j.Logger;
public class IbftMiningCoordinator implements MiningCoordinator, BlockAddedObserver {
private final IbftBlockCreatorFactory blockCreatorFactory;
private static final Logger LOG = getLogger();
protected final Blockchain blockchain;
private final IbftEventQueue eventQueue;
private final IbftProcessor ibftProcessor;
public IbftMiningCoordinator(final IbftBlockCreatorFactory blockCreatorFactory) {
public IbftMiningCoordinator(
final IbftProcessor ibftProcessor,
final IbftBlockCreatorFactory blockCreatorFactory,
final Blockchain blockchain,
final IbftEventQueue eventQueue) {
this.ibftProcessor = ibftProcessor;
this.blockCreatorFactory = blockCreatorFactory;
this.eventQueue = eventQueue;
this.blockchain = blockchain;
this.blockchain.observeBlockAdded(this);
}
@Override
public void enable() {}
public void enable() {
ibftProcessor.start();
// IbftProcessor is implicitly running (but maybe should have a discard" all)
}
@Override
public void disable() {}
public void disable() {
ibftProcessor.stop();
}
@Override
public boolean isRunning() {
@ -36,15 +64,25 @@ public class IbftMiningCoordinator implements MiningCoordinator {
}
@Override
public void setMinTransactionGasPrice(final Wei minGasPrice) {}
public void setMinTransactionGasPrice(final Wei minGasPrice) {
blockCreatorFactory.setMinTransactionGasPrice(minGasPrice);
}
@Override
public Wei getMinTransactionGasPrice() {
return null;
return blockCreatorFactory.getMinTransactionGasPrice();
}
@Override
public void setExtraData(final BytesValue extraData) {
blockCreatorFactory.setExtraData(extraData);
}
@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
LOG.info("New canonical head detected. {} ", event.isNewCanonicalHead());
if (event.isNewCanonicalHead()) {
eventQueue.add(new NewChainHead(event.getBlock().getHeader()));
}
}
}

@ -23,6 +23,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -39,12 +40,12 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class IbftProcessorTest {
private ScheduledExecutorService mockExecutorService;
private IbftStateMachine mockStateMachine;
private IbftController mockIbftController;
@Before
public void initialise() {
mockExecutorService = mock(ScheduledExecutorService.class);
mockStateMachine = mock(IbftStateMachine.class);
mockIbftController = mock(IbftController.class);
}
@Test
@ -52,7 +53,7 @@ public class IbftProcessorTest {
final IbftEventQueue mockQueue = mock(IbftEventQueue.class);
Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null);
final IbftProcessor processor =
new IbftProcessor(mockQueue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService);
// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -81,7 +82,7 @@ public class IbftProcessorTest {
@Test
public void cleanupExecutorsAfterShutdownNow() throws InterruptedException {
final IbftProcessor processor =
new IbftProcessor(new IbftEventQueue(), 1, mockStateMachine, mockExecutorService);
new IbftProcessor(new IbftEventQueue(), mockIbftController, mockExecutorService);
// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -111,7 +112,7 @@ public class IbftProcessorTest {
Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException());
final IbftProcessor processor =
new IbftProcessor(mockQueue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService);
// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -144,7 +145,7 @@ public class IbftProcessorTest {
public void drainEventsIntoStateMachine() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue();
final IbftProcessor processor =
new IbftProcessor(queue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(queue, mockIbftController, mockExecutorService);
// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
@ -160,6 +161,6 @@ public class IbftProcessorTest {
processor.stop();
processorExecutor.shutdown();
verify(mockStateMachine, times(2)).processEvent(eq(roundExpiryEvent), any());
verify(mockIbftController, times(2)).handleRoundExpiry(eq(roundExpiryEvent));
}
}

@ -0,0 +1,108 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.blockcreation;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftProcessor;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.concurrent.TimeUnit;
import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class IbftMiningCoordinatorTest {
@Mock private IbftProcessor ibftProcessor;
@Mock private IbftBlockCreatorFactory ibftBlockCreatorFactory;
@Mock private Blockchain blockChain;
@Mock private Block block;
@Mock private BlockBody blockBody;
@Mock private BlockHeader blockHeader;
private final IbftEventQueue eventQueue = new IbftEventQueue();
private IbftMiningCoordinator ibftMiningCoordinator;
@Before
public void setup() {
ibftMiningCoordinator =
new IbftMiningCoordinator(ibftProcessor, ibftBlockCreatorFactory, blockChain, eventQueue);
when(block.getBody()).thenReturn(blockBody);
when(block.getHeader()).thenReturn(blockHeader);
when(blockBody.getTransactions()).thenReturn(Lists.emptyList());
}
@Test
public void enablesMining() {
ibftMiningCoordinator.enable();
verify(ibftProcessor).start();
}
@Test
public void disablesMining() {
ibftMiningCoordinator.disable();
verify(ibftProcessor).stop();
}
@Test
public void setsMinTransactionGasPrice() {
final Wei minGasPrice = Wei.of(10);
ibftMiningCoordinator.setMinTransactionGasPrice(minGasPrice);
verify(ibftBlockCreatorFactory).setMinTransactionGasPrice(minGasPrice);
}
@Test
public void getsMinTransactionGasPrice() {
final Wei minGasPrice = Wei.of(10);
when(ibftBlockCreatorFactory.getMinTransactionGasPrice()).thenReturn(minGasPrice);
assertThat(ibftMiningCoordinator.getMinTransactionGasPrice()).isEqualTo(minGasPrice);
}
@Test
public void setsTheExtraData() {
final BytesValue extraData = BytesValue.fromHexStringLenient("0x1234");
ibftMiningCoordinator.setExtraData(extraData);
verify(ibftBlockCreatorFactory).setExtraData(extraData);
}
@Test
public void addsNewChainHeadEventWhenNewCanonicalHeadBlockEventReceived() throws Exception {
BlockAddedEvent headAdvancement = BlockAddedEvent.createForHeadAdvancement(block);
ibftMiningCoordinator.onBlockAdded(headAdvancement, blockChain);
assertThat(eventQueue.size()).isEqualTo(1);
final NewChainHead ibftEvent = (NewChainHead) eventQueue.poll(1, TimeUnit.SECONDS);
assertThat(ibftEvent.getNewChainHeadHeader()).isEqualTo(blockHeader);
}
@Test
public void doesntAddNewChainHeadEventWhenNotACanonicalHeadBlockEvent() {
final BlockAddedEvent fork = BlockAddedEvent.createForFork(block);
ibftMiningCoordinator.onBlockAdded(fork, blockChain);
assertThat(eventQueue.isEmpty()).isTrue();
}
}

@ -77,7 +77,7 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class IbftBlockHeightManagerTest {
private KeyPair localNodeKeys = KeyPair.generate();
private final KeyPair localNodeKeys = KeyPair.generate();
private final MessageFactory messageFactory = new MessageFactory(localNodeKeys);
private final BlockHeaderTestFixture headerTestFixture = new BlockHeaderTestFixture();
@ -96,11 +96,11 @@ public class IbftBlockHeightManagerTest {
@Captor private ArgumentCaptor<Optional<PreparedCertificate>> preparedCaptor;
private final List<KeyPair> validatorKeys = Lists.newArrayList();
private List<Address> validators = Lists.newArrayList();
private List<MessageFactory> validatorMessageFactory = Lists.newArrayList();
private final List<Address> validators = Lists.newArrayList();
private final List<MessageFactory> validatorMessageFactory = Lists.newArrayList();
private ProtocolContext<IbftContext> protocolContext;
private ConsensusRoundIdentifier roundIdentifier = new ConsensusRoundIdentifier(1, 0);
private final ConsensusRoundIdentifier roundIdentifier = new ConsensusRoundIdentifier(1, 0);
private Block createdBlock;
private void buildCreatedBlock() {

@ -16,23 +16,32 @@ import static org.apache.logging.log4j.LogManager.getLogger;
import tech.pegasys.pantheon.config.GenesisConfigFile;
import tech.pegasys.pantheon.config.IbftConfigOptions;
import tech.pegasys.pantheon.consensus.common.BlockInterface;
import tech.pegasys.pantheon.consensus.common.EpochManager;
import tech.pegasys.pantheon.consensus.common.VoteProposer;
import tech.pegasys.pantheon.consensus.common.VoteTally;
import tech.pegasys.pantheon.consensus.common.VoteTallyUpdater;
import tech.pegasys.pantheon.consensus.ibft.IbftChainObserver;
import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.IbftBlockHeaderValidationRulesetFactory;
import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface;
import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftProcessor;
import tech.pegasys.pantheon.consensus.ibft.IbftStateMachine;
import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftMiningCoordinator;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.jsonrpc.IbftJsonRpcMethodsFactory;
import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers;
import tech.pegasys.pantheon.consensus.ibft.protocol.IbftProtocolManager;
import tech.pegasys.pantheon.consensus.ibft.protocol.IbftSubProtocol;
import tech.pegasys.pantheon.consensus.ibftlegacy.IbftLegacyBlockInterface;
import tech.pegasys.pantheon.consensus.ibftlegacy.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManagerFactory;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftFinalState;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftRoundFactory;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.blockcreation.MiningCoordinator;
@ -55,6 +64,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPoolFactory;
import tech.pegasys.pantheon.ethereum.jsonrpc.RpcApi;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration;
@ -65,6 +75,7 @@ import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -131,12 +142,13 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
final WorldStateArchive worldStateArchive = new WorldStateArchive(worldStateStorage);
genesisState.writeStateTo(worldStateArchive.getMutable(Hash.EMPTY_TRIE_HASH));
final IbftConfigOptions ibftConfig = genesisConfig.getConfigOptions().getIbftConfigOptions();
final IbftConfigOptions ibftConfig =
genesisConfig.getConfigOptions().getRevisedIbftConfigOptions();
final EpochManager epochManager = new EpochManager(ibftConfig.getEpochLength());
final BlockInterface blockInterface = new IbftBlockInterface();
final VoteTally voteTally =
new VoteTallyUpdater(epochManager, new IbftLegacyBlockInterface())
.buildVoteTallyFromBlockchain(blockchain);
new VoteTallyUpdater(epochManager, blockInterface).buildVoteTallyFromBlockchain(blockchain);
final VoteProposer voteProposer = new VoteProposer();
@ -174,8 +186,6 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
final IbftEventQueue ibftEventQueue = new IbftEventQueue();
blockchain.observeBlockAdded(new IbftChainObserver(ibftEventQueue));
final IbftBlockCreatorFactory blockCreatorFactory =
new IbftBlockCreatorFactory(
(gasLimit) -> gasLimit,
@ -185,13 +195,56 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
miningParams,
Util.publicKeyToAddress(nodeKeys.getPublicKey()));
final IbftStateMachine ibftStateMachine = new IbftStateMachine(blockCreatorFactory);
final IbftProcessor ibftProcessor =
new IbftProcessor(ibftEventQueue, ibftConfig.getRequestTimeoutSeconds(), ibftStateMachine);
final ProposerSelector proposerSelector =
new ProposerSelector(blockchain, voteTally, blockInterface, true);
final IbftNetworkPeers peers =
new IbftNetworkPeers(protocolContext.getConsensusState().getVoteTally());
final BlockHeaderValidator<IbftContext> blockHeaderValidator =
IbftBlockHeaderValidationRulesetFactory.ibftProposedBlockValidator(
ibftConfig.getBlockPeriodSeconds());
final IbftFinalState finalState =
new IbftFinalState(
voteTally,
nodeKeys,
Util.publicKeyToAddress(nodeKeys.getPublicKey()),
proposerSelector,
peers,
new RoundTimer(
ibftEventQueue,
ibftConfig.getRequestTimeoutSeconds(),
Executors.newScheduledThreadPool(1)),
new BlockTimer(
ibftEventQueue,
ibftConfig.getBlockPeriodSeconds(),
Executors.newScheduledThreadPool(1),
Clock.systemUTC()),
blockCreatorFactory,
new MessageFactory(nodeKeys),
blockHeaderValidator,
Clock.systemUTC());
final MessageValidatorFactory messageValidatorFactory =
new MessageValidatorFactory(proposerSelector, blockHeaderValidator, protocolContext);
final IbftController ibftController =
new IbftController(
blockchain,
finalState,
new IbftBlockHeightManagerFactory(
finalState,
new IbftRoundFactory(finalState, protocolContext, protocolSchedule),
messageValidatorFactory,
protocolContext));
final IbftProcessor ibftProcessor = new IbftProcessor(ibftEventQueue, ibftController);
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
processorExecutor.submit(ibftProcessor);
final MiningCoordinator ibftMiningCoordinator = new IbftMiningCoordinator(blockCreatorFactory);
final MiningCoordinator ibftMiningCoordinator =
new IbftMiningCoordinator(ibftProcessor, blockCreatorFactory, blockchain, ibftEventQueue);
ibftMiningCoordinator.enable();
final Runnable closer =
() -> {
@ -210,9 +263,6 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
}
};
final IbftNetworkPeers peers =
new IbftNetworkPeers(protocolContext.getConsensusState().getVoteTally());
return new IbftPantheonController(
protocolSchedule,
protocolContext,

Loading…
Cancel
Save