[Refactor] Don't generate shutdown tasks in controller (#141)

Signed-off-by: Meredith Baxter <meredith.baxter@consensys.net>
pull/144/head
mbaxter 5 years ago committed by GitHub
parent 952454523c
commit 605a6835a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      besu/src/main/java/org/hyperledger/besu/Runner.java
  2. 25
      besu/src/main/java/org/hyperledger/besu/controller/BesuController.java
  3. 53
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  4. 13
      ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java
  5. 2
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java
  6. 81
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java
  7. 42
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java
  8. 8
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java

@ -113,6 +113,7 @@ public class Runner implements AutoCloseable {
waitForServiceToStop("Mining Coordinator", besuController.getMiningCoordinator()::awaitStop);
if (networkRunner.getNetwork().isP2pEnabled()) {
besuController.getSynchronizer().stop();
waitForServiceToStop("Synchronizer", besuController.getSynchronizer()::awaitStop);
}
networkRunner.stop();

@ -31,11 +31,18 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class BesuController<C> implements java.io.Closeable {
private static final Logger LOG = LogManager.getLogger();
public static final String DATABASE_PATH = "database";
private final ProtocolSchedule<C> protocolSchedule;
@ -50,7 +57,7 @@ public class BesuController<C> implements java.io.Closeable {
private final TransactionPool transactionPool;
private final MiningCoordinator miningCoordinator;
private final PrivacyParameters privacyParameters;
private final Runnable close;
private final List<Closeable> closeables;
private final SyncState syncState;
BesuController(
@ -64,9 +71,9 @@ public class BesuController<C> implements java.io.Closeable {
final TransactionPool transactionPool,
final MiningCoordinator miningCoordinator,
final PrivacyParameters privacyParameters,
final Runnable close,
final JsonRpcMethodFactory additionalJsonRpcMethodsFactory,
final KeyPair keyPair) {
final KeyPair keyPair,
final List<Closeable> closeables) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethProtocolManager = ethProtocolManager;
@ -79,7 +86,7 @@ public class BesuController<C> implements java.io.Closeable {
this.transactionPool = transactionPool;
this.miningCoordinator = miningCoordinator;
this.privacyParameters = privacyParameters;
this.close = close;
this.closeables = closeables;
}
public ProtocolContext<C> getProtocolContext() {
@ -120,7 +127,15 @@ public class BesuController<C> implements java.io.Closeable {
@Override
public void close() {
close.run();
closeables.forEach(this::tryClose);
}
private void tryClose(final Closeable closeable) {
try {
closeable.close();
} catch (IOException e) {
LOG.error("Unable to close resource.", e);
}
}
public PrivacyParameters getPrivacyParameters() {

@ -52,6 +52,7 @@ import org.hyperledger.besu.ethereum.worldstate.PruningConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
@ -63,16 +64,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public abstract class BesuControllerBuilder<C> {
private static final Logger LOG = LogManager.getLogger();
protected GenesisConfigFile genesisConfig;
SynchronizerConfiguration syncConfig;
EthProtocolConfiguration ethereumWireProtocolConfiguration;
@ -87,7 +80,6 @@ public abstract class BesuControllerBuilder<C> {
protected boolean isRevertReasonEnabled;
GasLimitCalculator gasLimitCalculator;
private StorageProvider storageProvider;
private final List<Runnable> shutdownActions = new ArrayList<>();
private boolean isPruningEnabled;
private PruningConfiguration pruningConfiguration;
Map<String, String> genesisConfigOverrides;
@ -238,27 +230,9 @@ public abstract class BesuControllerBuilder<C> {
storageProvider.createPruningStorage(),
metricsSystem),
blockchain,
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.setNameFormat("StatePruning-%d")
.build()),
pruningConfiguration));
}
final Optional<Pruner> finalMaybePruner = maybePruner;
addShutdownAction(
() ->
finalMaybePruner.ifPresent(
pruner -> {
try {
pruner.stop();
} catch (final InterruptedException ie) {
throw new RuntimeException(ie);
}
}));
final boolean fastSyncEnabled = syncConfig.getSyncMode().equals(SyncMode.FAST);
final EthProtocolManager ethProtocolManager =
createEthProtocolManager(
@ -304,6 +278,13 @@ public abstract class BesuControllerBuilder<C> {
final JsonRpcMethodFactory additionalJsonRpcMethodFactory =
createAdditionalJsonRpcMethodFactory(protocolContext);
List<Closeable> closeables = new ArrayList<>();
closeables.add(storageProvider);
if (privacyParameters.getPrivateStorageProvider() != null) {
closeables.add(privacyParameters.getPrivateStorageProvider());
}
return new BesuController<>(
protocolSchedule,
protocolContext,
@ -315,19 +296,9 @@ public abstract class BesuControllerBuilder<C> {
transactionPool,
miningCoordinator,
privacyParameters,
() -> {
shutdownActions.forEach(Runnable::run);
try {
storageProvider.close();
if (privacyParameters.getPrivateStorageProvider() != null) {
privacyParameters.getPrivateStorageProvider().close();
}
} catch (final IOException e) {
LOG.error("Failed to close storage provider", e);
}
},
additionalJsonRpcMethodFactory,
nodeKeys);
nodeKeys,
closeables);
}
protected void prepForBuild() {}
@ -342,10 +313,6 @@ public abstract class BesuControllerBuilder<C> {
return new SubProtocolConfiguration().withSubProtocol(EthProtocol.get(), ethProtocolManager);
}
final void addShutdownAction(final Runnable action) {
shutdownActions.add(action);
}
protected abstract MiningCoordinator createMiningCoordinator(
ProtocolSchedule<C> protocolSchedule,
ProtocolContext<C> protocolContext,

@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.Pruner.PruningPhase;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.MockExecutorService;
@ -107,8 +108,8 @@ public class PrunerIntegrationTest {
new Pruner(
markSweepPruner,
blockchain,
new MockExecutorService(),
new PruningConfiguration(blockConfirmations, numBlocksToKeep));
new PruningConfiguration(blockConfirmations, numBlocksToKeep),
MockExecutorService::new);
pruner.start();
@ -119,13 +120,9 @@ public class PrunerIntegrationTest {
var fullyMarkedBlockNum = cycle * numBlockInCycle + 1;
// This should cause a full mark and sweep cycle
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
assertThat(pruner.getPruningPhase()).isEqualByComparingTo(PruningPhase.IDLE);
generateBlockchainData(numBlockInCycle, accountsPerBlock);
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
// Restarting the Pruner shouldn't matter since we're idle
pruner.stop();
pruner.start();
assertThat(pruner.getPruningPhase()).isEqualByComparingTo(PruningPhase.IDLE);
// Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = new HashSet<>();

@ -29,6 +29,8 @@ public interface Synchronizer {
void stop();
void awaitStop() throws InterruptedException;
/**
* @return the status, based on SyncingResult When actively synchronizing blocks, alternatively
* empty

@ -22,34 +22,43 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Pruner {
private static final Logger LOG = LogManager.getLogger();
private final MarkSweepPruner pruningStrategy;
private final Blockchain blockchain;
private final ExecutorService executorService;
private Long blockAddedObserverId;
private final long blocksRetained;
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private final AtomicReference<PruningPhase> pruningPhase =
new AtomicReference<>(PruningPhase.IDLE);
private volatile long markBlockNumber = 0;
private volatile BlockHeader markedBlockHeader;
private long blockConfirmations;
public Pruner(
private AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private final Supplier<ExecutorService> executorServiceSupplier;
private ExecutorService executorService;
@VisibleForTesting
Pruner(
final MarkSweepPruner pruningStrategy,
final Blockchain blockchain,
final ExecutorService executorService,
final PruningConfiguration pruningConfiguration) {
final PruningConfiguration pruningConfiguration,
final Supplier<ExecutorService> executorServiceSupplier) {
this.pruningStrategy = pruningStrategy;
this.executorService = executorService;
this.blockchain = blockchain;
this.executorServiceSupplier = executorServiceSupplier;
this.blocksRetained = pruningConfiguration.getBlocksRetained();
this.blockConfirmations = pruningConfiguration.getBlockConfirmations();
checkArgument(
@ -57,17 +66,47 @@ public class Pruner {
"blockConfirmations and blocksRetained must be non-negative. blockConfirmations must be less than blockRetained.");
}
public Pruner(
final MarkSweepPruner pruningStrategy,
final Blockchain blockchain,
final PruningConfiguration pruningConfiguration) {
this(pruningStrategy, blockchain, pruningConfiguration, getDefaultExecutorSupplier());
}
private static Supplier<ExecutorService> getDefaultExecutorSupplier() {
return () ->
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.setNameFormat("StatePruning-%d")
.build());
}
public void start() {
if (state.compareAndSet(State.IDLE, State.RUNNING)) {
LOG.info("Starting Pruner.");
executorService = executorServiceSupplier.get();
pruningStrategy.prepare();
blockAddedObserverId =
blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event));
}
}
public void stop() throws InterruptedException {
public void stop() {
if (state.compareAndSet(State.RUNNING, State.STOPPED)) {
LOG.info("Stopping Pruner.");
pruningStrategy.cleanup();
blockchain.removeObserver(blockAddedObserverId);
executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdownNow();
}
}
public void awaitStop() throws InterruptedException {
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.error("Failed to shutdown Pruner executor service.");
}
}
private void handleNewBlock(final BlockAddedEvent event) {
@ -76,15 +115,17 @@ public class Pruner {
}
final long blockNumber = event.getBlock().getHeader().getNumber();
if (state.compareAndSet(State.IDLE, State.MARK_BLOCK_CONFIRMATIONS_AWAITING)) {
if (pruningPhase.compareAndSet(
PruningPhase.IDLE, PruningPhase.MARK_BLOCK_CONFIRMATIONS_AWAITING)) {
markBlockNumber = blockNumber;
} else if (blockNumber >= markBlockNumber + blockConfirmations
&& state.compareAndSet(State.MARK_BLOCK_CONFIRMATIONS_AWAITING, State.MARKING)) {
&& pruningPhase.compareAndSet(
PruningPhase.MARK_BLOCK_CONFIRMATIONS_AWAITING, PruningPhase.MARKING)) {
markedBlockHeader = blockchain.getBlockHeader(markBlockNumber).get();
mark(markedBlockHeader);
} else if (blockNumber >= markBlockNumber + blocksRetained
&& blockchain.blockIsOnCanonicalChain(markedBlockHeader.getHash())
&& state.compareAndSet(State.MARKING_COMPLETE, State.SWEEPING)) {
&& pruningPhase.compareAndSet(PruningPhase.MARKING_COMPLETE, PruningPhase.SWEEPING)) {
sweep();
}
}
@ -98,7 +139,7 @@ public class Pruner {
execute(
() -> {
pruningStrategy.mark(stateRoot);
state.compareAndSet(State.MARKING, State.MARKING_COMPLETE);
pruningPhase.compareAndSet(PruningPhase.MARKING, PruningPhase.MARKING_COMPLETE);
});
}
@ -110,7 +151,7 @@ public class Pruner {
execute(
() -> {
pruningStrategy.sweepBefore(markBlockNumber);
state.compareAndSet(State.SWEEPING, State.IDLE);
pruningPhase.compareAndSet(PruningPhase.SWEEPING, PruningPhase.IDLE);
});
}
@ -120,20 +161,26 @@ public class Pruner {
} catch (final Throwable t) {
LOG.error("Pruning failed", t);
pruningStrategy.cleanup();
state.set(State.IDLE);
pruningPhase.set(PruningPhase.IDLE);
}
}
@VisibleForTesting
State getState() {
return state.get();
PruningPhase getPruningPhase() {
return pruningPhase.get();
}
enum State {
enum PruningPhase {
IDLE,
MARK_BLOCK_CONFIRMATIONS_AWAITING,
MARKING,
MARKING_COMPLETE,
SWEEPING;
}
private enum State {
IDLE,
RUNNING,
STOPPED
}
}

@ -37,6 +37,7 @@ import org.hyperledger.besu.testutil.MockExecutorService;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -52,11 +53,12 @@ public class PrunerTest {
@Mock private MarkSweepPruner markSweepPruner;
private final ExecutorService mockExecutorService = new MockExecutorService();
private final Supplier<ExecutorService> mockExecutorServiceSupplier = () -> mockExecutorService;
private final Block genesisBlock = gen.genesisBlock();
@Test
public void shouldMarkCorrectBlockAndSweep() throws InterruptedException {
public void shouldMarkCorrectBlockAndSweep() {
final BlockchainStorage blockchainStorage =
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions());
@ -65,7 +67,10 @@ public class PrunerTest {
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1));
markSweepPruner,
blockchain,
new PruningConfiguration(0, 1),
mockExecutorServiceSupplier);
pruner.start();
final Block block1 = appendBlockWithParent(blockchain, genesisBlock);
@ -78,8 +83,7 @@ public class PrunerTest {
}
@Test
public void shouldOnlySweepAfterBlockConfirmationPeriodAndRetentionPeriodEnds()
throws InterruptedException {
public void shouldOnlySweepAfterBlockConfirmationPeriodAndRetentionPeriodEnds() {
final BlockchainStorage blockchainStorage =
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions());
@ -88,7 +92,10 @@ public class PrunerTest {
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(1, 2));
markSweepPruner,
blockchain,
new PruningConfiguration(1, 2),
mockExecutorServiceSupplier);
pruner.start();
final Hash markBlockStateRootHash =
@ -106,8 +113,7 @@ public class PrunerTest {
}
@Test
public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain()
throws InterruptedException {
public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain() {
final BlockchainStorage blockchainStorage =
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions());
@ -117,7 +123,10 @@ public class PrunerTest {
// start pruner so it can start handling block added events
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1));
markSweepPruner,
blockchain,
new PruningConfiguration(0, 1),
mockExecutorServiceSupplier);
pruner.start();
/*
@ -156,24 +165,24 @@ public class PrunerTest {
new Pruner(
markSweepPruner,
mockchain,
mockExecutorService,
new PruningConfiguration(-1, -2)))
new PruningConfiguration(-1, -2),
mockExecutorServiceSupplier))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner,
mockchain,
mockExecutorService,
new PruningConfiguration(10, 8)))
new PruningConfiguration(10, 8),
mockExecutorServiceSupplier))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner,
mockchain,
mockExecutorService,
new PruningConfiguration(10, 10)))
new PruningConfiguration(10, 10),
mockExecutorServiceSupplier))
.isInstanceOf(IllegalArgumentException.class);
}
@ -187,7 +196,10 @@ public class PrunerTest {
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1));
markSweepPruner,
blockchain,
new PruningConfiguration(0, 1),
mockExecutorServiceSupplier);
pruner.start();
pruner.stop();
verify(markSweepPruner).cleanup();

@ -141,6 +141,14 @@ public class DefaultSynchronizer<C> implements Synchronizer {
LOG.info("Stopping synchronizer");
fastSyncDownloader.ifPresent(FastSyncDownloader::stop);
fullSyncDownloader.stop();
maybePruner.ifPresent(Pruner::stop);
}
}
@Override
public void awaitStop() throws InterruptedException {
if (maybePruner.isPresent()) {
maybePruner.get().awaitStop();
}
}

Loading…
Cancel
Save