Fix some mark sweep pruner bugs where nodes that should be kept were being swept (Re-merge of #38) (#50)

Adds integration tests to catch bug where the mark storage was being cleared before a mark began. Instead, the mark storage is now cleared upon preparing the mark sweep pruner

Fixes bug where the pending marks where not being checked while pruning was occurring. By removing the flush in sweepBefore, the existing tests catch the bug.

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
pull/51/head
Ratan Rai Sur 5 years ago committed by GitHub
parent 497e825919
commit 78ecf33237
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 255
      ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java
  2. 45
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java
  3. 27
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java
  4. 100
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java
  5. 20
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java
  6. 6
      services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java

@ -0,0 +1,255 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.worldstate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.WorldState;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.MockExecutorService;
import org.hyperledger.besu.util.bytes.Bytes32;
import org.hyperledger.besu.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Test;
public class PrunerIntegrationTest {
private final BlockDataGenerator gen = new BlockDataGenerator();
private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Map<BytesValue, byte[]> hashValueStore = new HashMap<>();
private final InMemoryKeyValueStorage stateStorage = new TestInMemoryStorage(hashValueStore);
private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage);
private final WorldStateArchive worldStateArchive =
new WorldStateArchive(
worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage()));
private final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage();
private final Block genesisBlock = gen.genesisBlock();
private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock);
@Test
public void pruner_smallState_manyOpsPerTx() throws InterruptedException {
testPruner(3, 1, 1, 4, 1000);
}
@Test
public void pruner_largeState_fewOpsPerTx() throws InterruptedException {
testPruner(2, 5, 5, 6, 5);
}
@Test
public void pruner_emptyBlocks() throws InterruptedException {
testPruner(5, 0, 2, 5, 10);
}
@Test
public void pruner_markChainhead() throws InterruptedException {
testPruner(4, 2, 1, 10, 20);
}
@Test
public void pruner_lowRelativeBlockConfirmations() throws InterruptedException {
testPruner(3, 2, 1, 4, 20);
}
@Test
public void pruner_highRelativeBlockConfirmations() throws InterruptedException {
testPruner(3, 2, 9, 10, 20);
}
private void testPruner(
final int numCycles,
final int accountsPerBlock,
final long blockConfirmations,
final int numBlocksToKeep,
final int opsPerTransaction)
throws InterruptedException {
final var markSweepPruner =
new MarkSweepPruner(
worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction);
final var pruner =
new Pruner(
markSweepPruner,
blockchain,
new MockExecutorService(),
new PruningConfiguration(blockConfirmations, numBlocksToKeep));
pruner.start();
for (int cycle = 0; cycle < numCycles; ++cycle) {
int numBlockInCycle =
numBlocksToKeep
+ 1; // +1 to get it to switch from MARKING_COMPLETE TO SWEEPING on each cycle
var fullyMarkedBlockNum = cycle * numBlockInCycle + 1;
// This should cause a full mark and sweep cycle
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
generateBlockchainData(numBlockInCycle, accountsPerBlock);
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
// Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = new HashSet<>();
for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
collectWorldStateNodes(stateRoot, expectedNodes);
}
if (accountsPerBlock != 0) {
assertThat(hashValueStore.size())
.isGreaterThanOrEqualTo(expectedNodes.size()); // Sanity check
}
// Assert that blocks from mark point onward are still accessible
for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
assertThat(worldStateArchive.get(stateRoot)).isPresent();
final WorldState markedState = worldStateArchive.get(stateRoot).get();
// Traverse accounts and make sure all are accessible
final int expectedAccounts = accountsPerBlock * i;
final long accounts =
markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count();
assertThat(accounts).isEqualTo(expectedAccounts);
// Traverse storage to ensure that all storage is accessible
markedState
.streamAccounts(Bytes32.ZERO, expectedAccounts * 2)
.forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000));
}
// All other state roots should have been removed
for (int i = 0; i < fullyMarkedBlockNum; i++) {
final BlockHeader curHeader = blockchain.getBlockHeader(i).get();
if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) {
assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty();
}
}
// Check that storage contains only the values we expect
assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size());
assertThat(hashValueStore.values())
.containsExactlyInAnyOrderElementsOf(
expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet()));
}
pruner.stop();
}
private void generateBlockchainData(final int numBlocks, final int numAccounts) {
Block parentBlock = blockchain.getChainHeadBlock();
for (int i = 0; i < numBlocks; i++) {
final MutableWorldState worldState =
worldStateArchive.getMutable(parentBlock.getHeader().getStateRoot()).get();
gen.createRandomContractAccountsWithNonEmptyStorage(worldState, numAccounts);
final Hash stateRoot = worldState.rootHash();
final Block block =
gen.block(
BlockOptions.create()
.setStateRoot(stateRoot)
.setBlockNumber(parentBlock.getHeader().getNumber() + 1L)
.setParentHash(parentBlock.getHash()));
final List<TransactionReceipt> receipts = gen.receipts(block);
blockchain.appendBlock(block, receipts);
parentBlock = block;
}
}
private Set<BytesValue> collectWorldStateNodes(
final Hash stateRootHash, final Set<BytesValue> collector) {
final List<Hash> storageRoots = new ArrayList<>();
final MerklePatriciaTrie<Bytes32, BytesValue> stateTrie = createStateTrie(stateRootHash);
// Collect storage roots and code
stateTrie
.entriesFrom(Bytes32.ZERO, 1000)
.forEach(
(key, val) -> {
final StateTrieAccountValue accountValue =
StateTrieAccountValue.readFrom(RLP.input(val));
stateStorage
.get(accountValue.getCodeHash().getArrayUnsafe())
.ifPresent(v -> collector.add(BytesValue.wrap(v)));
storageRoots.add(accountValue.getStorageRoot());
});
// Collect state nodes
collectTrieNodes(stateTrie, collector);
// Collect storage nodes
for (Hash storageRoot : storageRoots) {
final MerklePatriciaTrie<Bytes32, BytesValue> storageTrie = createStorageTrie(storageRoot);
collectTrieNodes(storageTrie, collector);
}
return collector;
}
private void collectTrieNodes(
final MerklePatriciaTrie<Bytes32, BytesValue> trie, final Set<BytesValue> collector) {
final Bytes32 rootHash = trie.getRootHash();
trie.visitAll(
(node) -> {
if (node.isReferencedByHash() || node.getHash().equals(rootHash)) {
collector.add(node.getRlp());
}
});
}
private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode,
rootHash,
Function.identity(),
Function.identity());
}
private MerklePatriciaTrie<Bytes32, BytesValue> createStorageTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStorageTrieNode,
rootHash,
Function.identity(),
Function.identity());
}
// Proxy class so that we have access to the constructor that takes our own map
private static class TestInMemoryStorage extends InMemoryKeyValueStorage {
public TestInMemoryStorage(final Map<BytesValue, byte[]> hashValueStore) {
super(hashValueStore);
}
}
}

