Fix snapsync heal (#5838)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/5893/head
matkt 1 year ago committed by GitHub
parent c4f73aa643
commit 3e724a01f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java
  2. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java
  3. 127
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java
  4. 10
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java
  5. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/context/SnapSyncStatePersistenceManager.java
  6. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java
  7. 46
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/AccountFlatDatabaseHealingRangeRequest.java
  8. 36
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/AccountTrieNodeHealingRequest.java
  9. 19
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageFlatDatabaseHealingRangeRequest.java
  10. 30
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageTrieNodeHealingRequest.java
  11. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java
  12. 218
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/AccountHealingTrackingTest.java
  13. 35
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java
  14. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/AccountFlatDatabaseHealingRangeRequestTest.java
  15. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageTrieNodeHealingRequestTest.java

@ -88,7 +88,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
.getAccountToRepair()
.ifPresent(
address ->
snapContext.addAccountsToBeRepaired(
snapContext.addAccountToHealingList(
CompactEncoding.bytesToPath(address.addressHash())));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()

@ -83,7 +83,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
.getAccountToRepair()
.ifPresent(
address ->
snapContext.addAccountsToBeRepaired(
snapContext.addAccountToHealingList(
CompactEncoding.bytesToPath(address.addressHash())));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()

@ -28,7 +28,6 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataR
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.StorageFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -43,6 +42,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -72,7 +72,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
protected final InMemoryTasksPriorityQueues<SnapDataRequest>
pendingStorageFlatDatabaseHealingRequests = new InMemoryTasksPriorityQueues<>();
private HashSet<Bytes> accountsToBeRepaired = new HashSet<>();
private HashSet<Bytes> accountsHealingList = new HashSet<>();
private DynamicPivotBlockSelector pivotBlockSelector;
private final SnapSyncStatePersistenceManager snapContext;
@ -156,6 +156,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
@Override
public synchronized boolean checkCompletion(final BlockHeader header) {
// Check if all snapsync tasks are completed
if (!internalFuture.isDone()
&& pendingAccountRequests.allTasksCompleted()
&& pendingCodeRequests.allTasksCompleted()
@ -164,29 +165,50 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
&& pendingTrieNodeRequests.allTasksCompleted()
&& pendingAccountFlatDatabaseHealingRequests.allTasksCompleted()
&& pendingStorageFlatDatabaseHealingRequests.allTasksCompleted()) {
// if all snapsync tasks are completed and the healing process was not running
if (!snapSyncState.isHealTrieInProgress()) {
// Register blockchain observer if not already registered
blockObserverId =
blockObserverId.isEmpty()
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
: blockObserverId;
// Start the healing process
startTrieHeal();
} else if (pivotBlockSelector.isBlockchainBehind()) {
}
// if all snapsync tasks are completed and the healing was running and blockchain is behind
// the pivot block
else if (pivotBlockSelector.isBlockchainBehind()) {
LOG.info("Pausing world state download while waiting for sync to complete");
if (blockObserverId.isEmpty())
blockObserverId = OptionalLong.of(blockchain.observeBlockAdded(getBlockAddedListener()));
// Set the snapsync to wait for the blockchain to catch up
snapSyncState.setWaitingBlockchain(true);
} else if (!snapSyncState.isHealFlatDatabaseInProgress()
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
// only doing a flat db heal for bonsai
startFlatDatabaseHeal(header);
} else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
metricsManager.notifySnapSyncCompleted();
snapContext.clear();
internalFuture.complete(null);
return true;
}
// if all snapsync tasks are completed and the healing was running and the blockchain is not
// behind the pivot block
else {
// Remove the blockchain observer
blockObserverId.ifPresent(blockchain::removeObserver);
// If the flat database healing process is not in progress and the flat database mode is
// FULL
if (!snapSyncState.isHealFlatDatabaseInProgress()
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
// Start the flat database healing process
startFlatDatabaseHeal(header);
}
// If the flat database healing process is in progress or the flat database mode is not FULL
else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
// Notify that the snap sync has completed
metricsManager.notifySnapSyncCompleted();
// Clear the snap context
snapContext.clear();
internalFuture.complete(null);
return true;
}
}
}
return false;
}
@ -200,10 +222,11 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
pendingTrieNodeRequests.clear();
}
/** Method to start the healing process of the trie */
public synchronized void startTrieHeal() {
snapContext.clearAccountRangeTasks();
snapSyncState.setHealTrieStatus(true);
// try to find new pivot block before healing
// Try to find a new pivot block before starting the healing process
pivotBlockSelector.switchToNewPivotBlock(
(blockHeader, newPivotBlockFound) -> {
snapContext.clearAccountRangeTasks();
@ -212,21 +235,25 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
blockHeader.getNumber());
enqueueRequest(
createAccountTrieNodeDataRequest(
blockHeader.getStateRoot(), Bytes.EMPTY, accountsToBeRepaired));
blockHeader.getStateRoot(), Bytes.EMPTY, accountsHealingList));
});
}
/** Method to reload the healing process of the trie */
public synchronized void reloadTrieHeal() {
// Clear the flat database and trie log from the world state storage if needed
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
// Clear pending trie node and code requests
pendingTrieNodeRequests.clear();
pendingCodeRequests.clear();
snapSyncState.setHealTrieStatus(false);
checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());
}
public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
LOG.info("Running flat database heal process");
LOG.info("Initiating the healing process for the flat database");
snapSyncState.setHealFlatDatabaseInProgress(true);
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
ranges.forEach(
@ -235,10 +262,6 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
createAccountFlatHealingRangeRequest(header.getStateRoot(), key, value)));
}
public boolean isBonsaiStorageFormat() {
return worldStateStorage.getDataStorageFormat().equals(DataStorageFormat.BONSAI);
}
@Override
public synchronized void enqueueRequest(final SnapDataRequest request) {
if (!internalFuture.isDone()) {
@ -263,8 +286,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
}
}
public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsToBeRepaired) {
this.accountsToBeRepaired = accountsToBeRepaired;
public synchronized void setAccountsHealingList(final HashSet<Bytes> addAccountToHealingList) {
this.accountsHealingList = addAccountToHealingList;
}
/**
@ -274,15 +297,15 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
*
* @param account The account to be added for repair.
*/
public synchronized void addAccountsToBeRepaired(final Bytes account) {
if (!accountsToBeRepaired.contains(account)) {
snapContext.addAccountsToBeRepaired(account);
accountsToBeRepaired.add(account);
public synchronized void addAccountToHealingList(final Bytes account) {
if (!accountsHealingList.contains(account)) {
snapContext.addAccountToHealingList(account);
accountsHealingList.add(account);
}
}
public HashSet<Bytes> getAccountsToBeRepaired() {
return accountsToBeRepaired;
public HashSet<Bytes> getAccountsHealingList() {
return accountsHealingList;
}
@Override
@ -385,25 +408,25 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
this.pivotBlockSelector = pivotBlockSelector;
}
public BlockAddedObserver getBlockAddedListener() {
public BlockAddedObserver createBlockchainObserver() {
return addedBlockContext -> {
if (snapSyncState.isWaitingBlockchain()) {
// if we receive a new pivot block we can restart the heal
pivotBlockSelector.check(
(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
snapSyncState.setWaitingBlockchain(false);
}
});
// if we are close to the head we can also restart the heal and finish snapsync
if (!pivotBlockSelector.isBlockchainBehind()) {
snapSyncState.setWaitingBlockchain(false);
}
if (!snapSyncState.isWaitingBlockchain()) {
blockObserverId.ifPresent(blockchain::removeObserver);
blockObserverId = OptionalLong.empty();
reloadTrieHeal();
}
final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false);
pivotBlockSelector.check(
(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
foundNewPivotBlock.set(true);
}
});
final boolean isNewPivotBlockFound = foundNewPivotBlock.get();
final boolean isBlockchainCaughtUp =
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind();
if (isNewPivotBlockFound
|| isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to
// head again
snapSyncState.setWaitingBlockchain(false);
reloadTrieHeal();
}
};
}

@ -153,10 +153,10 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
final List<AccountRangeDataRequest> currentAccountRange =
snapContext.getCurrentAccountRange();
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsToBeRepaired();
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsHealingList();
if (!currentAccountRange.isEmpty()) { // continue to download worldstate ranges
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
newDownloadState.setAccountsHealingList(inconsistentAccounts);
snapContext
.getCurrentAccountRange()
.forEach(
@ -165,14 +165,14 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
DOWNLOAD, snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash());
newDownloadState.enqueueRequest(snapDataRequest);
});
} else if (!snapContext.getAccountsToBeRepaired().isEmpty()) { // restart only the heal step
} else if (!snapContext.getAccountsHealingList().isEmpty()) { // restart only the heal step
snapSyncState.setHealTrieStatus(true);
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
newDownloadState.setAccountsHealingList(inconsistentAccounts);
newDownloadState.enqueueRequest(
SnapDataRequest.createAccountTrieNodeDataRequest(
stateRoot, Bytes.EMPTY, snapContext.getAccountsToBeRepaired()));
stateRoot, Bytes.EMPTY, snapContext.getAccountsHealingList()));
} else {
// start from scratch
worldStateStorage.clear();

@ -41,7 +41,7 @@ import org.apache.tuweni.bytes.Bytes;
*/
public class SnapSyncStatePersistenceManager {
private final byte[] SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX =
private final byte[] SNAP_ACCOUNT_HEALING_LIST_INDEX =
"snapInconsistentAccountsStorageIndex".getBytes(StandardCharsets.UTF_8);
private final GenericKeyValueStorageFacade<BigInteger, AccountRangeDataRequest>
@ -104,20 +104,20 @@ public class SnapSyncStatePersistenceManager {
}
/**
* Persists the current accounts to be repaired in the database.
* Persists the current accounts to heal in the database.
*
* @param accountsToBeRepaired The current list of accounts to persist.
* @param accountsHealingList The current list of accounts to heal.
*/
public void addAccountsToBeRepaired(final Bytes accountsToBeRepaired) {
public void addAccountToHealingList(final Bytes accountsHealingList) {
final BigInteger index =
healContext
.get(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX)
.get(SNAP_ACCOUNT_HEALING_LIST_INDEX)
.map(bytes -> new BigInteger(bytes.toArrayUnsafe()).add(BigInteger.ONE))
.orElse(BigInteger.ZERO);
healContext.putAll(
keyValueStorageTransaction -> {
keyValueStorageTransaction.put(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), accountsToBeRepaired.toArrayUnsafe());
keyValueStorageTransaction.put(SNAP_ACCOUNT_HEALING_LIST_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), accountsHealingList.toArrayUnsafe());
});
}
@ -127,9 +127,9 @@ public class SnapSyncStatePersistenceManager {
.collect(Collectors.toList());
}
public HashSet<Bytes> getAccountsToBeRepaired() {
public HashSet<Bytes> getAccountsHealingList() {
return healContext
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX))
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_HEALING_LIST_INDEX))
.collect(Collectors.toCollection(HashSet::new));
}

