mirror of https://github.com/hyperledger/besu
Add chain data pruning experimental feature (#4686)
* Add chain pruner * Increase minimum blocks to retain * Skip ancestor check in pruning mode * Separate class for pruning storage * Move pruning to separate thread * Limit total pruning threads Signed-off-by: wcgcyx <wcgcyx@gmail.com> Signed-off-by: Zhenyang Shi <wcgcyx@gmail.com> Co-authored-by: Simon Dudley <simon.l.dudley@hotmail.com> Co-authored-by: Jason Frame <jason.frame@consensys.net>pull/4826/head
parent
f3b7db1580
commit
97588ae744
@ -0,0 +1,90 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
* |
||||
*/ |
||||
package org.hyperledger.besu.cli.options.unstable; |
||||
|
||||
import org.hyperledger.besu.cli.options.CLIOptions; |
||||
import org.hyperledger.besu.ethereum.chain.ChainPrunerConfiguration; |
||||
import org.hyperledger.besu.util.number.PositiveNumber; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import picocli.CommandLine; |
||||
|
||||
public class ChainPruningOptions implements CLIOptions<ChainPrunerConfiguration> { |
||||
private static final String CHAIN_PRUNING_ENABLED_FLAG = "--Xchain-pruning-enabled"; |
||||
private static final String CHAIN_PRUNING_BLOCKS_RETAINED_FLAG = |
||||
"--Xchain-pruning-blocks-retained"; |
||||
private static final String CHAIN_PRUNING_FREQUENCY_FLAG = "--Xchain-pruning-frequency"; |
||||
public static final long DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED = 7200; |
||||
public static final int DEFAULT_CHAIN_DATA_PRUNING_FREQUENCY = 256; |
||||
|
||||
@CommandLine.Option( |
||||
hidden = true, |
||||
names = {CHAIN_PRUNING_ENABLED_FLAG}, |
||||
description = |
||||
"Enable the chain pruner to actively prune old chain data (default: ${DEFAULT-VALUE})") |
||||
private final Boolean chainDataPruningEnabled = Boolean.FALSE; |
||||
|
||||
@CommandLine.Option( |
||||
hidden = true, |
||||
names = {CHAIN_PRUNING_BLOCKS_RETAINED_FLAG}, |
||||
description = |
||||
"The number of recent blocks for which to keep the chain data. Must be >= " |
||||
+ DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED |
||||
+ " (default: ${DEFAULT-VALUE})") |
||||
private final Long chainDataPruningBlocksRetained = |
||||
DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED; |
||||
|
||||
@CommandLine.Option( |
||||
hidden = true, |
||||
names = {CHAIN_PRUNING_FREQUENCY_FLAG}, |
||||
description = |
||||
"The number of blocks added to the chain between two pruning operations. Must be non-negative (default: ${DEFAULT-VALUE})") |
||||
private final PositiveNumber chainDataPruningBlocksFrequency = |
||||
PositiveNumber.fromInt(DEFAULT_CHAIN_DATA_PRUNING_FREQUENCY); |
||||
|
||||
public static ChainPruningOptions create() { |
||||
return new ChainPruningOptions(); |
||||
} |
||||
|
||||
public Boolean getChainDataPruningEnabled() { |
||||
return chainDataPruningEnabled; |
||||
} |
||||
|
||||
public Long getChainDataPruningBlocksRetained() { |
||||
return chainDataPruningBlocksRetained; |
||||
} |
||||
|
||||
@Override |
||||
public ChainPrunerConfiguration toDomainObject() { |
||||
return new ChainPrunerConfiguration( |
||||
chainDataPruningEnabled, |
||||
chainDataPruningBlocksRetained, |
||||
chainDataPruningBlocksFrequency.getValue()); |
||||
} |
||||
|
||||
@Override |
||||
public List<String> getCLIOptions() { |
||||
return Arrays.asList( |
||||
CHAIN_PRUNING_ENABLED_FLAG, |
||||
chainDataPruningEnabled.toString(), |
||||
CHAIN_PRUNING_BLOCKS_RETAINED_FLAG, |
||||
chainDataPruningBlocksRetained.toString(), |
||||
CHAIN_PRUNING_FREQUENCY_FLAG, |
||||
chainDataPruningBlocksFrequency.toString()); |
||||
} |
||||
} |
@ -0,0 +1,111 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
* |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.chain; |
||||
|
||||
import org.hyperledger.besu.datatypes.Hash; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.concurrent.ExecutorService; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public class ChainDataPruner implements BlockAddedObserver { |
||||
public static final int MAX_PRUNING_THREAD_QUEUE_SIZE = 16; |
||||
private static final Logger LOG = LoggerFactory.getLogger(ChainDataPruner.class); |
||||
private final BlockchainStorage blockchainStorage; |
||||
private final ChainDataPrunerStorage prunerStorage; |
||||
private final long blocksToRetain; |
||||
private final long pruningFrequency; |
||||
private final ExecutorService pruningExecutor; |
||||
|
||||
public ChainDataPruner( |
||||
final BlockchainStorage blockchainStorage, |
||||
final ChainDataPrunerStorage prunerStorage, |
||||
final long blocksToRetain, |
||||
final long pruningFrequency, |
||||
final ExecutorService pruningExecutor) { |
||||
this.blockchainStorage = blockchainStorage; |
||||
this.prunerStorage = prunerStorage; |
||||
this.blocksToRetain = blocksToRetain; |
||||
this.pruningFrequency = pruningFrequency; |
||||
this.pruningExecutor = pruningExecutor; |
||||
} |
||||
|
||||
@Override |
||||
public void onBlockAdded(final BlockAddedEvent event) { |
||||
final long blockNumber = event.getBlock().getHeader().getNumber(); |
||||
final long storedPruningMark = prunerStorage.getPruningMark().orElse(blockNumber); |
||||
if (blockNumber < storedPruningMark) { |
||||
LOG.warn( |
||||
"Block added event: " |
||||
+ event |
||||
+ " has a block number of " |
||||
+ blockNumber |
||||
+ " < pruning mark " |
||||
+ storedPruningMark |
||||
+ " which normally indicates chain-pruning-blocks-retained is too small"); |
||||
return; |
||||
} |
||||
final KeyValueStorageTransaction recordBlockHashesTransaction = |
||||
prunerStorage.startTransaction(); |
||||
final Collection<Hash> forkBlocks = prunerStorage.getForkBlocks(blockNumber); |
||||
forkBlocks.add(event.getBlock().getHash()); |
||||
prunerStorage.setForkBlocks(recordBlockHashesTransaction, blockNumber, forkBlocks); |
||||
recordBlockHashesTransaction.commit(); |
||||
|
||||
pruningExecutor.submit( |
||||
() -> { |
||||
final KeyValueStorageTransaction pruningTransaction = prunerStorage.startTransaction(); |
||||
long currentPruningMark = storedPruningMark; |
||||
final long newPruningMark = blockNumber - blocksToRetain; |
||||
final long blocksToBePruned = newPruningMark - currentPruningMark; |
||||
if (event.isNewCanonicalHead() && blocksToBePruned >= pruningFrequency) { |
||||
long currentRetainedBlock = blockNumber - currentPruningMark + 1; |
||||
while (currentRetainedBlock > blocksToRetain) { |
||||
LOG.debug("Pruning chain data with block height of " + currentPruningMark); |
||||
pruneChainDataAtBlock(pruningTransaction, currentPruningMark); |
||||
currentPruningMark++; |
||||
currentRetainedBlock = blockNumber - currentPruningMark; |
||||
} |
||||
} |
||||
prunerStorage.setPruningMark(pruningTransaction, currentPruningMark); |
||||
pruningTransaction.commit(); |
||||
}); |
||||
} |
||||
|
||||
private void pruneChainDataAtBlock(final KeyValueStorageTransaction tx, final long blockNumber) { |
||||
final Collection<Hash> oldForkBlocks = prunerStorage.getForkBlocks(blockNumber); |
||||
final BlockchainStorage.Updater updater = blockchainStorage.updater(); |
||||
for (final Hash toPrune : oldForkBlocks) { |
||||
updater.removeBlockHeader(toPrune); |
||||
updater.removeBlockBody(toPrune); |
||||
updater.removeTransactionReceipts(toPrune); |
||||
updater.removeTotalDifficulty(toPrune); |
||||
blockchainStorage |
||||
.getBlockBody(toPrune) |
||||
.ifPresent( |
||||
blockBody -> |
||||
blockBody |
||||
.getTransactions() |
||||
.forEach(t -> updater.removeTransactionLocation(t.getHash()))); |
||||
} |
||||
updater.removeBlockHash(blockNumber); |
||||
updater.commit(); |
||||
prunerStorage.removeForkBlocks(tx, blockNumber); |
||||
} |
||||
} |
@ -0,0 +1,99 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
* |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.chain; |
||||
|
||||
import org.hyperledger.besu.datatypes.Hash; |
||||
import org.hyperledger.besu.ethereum.rlp.RLP; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; |
||||
|
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.Collection; |
||||
import java.util.Optional; |
||||
|
||||
import com.google.common.collect.Lists; |
||||
import org.apache.tuweni.bytes.Bytes; |
||||
import org.apache.tuweni.bytes.Bytes32; |
||||
import org.apache.tuweni.units.bigints.UInt256; |
||||
|
||||
public class ChainDataPrunerStorage { |
||||
private static final Bytes PRUNING_MARK_KEY = |
||||
Bytes.wrap("pruningMark".getBytes(StandardCharsets.UTF_8)); |
||||
|
||||
private static final Bytes VARIABLES_PREFIX = Bytes.of(1); |
||||
private static final Bytes FORK_BLOCKS_PREFIX = Bytes.of(2); |
||||
|
||||
private final KeyValueStorage storage; |
||||
|
||||
public ChainDataPrunerStorage(final KeyValueStorage storage) { |
||||
this.storage = storage; |
||||
} |
||||
|
||||
public KeyValueStorageTransaction startTransaction() { |
||||
return this.storage.startTransaction(); |
||||
} |
||||
|
||||
public Optional<Long> getPruningMark() { |
||||
return get(VARIABLES_PREFIX, PRUNING_MARK_KEY).map(UInt256::fromBytes).map(UInt256::toLong); |
||||
} |
||||
|
||||
public Collection<Hash> getForkBlocks(final long blockNumber) { |
||||
return get(FORK_BLOCKS_PREFIX, UInt256.valueOf(blockNumber)) |
||||
.map(bytes -> RLP.input(bytes).readList(in -> bytesToHash(in.readBytes32()))) |
||||
.orElse(Lists.newArrayList()); |
||||
} |
||||
|
||||
public void setPruningMark(final KeyValueStorageTransaction transaction, final long pruningMark) { |
||||
set(transaction, VARIABLES_PREFIX, PRUNING_MARK_KEY, UInt256.valueOf(pruningMark)); |
||||
} |
||||
|
||||
public void setForkBlocks( |
||||
final KeyValueStorageTransaction transaction, |
||||
final long blockNumber, |
||||
final Collection<Hash> forkBlocks) { |
||||
set( |
||||
transaction, |
||||
FORK_BLOCKS_PREFIX, |
||||
UInt256.valueOf(blockNumber), |
||||
RLP.encode(o -> o.writeList(forkBlocks, (val, out) -> out.writeBytes(val)))); |
||||
} |
||||
|
||||
public void removeForkBlocks( |
||||
final KeyValueStorageTransaction transaction, final long blockNumber) { |
||||
remove(transaction, FORK_BLOCKS_PREFIX, UInt256.valueOf(blockNumber)); |
||||
} |
||||
|
||||
private Optional<Bytes> get(final Bytes prefix, final Bytes key) { |
||||
return storage.get(Bytes.concatenate(prefix, key).toArrayUnsafe()).map(Bytes::wrap); |
||||
} |
||||
|
||||
private void set( |
||||
final KeyValueStorageTransaction transaction, |
||||
final Bytes prefix, |
||||
final Bytes key, |
||||
final Bytes value) { |
||||
transaction.put(Bytes.concatenate(prefix, key).toArrayUnsafe(), value.toArrayUnsafe()); |
||||
} |
||||
|
||||
private void remove( |
||||
final KeyValueStorageTransaction transaction, final Bytes prefix, final Bytes key) { |
||||
transaction.remove(Bytes.concatenate(prefix, key).toArrayUnsafe()); |
||||
} |
||||
|
||||
private Hash bytesToHash(final Bytes bytes) { |
||||
return Hash.wrap(Bytes32.wrap(bytes, 0)); |
||||
} |
||||
} |
@ -0,0 +1,43 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
* |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.chain; |
||||
|
||||
public class ChainPrunerConfiguration { |
||||
public static final ChainPrunerConfiguration DEFAULT = |
||||
new ChainPrunerConfiguration(false, 7200, 256); |
||||
private final boolean enabled; |
||||
private final long blocksRetained; |
||||
private final long blocksFrequency; |
||||
|
||||
public ChainPrunerConfiguration( |
||||
final boolean enabled, final long blocksRetained, final long blocksFrequency) { |
||||
this.enabled = enabled; |
||||
this.blocksRetained = blocksRetained; |
||||
this.blocksFrequency = blocksFrequency; |
||||
} |
||||
|
||||
public long getChainPruningBlocksRetained() { |
||||
return blocksRetained; |
||||
} |
||||
|
||||
public boolean getChainPruningEnabled() { |
||||
return enabled; |
||||
} |
||||
|
||||
public long getChainPruningBlocksFrequency() { |
||||
return blocksFrequency; |
||||
} |
||||
} |
@ -0,0 +1,140 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
* |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.chain; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.hyperledger.besu.ethereum.chain.ChainDataPruner.MAX_PRUNING_THREAD_QUEUE_SIZE; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Block; |
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; |
||||
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; |
||||
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage; |
||||
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; |
||||
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; |
||||
|
||||
import java.util.List; |
||||
import java.util.concurrent.ArrayBlockingQueue; |
||||
import java.util.concurrent.ThreadPoolExecutor; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import org.awaitility.Awaitility; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
public class ChainDataPrunerTest { |
||||
|
||||
@Test |
||||
public void singleChainPruning() { |
||||
final BlockDataGenerator gen = new BlockDataGenerator(); |
||||
final BlockchainStorage blockchainStorage = |
||||
new KeyValueStoragePrefixedKeyBlockchainStorage( |
||||
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); |
||||
final ChainDataPruner chainDataPruner = |
||||
new ChainDataPruner( |
||||
blockchainStorage, |
||||
new ChainDataPrunerStorage(new InMemoryKeyValueStorage()), |
||||
512, |
||||
0, |
||||
new ThreadPoolExecutor( |
||||
1, |
||||
1, |
||||
60L, |
||||
TimeUnit.SECONDS, |
||||
new ArrayBlockingQueue<>(MAX_PRUNING_THREAD_QUEUE_SIZE), |
||||
new ThreadPoolExecutor.DiscardPolicy())); |
||||
Block genesisBlock = gen.genesisBlock(); |
||||
final MutableBlockchain blockchain = |
||||
DefaultBlockchain.createMutable( |
||||
genesisBlock, blockchainStorage, new NoOpMetricsSystem(), 0); |
||||
blockchain.observeBlockAdded(chainDataPruner); |
||||
|
||||
// Generate & Import 1000 blocks
|
||||
gen.blockSequence(genesisBlock, 1000) |
||||
.forEach( |
||||
blk -> { |
||||
blockchain.appendBlock(blk, gen.receipts(blk)); |
||||
long number = blk.getHeader().getNumber(); |
||||
if (number <= 512) { |
||||
// No prune happened
|
||||
assertThat(blockchain.getBlockHeader(1)).isPresent(); |
||||
} else { |
||||
// Prune number - 512 only
|
||||
Awaitility.await() |
||||
.pollInterval(1, TimeUnit.MILLISECONDS) |
||||
.atMost(50, TimeUnit.MILLISECONDS) |
||||
.until(() -> blockchain.getBlockHeader(number - 512).isEmpty()); |
||||
assertThat(blockchain.getBlockHeader(number - 511)).isPresent(); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void forkPruning() { |
||||
final BlockDataGenerator gen = new BlockDataGenerator(); |
||||
final BlockchainStorage blockchainStorage = |
||||
new KeyValueStoragePrefixedKeyBlockchainStorage( |
||||
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); |
||||
final ChainDataPruner chainDataPruner = |
||||
new ChainDataPruner( |
||||
blockchainStorage, |
||||
new ChainDataPrunerStorage(new InMemoryKeyValueStorage()), |
||||
512, |
||||
0, |
||||
new ThreadPoolExecutor( |
||||
1, |
||||
1, |
||||
60L, |
||||
TimeUnit.SECONDS, |
||||
new ArrayBlockingQueue<>(MAX_PRUNING_THREAD_QUEUE_SIZE), |
||||
new ThreadPoolExecutor.DiscardPolicy())); |
||||
Block genesisBlock = gen.genesisBlock(); |
||||
final MutableBlockchain blockchain = |
||||
DefaultBlockchain.createMutable( |
||||
genesisBlock, blockchainStorage, new NoOpMetricsSystem(), 0); |
||||
blockchain.observeBlockAdded(chainDataPruner); |
||||
|
||||
List<Block> canonicalChain = gen.blockSequence(genesisBlock, 1000); |
||||
List<Block> forkChain = gen.blockSequence(genesisBlock, 16); |
||||
for (Block blk : forkChain) { |
||||
blockchain.storeBlock(blk, gen.receipts(blk)); |
||||
} |
||||
for (int i = 0; i < 512; i++) { |
||||
Block blk = canonicalChain.get(i); |
||||
blockchain.appendBlock(blk, gen.receipts(blk)); |
||||
} |
||||
// No prune happened
|
||||
assertThat(blockchain.getBlockByHash(canonicalChain.get(0).getHash())).isPresent(); |
||||
assertThat(blockchain.getBlockByHash(forkChain.get(0).getHash())).isPresent(); |
||||
for (int i = 512; i < 527; i++) { |
||||
final int index = i; |
||||
Block blk = canonicalChain.get(i); |
||||
blockchain.appendBlock(blk, gen.receipts(blk)); |
||||
// Prune block on canonical chain and fork for i - 512 only
|
||||
Awaitility.await() |
||||
.pollInterval(1, TimeUnit.MILLISECONDS) |
||||
.atMost(50, TimeUnit.MILLISECONDS) |
||||
.until( |
||||
() -> blockchain.getBlockByHash(canonicalChain.get(index - 512).getHash()).isEmpty()); |
||||
assertThat(blockchain.getBlockByHash(canonicalChain.get(i - 511).getHash())).isPresent(); |
||||
Awaitility.await() |
||||
.pollInterval(1, TimeUnit.MILLISECONDS) |
||||
.atMost(50, TimeUnit.MILLISECONDS) |
||||
.until( |
||||
() -> blockchain.getBlockByHash(canonicalChain.get(index - 512).getHash()).isEmpty()); |
||||
|
||||
assertThat(blockchain.getBlockByHash(forkChain.get(i - 511).getHash())).isPresent(); |
||||
} |
||||
} |
||||
} |
Loading…
Reference in new issue