From 605a6835a1a56f81265371ea8c2d1fe78ec0b896 Mon Sep 17 00:00:00 2001 From: mbaxter Date: Tue, 29 Oct 2019 10:30:10 -0400 Subject: [PATCH] [Refactor] Don't generate shutdown tasks in controller (#141) Signed-off-by: Meredith Baxter --- .../java/org/hyperledger/besu/Runner.java | 1 + .../besu/controller/BesuController.java | 25 ++++- .../controller/BesuControllerBuilder.java | 53 ++--------- .../worldstate/PrunerIntegrationTest.java | 13 +-- .../besu/ethereum/core/Synchronizer.java | 2 + .../besu/ethereum/worldstate/Pruner.java | 93 ++++++++++++++----- .../besu/ethereum/worldstate/PrunerTest.java | 42 ++++++--- .../eth/sync/DefaultSynchronizer.java | 8 ++ 8 files changed, 143 insertions(+), 94 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/Runner.java b/besu/src/main/java/org/hyperledger/besu/Runner.java index bb2b66a1f3..d7229c5661 100644 --- a/besu/src/main/java/org/hyperledger/besu/Runner.java +++ b/besu/src/main/java/org/hyperledger/besu/Runner.java @@ -113,6 +113,7 @@ public class Runner implements AutoCloseable { waitForServiceToStop("Mining Coordinator", besuController.getMiningCoordinator()::awaitStop); if (networkRunner.getNetwork().isP2pEnabled()) { besuController.getSynchronizer().stop(); + waitForServiceToStop("Synchronizer", besuController.getSynchronizer()::awaitStop); } networkRunner.stop(); diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java index d0520f2743..0cb356ed87 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java @@ -31,11 +31,18 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration; +import java.io.Closeable; +import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public class BesuController implements java.io.Closeable { + private static final Logger LOG = LogManager.getLogger(); public static final String DATABASE_PATH = "database"; private final ProtocolSchedule protocolSchedule; @@ -50,7 +57,7 @@ public class BesuController implements java.io.Closeable { private final TransactionPool transactionPool; private final MiningCoordinator miningCoordinator; private final PrivacyParameters privacyParameters; - private final Runnable close; + private final List closeables; private final SyncState syncState; BesuController( @@ -64,9 +71,9 @@ public class BesuController implements java.io.Closeable { final TransactionPool transactionPool, final MiningCoordinator miningCoordinator, final PrivacyParameters privacyParameters, - final Runnable close, final JsonRpcMethodFactory additionalJsonRpcMethodsFactory, - final KeyPair keyPair) { + final KeyPair keyPair, + final List closeables) { this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethProtocolManager = ethProtocolManager; @@ -79,7 +86,7 @@ public class BesuController implements java.io.Closeable { this.transactionPool = transactionPool; this.miningCoordinator = miningCoordinator; this.privacyParameters = privacyParameters; - this.close = close; + this.closeables = closeables; } public ProtocolContext getProtocolContext() { @@ -120,7 +127,15 @@ public class BesuController implements java.io.Closeable { @Override public void close() { - close.run(); + closeables.forEach(this::tryClose); + } + + private void tryClose(final Closeable closeable) { + try { + closeable.close(); + } catch (IOException e) { + LOG.error("Unable to close resource.", e); + } } public PrivacyParameters getPrivacyParameters() { diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index b4baacbf6c..6437b759ff 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -52,6 +52,7 @@ import org.hyperledger.besu.ethereum.worldstate.PruningConfiguration; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.metrics.ObservableMetricsSystem; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.math.BigInteger; @@ -63,16 +64,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; -import java.util.concurrent.Executors; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public abstract class BesuControllerBuilder { - - private static final Logger LOG = LogManager.getLogger(); - protected GenesisConfigFile genesisConfig; SynchronizerConfiguration syncConfig; EthProtocolConfiguration ethereumWireProtocolConfiguration; @@ -87,7 +80,6 @@ public abstract class BesuControllerBuilder { protected boolean isRevertReasonEnabled; GasLimitCalculator gasLimitCalculator; private StorageProvider storageProvider; - private final List shutdownActions = new ArrayList<>(); private boolean isPruningEnabled; private PruningConfiguration pruningConfiguration; Map genesisConfigOverrides; @@ -238,27 +230,9 @@ public abstract class BesuControllerBuilder { storageProvider.createPruningStorage(), metricsSystem), blockchain, - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setPriority(Thread.MIN_PRIORITY) - .setNameFormat("StatePruning-%d") - .build()), pruningConfiguration)); } - final Optional finalMaybePruner = maybePruner; - addShutdownAction( - () -> - finalMaybePruner.ifPresent( - pruner -> { - try { - pruner.stop(); - } catch (final InterruptedException ie) { - throw new RuntimeException(ie); - } - })); - final boolean fastSyncEnabled = syncConfig.getSyncMode().equals(SyncMode.FAST); final EthProtocolManager ethProtocolManager = createEthProtocolManager( @@ -304,6 +278,13 @@ public abstract class BesuControllerBuilder { final JsonRpcMethodFactory additionalJsonRpcMethodFactory = createAdditionalJsonRpcMethodFactory(protocolContext); + + List closeables = new ArrayList<>(); + closeables.add(storageProvider); + if (privacyParameters.getPrivateStorageProvider() != null) { + closeables.add(privacyParameters.getPrivateStorageProvider()); + } + return new BesuController<>( protocolSchedule, protocolContext, @@ -315,19 +296,9 @@ public abstract class BesuControllerBuilder { transactionPool, miningCoordinator, privacyParameters, - () -> { - shutdownActions.forEach(Runnable::run); - try { - storageProvider.close(); - if (privacyParameters.getPrivateStorageProvider() != null) { - privacyParameters.getPrivateStorageProvider().close(); - } - } catch (final IOException e) { - LOG.error("Failed to close storage provider", e); - } - }, additionalJsonRpcMethodFactory, - nodeKeys); + nodeKeys, + closeables); } protected void prepForBuild() {} @@ -342,10 +313,6 @@ public abstract class BesuControllerBuilder { return new SubProtocolConfiguration().withSubProtocol(EthProtocol.get(), ethProtocolManager); } - final void addShutdownAction(final Runnable action) { - shutdownActions.add(action); - } - protected abstract MiningCoordinator createMiningCoordinator( ProtocolSchedule protocolSchedule, ProtocolContext protocolContext, diff --git a/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java index 053a4df3a3..92211163c3 100644 --- a/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java +++ b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java @@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage; import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie; import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie; +import org.hyperledger.besu.ethereum.worldstate.Pruner.PruningPhase; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; import org.hyperledger.besu.testutil.MockExecutorService; @@ -107,8 +108,8 @@ public class PrunerIntegrationTest { new Pruner( markSweepPruner, blockchain, - new MockExecutorService(), - new PruningConfiguration(blockConfirmations, numBlocksToKeep)); + new PruningConfiguration(blockConfirmations, numBlocksToKeep), + MockExecutorService::new); pruner.start(); @@ -119,13 +120,9 @@ public class PrunerIntegrationTest { var fullyMarkedBlockNum = cycle * numBlockInCycle + 1; // This should cause a full mark and sweep cycle - assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); + assertThat(pruner.getPruningPhase()).isEqualByComparingTo(PruningPhase.IDLE); generateBlockchainData(numBlockInCycle, accountsPerBlock); - assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); - - // Restarting the Pruner shouldn't matter since we're idle - pruner.stop(); - pruner.start(); + assertThat(pruner.getPruningPhase()).isEqualByComparingTo(PruningPhase.IDLE); // Collect the nodes we expect to keep final Set expectedNodes = new HashSet<>(); diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java index 7e035ecec5..e74a71e5f1 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java @@ -29,6 +29,8 @@ public interface Synchronizer { void stop(); + void awaitStop() throws InterruptedException; + /** * @return the status, based on SyncingResult When actively synchronizing blocks, alternatively * empty diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java index aebd198830..17460d34d0 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java @@ -22,34 +22,43 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class Pruner { + private static final Logger LOG = LogManager.getLogger(); private final MarkSweepPruner pruningStrategy; private final Blockchain blockchain; - private final ExecutorService executorService; private Long blockAddedObserverId; private final long blocksRetained; - private final AtomicReference state = new AtomicReference<>(State.IDLE); + private final AtomicReference pruningPhase = + new AtomicReference<>(PruningPhase.IDLE); private volatile long markBlockNumber = 0; private volatile BlockHeader markedBlockHeader; private long blockConfirmations; - public Pruner( + private AtomicReference state = new AtomicReference<>(State.IDLE); + private final Supplier executorServiceSupplier; + private ExecutorService executorService; + + @VisibleForTesting + Pruner( final MarkSweepPruner pruningStrategy, final Blockchain blockchain, - final ExecutorService executorService, - final PruningConfiguration pruningConfiguration) { + final PruningConfiguration pruningConfiguration, + final Supplier executorServiceSupplier) { this.pruningStrategy = pruningStrategy; - this.executorService = executorService; this.blockchain = blockchain; + this.executorServiceSupplier = executorServiceSupplier; this.blocksRetained = pruningConfiguration.getBlocksRetained(); this.blockConfirmations = pruningConfiguration.getBlockConfirmations(); checkArgument( @@ -57,17 +66,47 @@ public class Pruner { "blockConfirmations and blocksRetained must be non-negative. blockConfirmations must be less than blockRetained."); } + public Pruner( + final MarkSweepPruner pruningStrategy, + final Blockchain blockchain, + final PruningConfiguration pruningConfiguration) { + this(pruningStrategy, blockchain, pruningConfiguration, getDefaultExecutorSupplier()); + } + + private static Supplier getDefaultExecutorSupplier() { + return () -> + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MIN_PRIORITY) + .setNameFormat("StatePruning-%d") + .build()); + } + public void start() { - LOG.info("Starting Pruner."); - pruningStrategy.prepare(); - blockAddedObserverId = - blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); + + if (state.compareAndSet(State.IDLE, State.RUNNING)) { + LOG.info("Starting Pruner."); + executorService = executorServiceSupplier.get(); + pruningStrategy.prepare(); + blockAddedObserverId = + blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); + } } - public void stop() throws InterruptedException { - pruningStrategy.cleanup(); - blockchain.removeObserver(blockAddedObserverId); - executorService.awaitTermination(10, TimeUnit.SECONDS); + public void stop() { + if (state.compareAndSet(State.RUNNING, State.STOPPED)) { + LOG.info("Stopping Pruner."); + pruningStrategy.cleanup(); + blockchain.removeObserver(blockAddedObserverId); + executorService.shutdownNow(); + } + } + + public void awaitStop() throws InterruptedException { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.error("Failed to shutdown Pruner executor service."); + } } private void handleNewBlock(final BlockAddedEvent event) { @@ -76,15 +115,17 @@ public class Pruner { } final long blockNumber = event.getBlock().getHeader().getNumber(); - if (state.compareAndSet(State.IDLE, State.MARK_BLOCK_CONFIRMATIONS_AWAITING)) { + if (pruningPhase.compareAndSet( + PruningPhase.IDLE, PruningPhase.MARK_BLOCK_CONFIRMATIONS_AWAITING)) { markBlockNumber = blockNumber; } else if (blockNumber >= markBlockNumber + blockConfirmations - && state.compareAndSet(State.MARK_BLOCK_CONFIRMATIONS_AWAITING, State.MARKING)) { + && pruningPhase.compareAndSet( + PruningPhase.MARK_BLOCK_CONFIRMATIONS_AWAITING, PruningPhase.MARKING)) { markedBlockHeader = blockchain.getBlockHeader(markBlockNumber).get(); mark(markedBlockHeader); } else if (blockNumber >= markBlockNumber + blocksRetained && blockchain.blockIsOnCanonicalChain(markedBlockHeader.getHash()) - && state.compareAndSet(State.MARKING_COMPLETE, State.SWEEPING)) { + && pruningPhase.compareAndSet(PruningPhase.MARKING_COMPLETE, PruningPhase.SWEEPING)) { sweep(); } } @@ -98,7 +139,7 @@ public class Pruner { execute( () -> { pruningStrategy.mark(stateRoot); - state.compareAndSet(State.MARKING, State.MARKING_COMPLETE); + pruningPhase.compareAndSet(PruningPhase.MARKING, PruningPhase.MARKING_COMPLETE); }); } @@ -110,7 +151,7 @@ public class Pruner { execute( () -> { pruningStrategy.sweepBefore(markBlockNumber); - state.compareAndSet(State.SWEEPING, State.IDLE); + pruningPhase.compareAndSet(PruningPhase.SWEEPING, PruningPhase.IDLE); }); } @@ -120,20 +161,26 @@ public class Pruner { } catch (final Throwable t) { LOG.error("Pruning failed", t); pruningStrategy.cleanup(); - state.set(State.IDLE); + pruningPhase.set(PruningPhase.IDLE); } } @VisibleForTesting - State getState() { - return state.get(); + PruningPhase getPruningPhase() { + return pruningPhase.get(); } - enum State { + enum PruningPhase { IDLE, MARK_BLOCK_CONFIRMATIONS_AWAITING, MARKING, MARKING_COMPLETE, SWEEPING; } + + private enum State { + IDLE, + RUNNING, + STOPPED + } } diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java index b60fd54348..d155d06e66 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java @@ -37,6 +37,7 @@ import org.hyperledger.besu.testutil.MockExecutorService; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import org.junit.Test; import org.junit.runner.RunWith; @@ -52,11 +53,12 @@ public class PrunerTest { @Mock private MarkSweepPruner markSweepPruner; private final ExecutorService mockExecutorService = new MockExecutorService(); + private final Supplier mockExecutorServiceSupplier = () -> mockExecutorService; private final Block genesisBlock = gen.genesisBlock(); @Test - public void shouldMarkCorrectBlockAndSweep() throws InterruptedException { + public void shouldMarkCorrectBlockAndSweep() { final BlockchainStorage blockchainStorage = new KeyValueStoragePrefixedKeyBlockchainStorage( new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); @@ -65,7 +67,10 @@ public class PrunerTest { final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1)); + markSweepPruner, + blockchain, + new PruningConfiguration(0, 1), + mockExecutorServiceSupplier); pruner.start(); final Block block1 = appendBlockWithParent(blockchain, genesisBlock); @@ -78,8 +83,7 @@ public class PrunerTest { } @Test - public void shouldOnlySweepAfterBlockConfirmationPeriodAndRetentionPeriodEnds() - throws InterruptedException { + public void shouldOnlySweepAfterBlockConfirmationPeriodAndRetentionPeriodEnds() { final BlockchainStorage blockchainStorage = new KeyValueStoragePrefixedKeyBlockchainStorage( new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); @@ -88,7 +92,10 @@ public class PrunerTest { final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(1, 2)); + markSweepPruner, + blockchain, + new PruningConfiguration(1, 2), + mockExecutorServiceSupplier); pruner.start(); final Hash markBlockStateRootHash = @@ -106,8 +113,7 @@ public class PrunerTest { } @Test - public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain() - throws InterruptedException { + public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain() { final BlockchainStorage blockchainStorage = new KeyValueStoragePrefixedKeyBlockchainStorage( new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); @@ -117,7 +123,10 @@ public class PrunerTest { // start pruner so it can start handling block added events final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1)); + markSweepPruner, + blockchain, + new PruningConfiguration(0, 1), + mockExecutorServiceSupplier); pruner.start(); /* @@ -156,24 +165,24 @@ public class PrunerTest { new Pruner( markSweepPruner, mockchain, - mockExecutorService, - new PruningConfiguration(-1, -2))) + new PruningConfiguration(-1, -2), + mockExecutorServiceSupplier)) .isInstanceOf(IllegalArgumentException.class); assertThatThrownBy( () -> new Pruner( markSweepPruner, mockchain, - mockExecutorService, - new PruningConfiguration(10, 8))) + new PruningConfiguration(10, 8), + mockExecutorServiceSupplier)) .isInstanceOf(IllegalArgumentException.class); assertThatThrownBy( () -> new Pruner( markSweepPruner, mockchain, - mockExecutorService, - new PruningConfiguration(10, 10))) + new PruningConfiguration(10, 10), + mockExecutorServiceSupplier)) .isInstanceOf(IllegalArgumentException.class); } @@ -187,7 +196,10 @@ public class PrunerTest { final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1)); + markSweepPruner, + blockchain, + new PruningConfiguration(0, 1), + mockExecutorServiceSupplier); pruner.start(); pruner.stop(); verify(markSweepPruner).cleanup(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 5d885cf577..77b48056f5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -141,6 +141,14 @@ public class DefaultSynchronizer implements Synchronizer { LOG.info("Stopping synchronizer"); fastSyncDownloader.ifPresent(FastSyncDownloader::stop); fullSyncDownloader.stop(); + maybePruner.ifPresent(Pruner::stop); + } + } + + @Override + public void awaitStop() throws InterruptedException { + if (maybePruner.isPresent()) { + maybePruner.get().awaitStop(); } }