@ -54,7 +54,7 @@ public class MarkSweepPruner {
private final Counter sweptNodesCounter;
private volatile long nodeAddedListenerId;
private final ReentrantLock markLock = new ReentrantLock(true);
private final Set<BytesValue> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Bytes32> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());
public MarkSweepPruner(
final WorldStateStorage worldStateStorage,
@ -98,7 +98,9 @@ public class MarkSweepPruner {
public void prepare() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes);
markStorage.clear();
pendingMarks.clear();
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
}
public void cleanup() {
@ -107,7 +109,6 @@ public class MarkSweepPruner {
public void mark(final Hash rootHash) {
markOperationCounter.inc();
markStorage.clear();
createStateTrie(rootHash)
.visitAll(
node -> {
@ -119,13 +120,12 @@ public class MarkSweepPruner {
markNode(node.getHash());
node.getValue().ifPresent(this::processAccountState);
});
LOG.info("Completed marking used nodes for pruning");
LOG.debug("Completed marking used nodes for pruning");
}
public void sweepBefore(final long markedBlockNumber) {
flushPendingMarks();
sweepOperationCounter.inc();
LOG.info("Sweeping unused nodes");
LOG.debug("Sweeping unused nodes");
// Sweep state roots first, walking backwards until we get to a state root that isn't in the
// storage
long prunedNodeCount = 0;
@ -138,7 +138,7 @@ public class MarkSweepPruner {
break;
}
if (!markStorage.containsKey(candidateStateRootHash.getArrayUnsafe())) {
if (!isMarked(candidateStateRootHash)) {
updater.removeAccountStateTrieNode(candidateStateRootHash);
prunedNodeCount++;
if (prunedNodeCount % operationsPerTransaction == 0) {
@ -149,11 +149,19 @@ public class MarkSweepPruner {
}
updater.commit();
// Sweep non-state-root nodes
prunedNodeCount += worldStateStorage.prune(markStorage::containsKey);
prunedNodeCount += worldStateStorage.prune(this::isMarked);
sweptNodesCounter.inc(prunedNodeCount);
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
markStorage.clear();
LOG.info("Completed sweeping unused nodes");
LOG.debug("Completed sweeping unused nodes");
}
private boolean isMarked(final Bytes32 key) {
return pendingMarks.contains(key) || markStorage.containsKey(key.getArrayUnsafe());
}
private boolean isMarked(final byte[] key) {
return pendingMarks.contains(Bytes32.wrap(key)) || markStorage.containsKey(key);
}
private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
@ -182,10 +190,14 @@ public class MarkSweepPruner {
@VisibleForTesting
void markNode(final Bytes32 hash) {
markedNodesCounter.inc();
markNodes(Collections.singleton(hash));
}
private void markNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.add(hash);
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
@ -209,15 +221,4 @@ public class MarkSweepPruner {
markLock.unlock();
}
}
private void markNewNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
}
}

@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.worldstate;
import static com.google.common.base.Preconditions.checkArgument;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -23,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -48,15 +51,13 @@ public class Pruner {
this.blockchain = blockchain;
this.blocksRetained = pruningConfiguration.getBlocksRetained();
this.blockConfirmations = pruningConfiguration.getBlockConfirmations();
if (blockConfirmations < 0 || blocksRetained < 0) {
throw new IllegalArgumentException(
String.format(
"blockConfirmations and blocksRetained must be non-negative. blockConfirmations=%d, blocksRetained=%d",
blockConfirmations, blocksRetained));
}
checkArgument(
blockConfirmations >= 0 && blockConfirmations < blocksRetained,
"blockConfirmations and blocksRetained must be non-negative. blockConfirmations must be less than blockRetained.");
}
public void start() {
LOG.info("Starting Pruner.");
blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event));
}
@ -88,7 +89,7 @@ public class Pruner {
private void mark(final BlockHeader header) {
markBlockNumber = header.getNumber();
final Hash stateRoot = header.getStateRoot();
LOG.info(
LOG.debug(
"Begin marking used nodes for pruning. Block number: {} State root: {}",
markBlockNumber,
stateRoot);
@ -100,7 +101,10 @@ public class Pruner {
}
private void sweep() {
LOG.info("Begin sweeping unused nodes for pruning. Retention period: {}", blocksRetained);
LOG.debug(
"Begin sweeping unused nodes for pruning. Keeping full state for blocks {} to {}",
markBlockNumber,
markBlockNumber + blocksRetained);
execute(
() -> {
pruningStrategy.sweepBefore(markBlockNumber);
@ -117,7 +121,12 @@ public class Pruner {
}
}
private enum State {
@VisibleForTesting
State getState() {
return state.get();
}
enum State {
IDLE,
MARK_BLOCK_CONFIRMATIONS_AWAITING,
MARKING,

@ -65,105 +65,6 @@ public class MarkSweepPrunerTest {
private final Block genesisBlock = gen.genesisBlock();
private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock);
@Test
public void prepareMarkAndSweep_smallState_manyOpsPerTx() {
testPrepareMarkAndSweep(3, 1, 2, 1000);
}
@Test
public void prepareMarkAndSweep_largeState_fewOpsPerTx() {
testPrepareMarkAndSweep(20, 5, 5, 5);
}
@Test
public void prepareMarkAndSweep_emptyBlocks() {
testPrepareMarkAndSweep(10, 0, 5, 10);
}
@Test
public void prepareMarkAndSweep_markChainhead() {
testPrepareMarkAndSweep(10, 2, 10, 20);
}
@Test
public void prepareMarkAndSweep_markGenesis() {
testPrepareMarkAndSweep(10, 2, 0, 20);
}
@Test
public void prepareMarkAndSweep_multipleRounds() {
testPrepareMarkAndSweep(10, 2, 10, 20);
testPrepareMarkAndSweep(10, 2, 15, 20);
}
private void testPrepareMarkAndSweep(
final int numBlocks,
final int accountsPerBlock,
final int markBlockNumber,
final int opsPerTransaction) {
final MarkSweepPruner pruner =
new MarkSweepPruner(
worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction);
final int chainHeight = (int) blockchain.getChainHead().getHeight();
// Generate blocks up to markBlockNumber
final int blockCountBeforeMarkedBlock = markBlockNumber - chainHeight;
generateBlockchainData(blockCountBeforeMarkedBlock, accountsPerBlock);
// Prepare
pruner.prepare();
// Mark
final BlockHeader markBlock = blockchain.getBlockHeader(markBlockNumber).get();
pruner.mark(markBlock.getStateRoot());
// Generate more blocks that should be kept
generateBlockchainData(numBlocks - blockCountBeforeMarkedBlock, accountsPerBlock);
// Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = collectWorldStateNodes(markBlock.getStateRoot());
for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
collectWorldStateNodes(stateRoot, expectedNodes);
}
if (accountsPerBlock != 0 && markBlockNumber > 0) {
assertThat(hashValueStore.size()).isGreaterThan(expectedNodes.size()); // Sanity check
}
// Sweep
pruner.sweepBefore(markBlock.getNumber());
// Assert that blocks from mark point onward are still accessible
for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
assertThat(worldStateArchive.get(stateRoot)).isPresent();
final WorldState markedState = worldStateArchive.get(stateRoot).get();
// Traverse accounts and make sure all are accessible
final int expectedAccounts = accountsPerBlock * i;
final long accounts = markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count();
assertThat(accounts).isEqualTo(expectedAccounts);
// Traverse storage to ensure that all storage is accessible
markedState
.streamAccounts(Bytes32.ZERO, expectedAccounts * 2)
.forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000));
}
// All other state roots should have been removed
for (int i = 0; i < markBlockNumber; i++) {
final BlockHeader curHeader = blockchain.getBlockHeader(i + 1L).get();
if (curHeader.getNumber() == markBlock.getNumber()) {
continue;
}
if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) {
assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty();
}
}
// Check that storage contains only the values we expect
assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size());
assertThat(hashValueStore.values())
.containsExactlyInAnyOrderElementsOf(
expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet()));
}
@Test
public void mark_marksAllExpectedNodes() {
final MarkSweepPruner pruner =
@ -362,6 +263,7 @@ public class MarkSweepPrunerTest {
Function.identity());
}
// Proxy class so that we have access to the constructor that takes our own map
private static class TestInMemoryStorage extends InMemoryKeyValueStorage {
public TestInMemoryStorage(final Map<BytesValue, byte[]> hashValueStore) {

@ -65,7 +65,7 @@ public class PrunerTest {
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0));
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1));
pruner.start();
final Block block1 = appendBlockWithParent(blockchain, genesisBlock);
@ -159,6 +159,22 @@ public class PrunerTest {
mockExecutorService,
new PruningConfiguration(-1, -2)))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner,
mockchain,
mockExecutorService,
new PruningConfiguration(10, 8)))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner,
mockchain,
mockExecutorService,
new PruningConfiguration(10, 10)))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
@ -171,7 +187,7 @@ public class PrunerTest {
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0));
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1));
pruner.start();
pruner.stop();
verify(markSweepPruner).cleanup();

@ -77,9 +77,15 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
@Override
public long removeAllKeysUnless(final Predicate<byte[]> retainCondition) throws StorageException {
final Lock lock = rwLock.writeLock();
lock.lock();
try {
long initialSize = hashValueStore.keySet().size();
hashValueStore.keySet().removeIf(key -> !retainCondition.test(key.getArrayUnsafe()));
return initialSize - hashValueStore.keySet().size();
} finally {
lock.unlock();
}
}
@Override

Loading…
Cancel
Save