From 78ecf332376de20e8f29f88c8293f1c90ab469a1 Mon Sep 17 00:00:00 2001 From: Ratan Rai Sur Date: Wed, 25 Sep 2019 20:57:37 +0300 Subject: [PATCH] Fix some mark sweep pruner bugs where nodes that should be kept were being swept (Re-merge of #38) (#50) Adds integration tests to catch bug where the mark storage was being cleared before a mark began. Instead, the mark storage is now cleared upon preparing the mark sweep pruner Fixes bug where the pending marks where not being checked while pruning was occurring. By removing the flush in sweepBefore, the existing tests catch the bug. Signed-off-by: Ratan Rai Sur --- .../worldstate/PrunerIntegrationTest.java | 255 ++++++++++++++++++ .../ethereum/worldstate/MarkSweepPruner.java | 45 ++-- .../besu/ethereum/worldstate/Pruner.java | 27 +- .../worldstate/MarkSweepPrunerTest.java | 100 +------ .../besu/ethereum/worldstate/PrunerTest.java | 20 +- .../kvstore/InMemoryKeyValueStorage.java | 12 +- 6 files changed, 324 insertions(+), 135 deletions(-) create mode 100644 ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java diff --git a/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java new file mode 100644 index 0000000000..4008ef9b66 --- /dev/null +++ b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java @@ -0,0 +1,255 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.worldstate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; + +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.MutableWorldState; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.core.WorldState; +import org.hyperledger.besu.ethereum.rlp.RLP; +import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage; +import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie; +import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; +import org.hyperledger.besu.testutil.MockExecutorService; +import org.hyperledger.besu.util.bytes.Bytes32; +import org.hyperledger.besu.util.bytes.BytesValue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.junit.Test; + +public class PrunerIntegrationTest { + + private final BlockDataGenerator gen = new BlockDataGenerator(); + private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem(); + private final Map hashValueStore = new HashMap<>(); + private final InMemoryKeyValueStorage stateStorage = new TestInMemoryStorage(hashValueStore); + private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage); + private final WorldStateArchive worldStateArchive = + new WorldStateArchive( + worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage())); + private final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage(); + private final Block genesisBlock = gen.genesisBlock(); + private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock); + + @Test + public void pruner_smallState_manyOpsPerTx() throws InterruptedException { + testPruner(3, 1, 1, 4, 1000); + } + + @Test + public void pruner_largeState_fewOpsPerTx() throws InterruptedException { + testPruner(2, 5, 5, 6, 5); + } + + @Test + public void pruner_emptyBlocks() throws InterruptedException { + testPruner(5, 0, 2, 5, 10); + } + + @Test + public void pruner_markChainhead() throws InterruptedException { + testPruner(4, 2, 1, 10, 20); + } + + @Test + public void pruner_lowRelativeBlockConfirmations() throws InterruptedException { + testPruner(3, 2, 1, 4, 20); + } + + @Test + public void pruner_highRelativeBlockConfirmations() throws InterruptedException { + testPruner(3, 2, 9, 10, 20); + } + + private void testPruner( + final int numCycles, + final int accountsPerBlock, + final long blockConfirmations, + final int numBlocksToKeep, + final int opsPerTransaction) + throws InterruptedException { + + final var markSweepPruner = + new MarkSweepPruner( + worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction); + final var pruner = + new Pruner( + markSweepPruner, + blockchain, + new MockExecutorService(), + new PruningConfiguration(blockConfirmations, numBlocksToKeep)); + + pruner.start(); + + for (int cycle = 0; cycle < numCycles; ++cycle) { + int numBlockInCycle = + numBlocksToKeep + + 1; // +1 to get it to switch from MARKING_COMPLETE TO SWEEPING on each cycle + var fullyMarkedBlockNum = cycle * numBlockInCycle + 1; + + // This should cause a full mark and sweep cycle + assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); + generateBlockchainData(numBlockInCycle, accountsPerBlock); + assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); + + // Collect the nodes we expect to keep + final Set expectedNodes = new HashSet<>(); + for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) { + final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); + collectWorldStateNodes(stateRoot, expectedNodes); + } + + if (accountsPerBlock != 0) { + assertThat(hashValueStore.size()) + .isGreaterThanOrEqualTo(expectedNodes.size()); // Sanity check + } + + // Assert that blocks from mark point onward are still accessible + for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) { + final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); + assertThat(worldStateArchive.get(stateRoot)).isPresent(); + final WorldState markedState = worldStateArchive.get(stateRoot).get(); + // Traverse accounts and make sure all are accessible + final int expectedAccounts = accountsPerBlock * i; + final long accounts = + markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count(); + assertThat(accounts).isEqualTo(expectedAccounts); + // Traverse storage to ensure that all storage is accessible + markedState + .streamAccounts(Bytes32.ZERO, expectedAccounts * 2) + .forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000)); + } + + // All other state roots should have been removed + for (int i = 0; i < fullyMarkedBlockNum; i++) { + final BlockHeader curHeader = blockchain.getBlockHeader(i).get(); + if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) { + assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty(); + } + } + + // Check that storage contains only the values we expect + assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size()); + assertThat(hashValueStore.values()) + .containsExactlyInAnyOrderElementsOf( + expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet())); + } + + pruner.stop(); + } + + private void generateBlockchainData(final int numBlocks, final int numAccounts) { + Block parentBlock = blockchain.getChainHeadBlock(); + for (int i = 0; i < numBlocks; i++) { + final MutableWorldState worldState = + worldStateArchive.getMutable(parentBlock.getHeader().getStateRoot()).get(); + gen.createRandomContractAccountsWithNonEmptyStorage(worldState, numAccounts); + final Hash stateRoot = worldState.rootHash(); + + final Block block = + gen.block( + BlockOptions.create() + .setStateRoot(stateRoot) + .setBlockNumber(parentBlock.getHeader().getNumber() + 1L) + .setParentHash(parentBlock.getHash())); + final List receipts = gen.receipts(block); + blockchain.appendBlock(block, receipts); + parentBlock = block; + } + } + + private Set collectWorldStateNodes( + final Hash stateRootHash, final Set collector) { + final List storageRoots = new ArrayList<>(); + final MerklePatriciaTrie stateTrie = createStateTrie(stateRootHash); + + // Collect storage roots and code + stateTrie + .entriesFrom(Bytes32.ZERO, 1000) + .forEach( + (key, val) -> { + final StateTrieAccountValue accountValue = + StateTrieAccountValue.readFrom(RLP.input(val)); + stateStorage + .get(accountValue.getCodeHash().getArrayUnsafe()) + .ifPresent(v -> collector.add(BytesValue.wrap(v))); + storageRoots.add(accountValue.getStorageRoot()); + }); + + // Collect state nodes + collectTrieNodes(stateTrie, collector); + // Collect storage nodes + for (Hash storageRoot : storageRoots) { + final MerklePatriciaTrie storageTrie = createStorageTrie(storageRoot); + collectTrieNodes(storageTrie, collector); + } + + return collector; + } + + private void collectTrieNodes( + final MerklePatriciaTrie trie, final Set collector) { + final Bytes32 rootHash = trie.getRootHash(); + trie.visitAll( + (node) -> { + if (node.isReferencedByHash() || node.getHash().equals(rootHash)) { + collector.add(node.getRlp()); + } + }); + } + + private MerklePatriciaTrie createStateTrie(final Bytes32 rootHash) { + return new StoredMerklePatriciaTrie<>( + worldStateStorage::getAccountStateTrieNode, + rootHash, + Function.identity(), + Function.identity()); + } + + private MerklePatriciaTrie createStorageTrie(final Bytes32 rootHash) { + return new StoredMerklePatriciaTrie<>( + worldStateStorage::getAccountStorageTrieNode, + rootHash, + Function.identity(), + Function.identity()); + } + + // Proxy class so that we have access to the constructor that takes our own map + private static class TestInMemoryStorage extends InMemoryKeyValueStorage { + + public TestInMemoryStorage(final Map hashValueStore) { + super(hashValueStore); + } + } +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java index efa3b38a0e..951e265de7 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java @@ -54,7 +54,7 @@ public class MarkSweepPruner { private final Counter sweptNodesCounter; private volatile long nodeAddedListenerId; private final ReentrantLock markLock = new ReentrantLock(true); - private final Set pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>()); public MarkSweepPruner( final WorldStateStorage worldStateStorage, @@ -98,7 +98,9 @@ public class MarkSweepPruner { public void prepare() { worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case. - nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes); + markStorage.clear(); + pendingMarks.clear(); + nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes); } public void cleanup() { @@ -107,7 +109,6 @@ public class MarkSweepPruner { public void mark(final Hash rootHash) { markOperationCounter.inc(); - markStorage.clear(); createStateTrie(rootHash) .visitAll( node -> { @@ -119,13 +120,12 @@ public class MarkSweepPruner { markNode(node.getHash()); node.getValue().ifPresent(this::processAccountState); }); - LOG.info("Completed marking used nodes for pruning"); + LOG.debug("Completed marking used nodes for pruning"); } public void sweepBefore(final long markedBlockNumber) { - flushPendingMarks(); sweepOperationCounter.inc(); - LOG.info("Sweeping unused nodes"); + LOG.debug("Sweeping unused nodes"); // Sweep state roots first, walking backwards until we get to a state root that isn't in the // storage long prunedNodeCount = 0; @@ -138,7 +138,7 @@ public class MarkSweepPruner { break; } - if (!markStorage.containsKey(candidateStateRootHash.getArrayUnsafe())) { + if (!isMarked(candidateStateRootHash)) { updater.removeAccountStateTrieNode(candidateStateRootHash); prunedNodeCount++; if (prunedNodeCount % operationsPerTransaction == 0) { @@ -149,11 +149,19 @@ public class MarkSweepPruner { } updater.commit(); // Sweep non-state-root nodes - prunedNodeCount += worldStateStorage.prune(markStorage::containsKey); + prunedNodeCount += worldStateStorage.prune(this::isMarked); sweptNodesCounter.inc(prunedNodeCount); worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); markStorage.clear(); - LOG.info("Completed sweeping unused nodes"); + LOG.debug("Completed sweeping unused nodes"); + } + + private boolean isMarked(final Bytes32 key) { + return pendingMarks.contains(key) || markStorage.containsKey(key.getArrayUnsafe()); + } + + private boolean isMarked(final byte[] key) { + return pendingMarks.contains(Bytes32.wrap(key)) || markStorage.containsKey(key); } private MerklePatriciaTrie createStateTrie(final Bytes32 rootHash) { @@ -182,10 +190,14 @@ public class MarkSweepPruner { @VisibleForTesting void markNode(final Bytes32 hash) { - markedNodesCounter.inc(); + markNodes(Collections.singleton(hash)); + } + + private void markNodes(final Collection nodeHashes) { + markedNodesCounter.inc(nodeHashes.size()); markLock.lock(); try { - pendingMarks.add(hash); + pendingMarks.addAll(nodeHashes); maybeFlushPendingMarks(); } finally { markLock.unlock(); @@ -209,15 +221,4 @@ public class MarkSweepPruner { markLock.unlock(); } } - - private void markNewNodes(final Collection nodeHashes) { - markedNodesCounter.inc(nodeHashes.size()); - markLock.lock(); - try { - pendingMarks.addAll(nodeHashes); - maybeFlushPendingMarks(); - } finally { - markLock.unlock(); - } - } } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java index b76a7978eb..fb8c7e5116 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.ethereum.worldstate; +import static com.google.common.base.Preconditions.checkArgument; + import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -23,6 +25,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,15 +51,13 @@ public class Pruner { this.blockchain = blockchain; this.blocksRetained = pruningConfiguration.getBlocksRetained(); this.blockConfirmations = pruningConfiguration.getBlockConfirmations(); - if (blockConfirmations < 0 || blocksRetained < 0) { - throw new IllegalArgumentException( - String.format( - "blockConfirmations and blocksRetained must be non-negative. blockConfirmations=%d, blocksRetained=%d", - blockConfirmations, blocksRetained)); - } + checkArgument( + blockConfirmations >= 0 && blockConfirmations < blocksRetained, + "blockConfirmations and blocksRetained must be non-negative. blockConfirmations must be less than blockRetained."); } public void start() { + LOG.info("Starting Pruner."); blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); } @@ -88,7 +89,7 @@ public class Pruner { private void mark(final BlockHeader header) { markBlockNumber = header.getNumber(); final Hash stateRoot = header.getStateRoot(); - LOG.info( + LOG.debug( "Begin marking used nodes for pruning. Block number: {} State root: {}", markBlockNumber, stateRoot); @@ -100,7 +101,10 @@ public class Pruner { } private void sweep() { - LOG.info("Begin sweeping unused nodes for pruning. Retention period: {}", blocksRetained); + LOG.debug( + "Begin sweeping unused nodes for pruning. Keeping full state for blocks {} to {}", + markBlockNumber, + markBlockNumber + blocksRetained); execute( () -> { pruningStrategy.sweepBefore(markBlockNumber); @@ -117,7 +121,12 @@ public class Pruner { } } - private enum State { + @VisibleForTesting + State getState() { + return state.get(); + } + + enum State { IDLE, MARK_BLOCK_CONFIRMATIONS_AWAITING, MARKING, diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java index efa0c089b9..e2fcddab72 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java @@ -65,105 +65,6 @@ public class MarkSweepPrunerTest { private final Block genesisBlock = gen.genesisBlock(); private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock); - @Test - public void prepareMarkAndSweep_smallState_manyOpsPerTx() { - testPrepareMarkAndSweep(3, 1, 2, 1000); - } - - @Test - public void prepareMarkAndSweep_largeState_fewOpsPerTx() { - testPrepareMarkAndSweep(20, 5, 5, 5); - } - - @Test - public void prepareMarkAndSweep_emptyBlocks() { - testPrepareMarkAndSweep(10, 0, 5, 10); - } - - @Test - public void prepareMarkAndSweep_markChainhead() { - testPrepareMarkAndSweep(10, 2, 10, 20); - } - - @Test - public void prepareMarkAndSweep_markGenesis() { - testPrepareMarkAndSweep(10, 2, 0, 20); - } - - @Test - public void prepareMarkAndSweep_multipleRounds() { - testPrepareMarkAndSweep(10, 2, 10, 20); - testPrepareMarkAndSweep(10, 2, 15, 20); - } - - private void testPrepareMarkAndSweep( - final int numBlocks, - final int accountsPerBlock, - final int markBlockNumber, - final int opsPerTransaction) { - final MarkSweepPruner pruner = - new MarkSweepPruner( - worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction); - final int chainHeight = (int) blockchain.getChainHead().getHeight(); - // Generate blocks up to markBlockNumber - final int blockCountBeforeMarkedBlock = markBlockNumber - chainHeight; - generateBlockchainData(blockCountBeforeMarkedBlock, accountsPerBlock); - - // Prepare - pruner.prepare(); - // Mark - final BlockHeader markBlock = blockchain.getBlockHeader(markBlockNumber).get(); - pruner.mark(markBlock.getStateRoot()); - - // Generate more blocks that should be kept - generateBlockchainData(numBlocks - blockCountBeforeMarkedBlock, accountsPerBlock); - - // Collect the nodes we expect to keep - final Set expectedNodes = collectWorldStateNodes(markBlock.getStateRoot()); - for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) { - final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); - collectWorldStateNodes(stateRoot, expectedNodes); - } - if (accountsPerBlock != 0 && markBlockNumber > 0) { - assertThat(hashValueStore.size()).isGreaterThan(expectedNodes.size()); // Sanity check - } - - // Sweep - pruner.sweepBefore(markBlock.getNumber()); - - // Assert that blocks from mark point onward are still accessible - for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) { - final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); - assertThat(worldStateArchive.get(stateRoot)).isPresent(); - final WorldState markedState = worldStateArchive.get(stateRoot).get(); - // Traverse accounts and make sure all are accessible - final int expectedAccounts = accountsPerBlock * i; - final long accounts = markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count(); - assertThat(accounts).isEqualTo(expectedAccounts); - // Traverse storage to ensure that all storage is accessible - markedState - .streamAccounts(Bytes32.ZERO, expectedAccounts * 2) - .forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000)); - } - - // All other state roots should have been removed - for (int i = 0; i < markBlockNumber; i++) { - final BlockHeader curHeader = blockchain.getBlockHeader(i + 1L).get(); - if (curHeader.getNumber() == markBlock.getNumber()) { - continue; - } - if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) { - assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty(); - } - } - - // Check that storage contains only the values we expect - assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size()); - assertThat(hashValueStore.values()) - .containsExactlyInAnyOrderElementsOf( - expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet())); - } - @Test public void mark_marksAllExpectedNodes() { final MarkSweepPruner pruner = @@ -362,6 +263,7 @@ public class MarkSweepPrunerTest { Function.identity()); } + // Proxy class so that we have access to the constructor that takes our own map private static class TestInMemoryStorage extends InMemoryKeyValueStorage { public TestInMemoryStorage(final Map hashValueStore) { diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java index 1b13cdbcdb..b60fd54348 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java @@ -65,7 +65,7 @@ public class PrunerTest { final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0)); + markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1)); pruner.start(); final Block block1 = appendBlockWithParent(blockchain, genesisBlock); @@ -159,6 +159,22 @@ public class PrunerTest { mockExecutorService, new PruningConfiguration(-1, -2))) .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + new Pruner( + markSweepPruner, + mockchain, + mockExecutorService, + new PruningConfiguration(10, 8))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + new Pruner( + markSweepPruner, + mockchain, + mockExecutorService, + new PruningConfiguration(10, 10))) + .isInstanceOf(IllegalArgumentException.class); } @Test @@ -171,7 +187,7 @@ public class PrunerTest { final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0)); + markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1)); pruner.start(); pruner.stop(); verify(markSweepPruner).cleanup(); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java index 7b8efdc7ab..59ae98c626 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java @@ -77,9 +77,15 @@ public class InMemoryKeyValueStorage implements KeyValueStorage { @Override public long removeAllKeysUnless(final Predicate retainCondition) throws StorageException { - long initialSize = hashValueStore.keySet().size(); - hashValueStore.keySet().removeIf(key -> !retainCondition.test(key.getArrayUnsafe())); - return initialSize - hashValueStore.keySet().size(); + final Lock lock = rwLock.writeLock(); + lock.lock(); + try { + long initialSize = hashValueStore.keySet().size(); + hashValueStore.keySet().removeIf(key -> !retainCondition.test(key.getArrayUnsafe())); + return initialSize - hashValueStore.keySet().size(); + } finally { + lock.unlock(); + } } @Override