diff --git a/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java index 4008ef9b66..053a4df3a3 100644 --- a/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java +++ b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java @@ -123,6 +123,10 @@ public class PrunerIntegrationTest { generateBlockchainData(numBlockInCycle, accountsPerBlock); assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); + // Restarting the Pruner shouldn't matter since we're idle + pruner.stop(); + pruner.start(); + // Collect the nodes we expect to keep final Set expectedNodes = new HashSet<>(); for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) { diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java index 951e265de7..b240af7766 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java @@ -97,14 +97,12 @@ public class MarkSweepPruner { } public void prepare() { - worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case. - markStorage.clear(); - pendingMarks.clear(); - nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes); - } + // Optimization for the case where the previous cycle was interrupted (like the node was shut + // down). If the previous cycle was interrupted, there will be marks in the mark storage from + // last time, causing the first sweep to be smaller than it needs to be. + clearMarks(); - public void cleanup() { - worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); + nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes); } public void mark(final Hash rootHash) { @@ -151,9 +149,18 @@ public class MarkSweepPruner { // Sweep non-state-root nodes prunedNodeCount += worldStateStorage.prune(this::isMarked); sweptNodesCounter.inc(prunedNodeCount); + clearMarks(); + LOG.debug("Completed sweeping unused nodes"); + } + + public void cleanup() { worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); + clearMarks(); + } + + public void clearMarks() { markStorage.clear(); - LOG.debug("Completed sweeping unused nodes"); + pendingMarks.clear(); } private boolean isMarked(final Bytes32 key) { @@ -190,7 +197,14 @@ public class MarkSweepPruner { @VisibleForTesting void markNode(final Bytes32 hash) { - markNodes(Collections.singleton(hash)); + markedNodesCounter.inc(); + markLock.lock(); + try { + pendingMarks.add(hash); + maybeFlushPendingMarks(); + } finally { + markLock.unlock(); + } } private void markNodes(final Collection nodeHashes) { @@ -210,7 +224,7 @@ public class MarkSweepPruner { } } - void flushPendingMarks() { + private void flushPendingMarks() { markLock.lock(); try { final KeyValueStorageTransaction transaction = markStorage.startTransaction(); diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java index fb8c7e5116..aebd198830 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java @@ -35,6 +35,7 @@ public class Pruner { private final MarkSweepPruner pruningStrategy; private final Blockchain blockchain; private final ExecutorService executorService; + private Long blockAddedObserverId; private final long blocksRetained; private final AtomicReference state = new AtomicReference<>(State.IDLE); private volatile long markBlockNumber = 0; @@ -58,11 +59,14 @@ public class Pruner { public void start() { LOG.info("Starting Pruner."); - blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); + pruningStrategy.prepare(); + blockAddedObserverId = + blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); } public void stop() throws InterruptedException { pruningStrategy.cleanup(); + blockchain.removeObserver(blockAddedObserverId); executorService.awaitTermination(10, TimeUnit.SECONDS); } @@ -73,7 +77,6 @@ public class Pruner { final long blockNumber = event.getBlock().getHeader().getNumber(); if (state.compareAndSet(State.IDLE, State.MARK_BLOCK_CONFIRMATIONS_AWAITING)) { - pruningStrategy.prepare(); markBlockNumber = blockNumber; } else if (blockNumber >= markBlockNumber + blockConfirmations && state.compareAndSet(State.MARK_BLOCK_CONFIRMATIONS_AWAITING, State.MARKING)) { @@ -87,7 +90,6 @@ public class Pruner { } private void mark(final BlockHeader header) { - markBlockNumber = header.getNumber(); final Hash stateRoot = header.getStateRoot(); LOG.debug( "Begin marking used nodes for pruning. Block number: {} State root: {}", @@ -117,6 +119,7 @@ public class Pruner { executorService.execute(action); } catch (final Throwable t) { LOG.error("Pruning failed", t); + pruningStrategy.cleanup(); state.set(State.IDLE); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index ad9b013a73..51004654d8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -170,8 +170,8 @@ public class DefaultSynchronizer implements Synchronizer { } private void startFullSync() { - fullSyncDownloader.start(); maybePruner.ifPresent(Pruner::start); + fullSyncDownloader.start(); } @Override