@ -124,6 +124,11 @@ public class StorageRangeDataRequest extends SnapDataRequest {
if (!slots.isEmpty() || !proofs.isEmpty()) {
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
// If the proof is invalid, it means that the storage will be a mix of several blocks.
// Therefore, it will be necessary to heal the account's storage subsequently
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
// We will request the new storage root of the account because it is apparently no longer
// valid with the new pivot block.
downloadState.enqueueRequest(
createAccountDataRequest(
getRootHash(), Hash.wrap(accountHash), startKeyHash, endKeyHash));
@ -173,7 +178,7 @@ public class StorageRangeDataRequest extends SnapDataRequest {
});
if (startKeyHash.equals(MIN_RANGE) && endKeyHash.equals(MAX_RANGE)) {
// need to heal this account storage
downloadState.addAccountsToBeRepaired(CompactEncoding.bytesToPath(accountHash));
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
}
});

@ -59,7 +59,7 @@ public class AccountFlatDatabaseHealingRangeRequest extends SnapDataRequest {
private final Bytes32 endKeyHash;
private TreeMap<Bytes32, Bytes> existingAccounts;
private TreeMap<Bytes32, Bytes> removedAccounts;
private TreeMap<Bytes32, Bytes> flatDbAccounts;
private boolean isProofValid;
public AccountFlatDatabaseHealingRangeRequest(
@ -68,7 +68,7 @@ public class AccountFlatDatabaseHealingRangeRequest extends SnapDataRequest {
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
this.existingAccounts = new TreeMap<>();
this.removedAccounts = new TreeMap<>();
this.flatDbAccounts = new TreeMap<>();
this.isProofValid = false;
}
@ -95,12 +95,12 @@ public class AccountFlatDatabaseHealingRangeRequest extends SnapDataRequest {
downloadState.getMetricsManager().notifyRangeProgress(HEAL_FLAT, endKeyHash, endKeyHash);
}
Stream.of(existingAccounts.entrySet(), removedAccounts.entrySet())
Stream.of(existingAccounts.entrySet(), flatDbAccounts.entrySet())
.flatMap(Collection::stream)
.forEach(
account -> {
if (downloadState
.getAccountsToBeRepaired()
.getAccountsHealingList()
.contains(CompactEncoding.bytesToPath(account.getKey()))) {
final StateTrieAccountValue accountValue =
StateTrieAccountValue.readFrom(RLP.input(account.getValue()));
@ -174,7 +174,7 @@ public class AccountFlatDatabaseHealingRangeRequest extends SnapDataRequest {
// put all flat accounts in the list, and gradually keep only those that are not in the trie
// to remove and heal them.
removedAccounts = new TreeMap<>(existingAccounts);
flatDbAccounts = new TreeMap<>(existingAccounts);
final TrieIterator<Bytes> visitor = RangeStorageEntriesCollector.createVisitor(collector);
existingAccounts =
@ -184,27 +184,33 @@ public class AccountFlatDatabaseHealingRangeRequest extends SnapDataRequest {
RangeStorageEntriesCollector.collectEntries(
collector, visitor, root, startKeyHash));
// doing the fix
// Process each existing account
existingAccounts.forEach(
(key, value) -> {
if (removedAccounts.containsKey(key)) {
removedAccounts.remove(key);
} else {
final Hash accountHash = Hash.wrap(key);
// if the account was missing in the flat db we need to heal the storage
downloadState.addAccountsToBeRepaired(CompactEncoding.bytesToPath(accountHash));
// Remove the key from the flat db list and get its associated value
Bytes flatDbEntry = flatDbAccounts.remove(key);
// If the key was in flat db and its associated value is different from the
// current value
if (!value.equals(flatDbEntry)) {
Hash accountHash = Hash.wrap(key);
// Add the account to the list of accounts to be repaired
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
// Update the account info state
bonsaiUpdater.putAccountInfoState(accountHash, value);
}
});
removedAccounts.forEach(
(key, value) -> {
final Hash accountHash = Hash.wrap(key);
// if the account was removed we will have to heal the storage
downloadState.addAccountsToBeRepaired(CompactEncoding.bytesToPath(accountHash));
bonsaiUpdater.removeAccountInfoState(accountHash);
});
// For each remaining account in flat db list, remove the account info state and add it to
// the list of accounts to be repaired
flatDbAccounts
.keySet()
.forEach(
key -> {
Hash accountHash = Hash.wrap(key);
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
bonsaiUpdater.removeAccountInfoState(accountHash);
});
}
return existingAccounts.size() + removedAccounts.size();
return existingAccounts.size() + flatDbAccounts.size();
}
}

@ -112,6 +112,15 @@ public class AccountTrieNodeHealingRequest extends TrieNodeHealingRequest {
account.size() - getLocation().size()))
.map(RLP::input)
.map(StateTrieAccountValue::readFrom)
.filter(
stateTrieAccountValue ->
// We need to ensure that the accounts to be healed do not have empty storage.
// Therefore, it is unnecessary to create trie heal requests for storage in this
// case.
// If we were to do so, we would be attempting to request storage that does not
// exist from our peers,
// which would cause sync issues.
!stateTrieAccountValue.getStorageRoot().equals(MerkleTrie.EMPTY_TRIE_NODE_HASH))
.ifPresent(
stateTrieAccountValue -> {
// an account need a heal step
@ -129,6 +138,7 @@ public class AccountTrieNodeHealingRequest extends TrieNodeHealingRequest {
@Override
protected Stream<SnapDataRequest> getRequestsFromTrieNodeValue(
final WorldStateStorage worldStateStorage,
final SnapWorldDownloadState downloadState,
final Bytes location,
final Bytes path,
final Bytes value) {
@ -151,13 +161,25 @@ public class AccountTrieNodeHealingRequest extends TrieNodeHealingRequest {
if (!accountValue.getCodeHash().equals(Hash.EMPTY)) {
builder.add(createBytecodeRequest(accountHash, getRootHash(), accountValue.getCodeHash()));
}
// Add storage, if appropriate
if (!accountValue.getStorageRoot().equals(MerkleTrie.EMPTY_TRIE_NODE_HASH)) {
// If we detect an account storage we fill it with snapsync before completing with a heal
final SnapDataRequest storageTrieRequest =
createStorageTrieNodeDataRequest(
accountValue.getStorageRoot(), accountHash, getRootHash(), Bytes.EMPTY);
builder.add(storageTrieRequest);
// Retrieve the storage root from the database, if available
final Hash storageRootFoundInDb =
worldStateStorage
.getTrieNodeUnsafe(Bytes.concatenate(accountHash, Bytes.EMPTY))
.map(Hash::hash)
.orElse(Hash.wrap(MerkleTrie.EMPTY_TRIE_NODE_HASH));
if (!storageRootFoundInDb.equals(accountValue.getStorageRoot())) {
// If the storage root is not found in the database, add the account to the list of accounts
// to be repaired
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
// If the account's storage root is not empty,
// fill it with trie heal before completing with a flat heal
if (!accountValue.getStorageRoot().equals(MerkleTrie.EMPTY_TRIE_NODE_HASH)) {
SnapDataRequest storageTrieRequest =
createStorageTrieNodeDataRequest(
accountValue.getStorageRoot(), accountHash, getRootHash(), Bytes.EMPTY);
builder.add(storageTrieRequest);
}
}
return builder.build();
}

@ -152,7 +152,7 @@ public class StorageFlatDatabaseHealingRangeRequest extends SnapDataRequest {
Function.identity(),
Function.identity());
Map<Bytes32, Bytes> remainingKeys = new TreeMap<>(slots);
Map<Bytes32, Bytes> flatDbSlots = new TreeMap<>(slots);
// Retrieve the data from the trie in order to know what needs to be fixed in the flat
// database
@ -172,18 +172,23 @@ public class StorageFlatDatabaseHealingRangeRequest extends SnapDataRequest {
RangeStorageEntriesCollector.collectEntries(
collector, visitor, root, startKeyHash));
// Perform the fix by updating the flat database
// Process each slot
slots.forEach(
(key, value) -> {
if (remainingKeys.containsKey(key)) {
remainingKeys.remove(key);
} else {
// Remove the key from the flat db and get its associated value
final Bytes flatDbEntry = flatDbSlots.remove(key);
// If the key was not in flat db and its associated value is different from the
// current value
if (!value.equals(flatDbEntry)) {
// Update the storage value
bonsaiUpdater.putStorageValueBySlotHash(
accountHash, Hash.wrap(key), Bytes32.leftPad(RLP.decodeValue(value)));
}
});
remainingKeys.forEach(
(key, value) -> bonsaiUpdater.removeStorageValueBySlotHash(accountHash, Hash.wrap(key)));
// For each remaining key, remove the storage value
flatDbSlots
.keySet()
.forEach(key -> bonsaiUpdater.removeStorageValueBySlotHash(accountHash, Hash.wrap(key)));
}
return slots.size();
}

@ -21,8 +21,6 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncProcessState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.trie.MerkleTrie;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
@ -60,31 +58,8 @@ public class StorageTrieNodeHealingRequest extends TrieNodeHealingRequest {
@Override
public Optional<Bytes> getExistingData(
final SnapWorldDownloadState downloadState, final WorldStateStorage worldStateStorage) {
final Optional<Bytes> storageTrieNode;
if (worldStateStorage.getDataStorageFormat().equals(DataStorageFormat.FOREST)) {
storageTrieNode = worldStateStorage.getTrieNodeUnsafe(getNodeHash());
} else {
storageTrieNode =
worldStateStorage.getTrieNodeUnsafe(Bytes.concatenate(getAccountHash(), getLocation()));
}
if (storageTrieNode.isPresent()) {
return storageTrieNode
.filter(node -> Hash.hash(node).equals(getNodeHash()))
.or(
() -> { // if we have a storage in database but not the good one we will need to fix
// the account later
downloadState.addAccountsToBeRepaired(
CompactEncoding.bytesToPath(getAccountHash()));
return Optional.empty();
});
} else {
if (getNodeHash().equals(MerkleTrie.EMPTY_TRIE_NODE_HASH)) {
return Optional.of(MerkleTrie.EMPTY_TRIE_NODE);
}
return Optional.empty();
}
return worldStateStorage.getAccountStorageTrieNode(
getAccountHash(), getLocation(), getNodeHash());
}
@Override
@ -96,6 +71,7 @@ public class StorageTrieNodeHealingRequest extends TrieNodeHealingRequest {
@Override
protected Stream<SnapDataRequest> getRequestsFromTrieNodeValue(
final WorldStateStorage worldStateStorage,
final SnapWorldDownloadState downloadState,
final Bytes location,
final Bytes path,
final Bytes value) {

@ -104,6 +104,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest
value ->
getRequestsFromTrieNodeValue(
worldStateStorage,
downloadState,
node.getLocation().orElse(Bytes.EMPTY),
node.getPath(),
value))
@ -179,6 +180,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest
protected abstract Stream<SnapDataRequest> getRequestsFromTrieNodeValue(
final WorldStateStorage worldStateStorage,
final SnapWorldDownloadState downloadState,
final Bytes location,
final Bytes path,
final Bytes value);

@ -0,0 +1,218 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager.MAX_RANGE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.core.TrieGenerator;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.StorageTrieNodeHealingRequest;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.MerkleTrie;
import org.hyperledger.besu.ethereum.trie.RangeStorageEntriesCollector;
import org.hyperledger.besu.ethereum.trie.TrieIterator;
import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.patricia.StoredNodeFactory;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class AccountHealingTrackingTest {
private final List<Address> accounts = List.of(Address.fromHexString("0xdeadbeef"));
private final WorldStateStorage worldStateStorage =
new BonsaiWorldStateKeyValueStorage(
new InMemoryKeyValueStorageProvider(), new NoOpMetricsSystem());
private WorldStateProofProvider worldStateProofProvider;
private MerkleTrie<Bytes, Bytes> accountStateTrie;
@Mock SnapWorldDownloadState snapWorldDownloadState;
@BeforeEach
public void setup() {
accountStateTrie =
TrieGenerator.generateTrie(
worldStateStorage,
accounts.stream().map(Address::addressHash).collect(Collectors.toList()));
worldStateProofProvider = new WorldStateProofProvider(worldStateStorage);
}
@Test
void avoidMarkingAccountWhenStorageProofValid() {
// generate valid proof
final Hash accountHash = Hash.hash(accounts.get(0));
final StateTrieAccountValue stateTrieAccountValue =
StateTrieAccountValue.readFrom(RLP.input(accountStateTrie.get(accountHash).orElseThrow()));
final StoredMerklePatriciaTrie<Bytes, Bytes> storageTrie =
new StoredMerklePatriciaTrie<>(
new StoredNodeFactory<>(
(location, hash) ->
worldStateStorage.getAccountStorageTrieNode(accountHash, location, hash),
Function.identity(),
Function.identity()),
stateTrieAccountValue.getStorageRoot());
final RangeStorageEntriesCollector collector =
RangeStorageEntriesCollector.createCollector(Hash.ZERO, MAX_RANGE, 10, Integer.MAX_VALUE);
final TrieIterator<Bytes> visitor = RangeStorageEntriesCollector.createVisitor(collector);
final TreeMap<Bytes32, Bytes> slots =
(TreeMap<Bytes32, Bytes>)
storageTrie.entriesFrom(
root ->
RangeStorageEntriesCollector.collectEntries(
collector, visitor, root, Hash.ZERO));
// generate the proof
final List<Bytes> proofs =
worldStateProofProvider.getStorageProofRelatedNodes(
Hash.wrap(storageTrie.getRootHash()), accountHash, Hash.ZERO);
proofs.addAll(
worldStateProofProvider.getStorageProofRelatedNodes(
Hash.wrap(storageTrie.getRootHash()), accountHash, slots.lastKey()));
final StorageRangeDataRequest storageRangeDataRequest =
SnapDataRequest.createStorageRangeDataRequest(
Hash.wrap(accountStateTrie.getRootHash()),
accountHash,
storageTrie.getRootHash(),
Hash.ZERO,
MAX_RANGE);
storageRangeDataRequest.addResponse(
snapWorldDownloadState, worldStateProofProvider, slots, new ArrayDeque<>(proofs));
storageRangeDataRequest.getChildRequests(snapWorldDownloadState, worldStateStorage, null);
verify(snapWorldDownloadState, never()).addAccountToHealingList(any(Bytes.class));
}
@Test
void markAccountOnInvalidStorageProof() {
final Hash accountHash = Hash.hash(accounts.get(0));
final StateTrieAccountValue stateTrieAccountValue =
StateTrieAccountValue.readFrom(RLP.input(accountStateTrie.get(accountHash).orElseThrow()));
final List<Bytes> proofs =
List.of(
worldStateStorage
.getAccountStorageTrieNode(
accountHash, Bytes.EMPTY, stateTrieAccountValue.getStorageRoot())
.get());
final StorageRangeDataRequest storageRangeDataRequest =
SnapDataRequest.createStorageRangeDataRequest(
Hash.wrap(accountStateTrie.getRootHash()),
accountHash,
stateTrieAccountValue.getStorageRoot(),
Hash.ZERO,
MAX_RANGE);
storageRangeDataRequest.addResponse(
snapWorldDownloadState, worldStateProofProvider, new TreeMap<>(), new ArrayDeque<>(proofs));
verify(snapWorldDownloadState).addAccountToHealingList(any(Bytes.class));
}
@Test
void markAccountOnPartialStorageRange() {
// generate valid proof
final Hash accountHash = Hash.hash(accounts.get(0));
final StateTrieAccountValue stateTrieAccountValue =
StateTrieAccountValue.readFrom(RLP.input(accountStateTrie.get(accountHash).orElseThrow()));
final StoredMerklePatriciaTrie<Bytes, Bytes> storageTrie =
new StoredMerklePatriciaTrie<>(
new StoredNodeFactory<>(
(location, hash) ->
worldStateStorage.getAccountStorageTrieNode(accountHash, location, hash),
Function.identity(),
Function.identity()),
stateTrieAccountValue.getStorageRoot());
final RangeStorageEntriesCollector collector =
RangeStorageEntriesCollector.createCollector(
Hash.ZERO,
MAX_RANGE,
1,
Integer.MAX_VALUE); // limit to 1 in order to have a partial range
final TrieIterator<Bytes> visitor = RangeStorageEntriesCollector.createVisitor(collector);
final TreeMap<Bytes32, Bytes> slots =
(TreeMap<Bytes32, Bytes>)
storageTrie.entriesFrom(
root ->
RangeStorageEntriesCollector.collectEntries(
collector, visitor, root, Hash.ZERO));
// generate the proof
final List<Bytes> proofs =
worldStateProofProvider.getStorageProofRelatedNodes(
Hash.wrap(storageTrie.getRootHash()), accountHash, Hash.ZERO);
proofs.addAll(
worldStateProofProvider.getStorageProofRelatedNodes(
Hash.wrap(storageTrie.getRootHash()), accountHash, slots.lastKey()));
final StorageRangeDataRequest storageRangeDataRequest =
SnapDataRequest.createStorageRangeDataRequest(
Hash.wrap(accountStateTrie.getRootHash()),
accountHash,
storageTrie.getRootHash(),
Hash.ZERO,
MAX_RANGE);
storageRangeDataRequest.addResponse(
snapWorldDownloadState, worldStateProofProvider, slots, new ArrayDeque<>(proofs));
verify(snapWorldDownloadState, never()).addAccountToHealingList(any(Bytes.class));
// should mark during the getchild request
storageRangeDataRequest.getChildRequests(snapWorldDownloadState, worldStateStorage, null);
verify(snapWorldDownloadState).addAccountToHealingList(any(Bytes.class));
}
@Test
void avoidMarkingAccountOnValidStorageTrieNodeDetection() {
final Hash accountHash = Hash.hash(accounts.get(0));
final StateTrieAccountValue stateTrieAccountValue =
StateTrieAccountValue.readFrom(RLP.input(accountStateTrie.get(accountHash).orElseThrow()));
final StorageTrieNodeHealingRequest storageTrieNodeHealingRequest =
SnapDataRequest.createStorageTrieNodeDataRequest(
stateTrieAccountValue.getStorageRoot(),
accountHash,
Hash.wrap(accountStateTrie.getRootHash()),
Bytes.EMPTY);
storageTrieNodeHealingRequest.getExistingData(snapWorldDownloadState, worldStateStorage);
verify(snapWorldDownloadState, never()).addAccountToHealingList(any(Bytes.class));
}
}

@ -325,13 +325,12 @@ public class SnapWorldDownloadStateTest {
@ParameterizedTest
@ArgumentsSource(SnapWorldDownloadStateTestArguments.class)
public void shouldWaitingBlockchainWhenTooBehind(
public void shouldListeningBlockchainDuringHeal(
final DataStorageFormat storageFormat, final boolean isFlatDbHealingEnabled) {
setUp(storageFormat);
when(snapSyncState.isHealTrieInProgress()).thenReturn(true);
when(snapSyncState.isHealTrieInProgress()).thenReturn(false);
downloadState.setPivotBlockSelector(dynamicPivotBlockManager);
when(dynamicPivotBlockManager.isBlockchainBehind()).thenReturn(true);
downloadState.checkCompletion(header);
@ -339,7 +338,7 @@ public class SnapWorldDownloadStateTest {
// should register only one time
verify(blockchain, times(1)).observeBlockAdded(any());
verify(snapSyncState, atLeastOnce()).setWaitingBlockchain(true);
verify(snapSyncState, atLeastOnce()).setHealTrieStatus(true);
assertThat(future).isNotDone();
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH)).isEmpty();
@ -374,7 +373,14 @@ public class SnapWorldDownloadStateTest {
.when(dynamicPivotBlockManager)
.check(any());
final BlockAddedObserver blockAddedListener = downloadState.getBlockAddedListener();
final Block newBlock =
new Block(
new BlockHeaderTestFixture().number(500).buildHeader(),
new BlockBody(emptyList(), emptyList()));
when(snapSyncState.getPivotBlockHeader()).thenReturn(Optional.of(newBlock.getHeader()));
final BlockAddedObserver blockAddedListener = downloadState.createBlockchainObserver();
blockAddedListener.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(
new Block(
@ -383,7 +389,9 @@ public class SnapWorldDownloadStateTest {
Collections.emptyList(),
Collections.emptyList()));
// reload heal
verify(snapSyncState).setWaitingBlockchain(false);
verify(snapSyncState).setHealTrieStatus(false);
}
@ParameterizedTest
@ -395,6 +403,7 @@ public class SnapWorldDownloadStateTest {
when(snapSyncState.isHealTrieInProgress()).thenReturn(true);
downloadState.setPivotBlockSelector(dynamicPivotBlockManager);
when(dynamicPivotBlockManager.isBlockchainBehind()).thenReturn(true);
downloadState.checkCompletion(header);
@ -402,17 +411,21 @@ public class SnapWorldDownloadStateTest {
verify(snapSyncState).setWaitingBlockchain(true);
when(snapSyncState.isWaitingBlockchain()).thenReturn(true);
final Block newBlock =
new Block(
new BlockHeaderTestFixture().number(500).buildHeader(),
new BlockBody(emptyList(), emptyList()));
when(dynamicPivotBlockManager.isBlockchainBehind()).thenReturn(false);
final BlockAddedObserver blockAddedListener = downloadState.getBlockAddedListener();
when(snapSyncState.getPivotBlockHeader()).thenReturn(Optional.of(newBlock.getHeader()));
final BlockAddedObserver blockAddedListener = downloadState.createBlockchainObserver();
blockAddedListener.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(
new Block(
new BlockHeaderTestFixture().number(500).buildHeader(),
new BlockBody(emptyList(), emptyList())),
Collections.emptyList(),
Collections.emptyList()));
newBlock, Collections.emptyList(), Collections.emptyList()));
verify(snapSyncState).setWaitingBlockchain(false);
verify(snapSyncState).setHealTrieStatus(false);
}
@ParameterizedTest

@ -67,7 +67,7 @@ public class AccountFlatDatabaseHealingRangeRequestTest {
public void setup() {
Mockito.when(downloadState.getMetricsManager())
.thenReturn(Mockito.mock(SnapsyncMetricsManager.class));
Mockito.when(downloadState.getAccountsToBeRepaired()).thenReturn(new HashSet<>());
Mockito.when(downloadState.getAccountsHealingList()).thenReturn(new HashSet<>());
}
@Test
@ -120,7 +120,7 @@ public class AccountFlatDatabaseHealingRangeRequestTest {
Assertions.assertThat(snapDataRequest.getStartKeyHash()).isGreaterThan(accounts.lastKey());
// Verify that we have storage healing request when the account need to be repaired
Mockito.when(downloadState.getAccountsToBeRepaired())
Mockito.when(downloadState.getAccountsHealingList())
.thenReturn(
new HashSet<>(
accounts.keySet().stream()

@ -57,7 +57,6 @@ class StorageTrieNodeHealingRequestTest {
Address.fromHexString("0xdeadbeeb"));
private WorldStateStorage worldStateStorage;
private Hash account0Hash;
private Hash account0StorageRoot;
@ -81,6 +80,7 @@ class StorageTrieNodeHealingRequestTest {
TrieGenerator.generateTrie(
worldStateStorage,
accounts.stream().map(Address::addressHash).collect(Collectors.toList()));
account0Hash = accounts.get(0).addressHash();
account0StorageRoot =
trie.get(account0Hash)
@ -94,6 +94,7 @@ class StorageTrieNodeHealingRequestTest {
@ArgumentsSource(StorageFormatArguments.class)
void shouldDetectExistingData(final DataStorageFormat storageFormat) {
setup(storageFormat);
final StorageTrieNodeHealingRequest request =
new StorageTrieNodeHealingRequest(
account0StorageRoot, account0Hash, Hash.EMPTY, Bytes.EMPTY);

Loading…
Cancel
Save