fix trie by asking peers (#4312)

* fix trie by asking peers

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/4362/head
matkt 2 years ago committed by GitHub
parent 101f5efbdc
commit c57dcd99d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/bonsai/BonsaiInMemoryWorldStateKeyValueStorage.java
  2. 7
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/bonsai/BonsaiLayeredWorldState.java
  3. 8
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/bonsai/BonsaiPersistedWorldState.java
  4. 7
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/bonsai/BonsaiWorldStateArchive.java
  5. 76
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/bonsai/BonsaiWorldStateKeyValueStorage.java
  6. 29
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/PeerTrieNodeFinder.java
  7. 60
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/bonsai/BonsaiWorldStateKeyValueStorageTest.java
  8. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  9. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountTrieNodeDataRequest.java
  10. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageTrieNodeDataRequest.java
  11. 156
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStatePeerTrieNodeFinder.java
  12. 33
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java
  13. 61
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/SnapProtocolManagerTestUtil.java
  14. 261
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStatePeerTrieNodeFinderTest.java

@ -14,10 +14,13 @@
*/
package org.hyperledger.besu.ethereum.bonsai;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,8 +35,15 @@ public class BonsaiInMemoryWorldStateKeyValueStorage extends BonsaiWorldStateKey
final KeyValueStorage codeStorage,
final KeyValueStorage storageStorage,
final KeyValueStorage trieBranchStorage,
final KeyValueStorage trieLogStorage) {
super(accountStorage, codeStorage, storageStorage, trieBranchStorage, trieLogStorage);
final KeyValueStorage trieLogStorage,
final Optional<PeerTrieNodeFinder> fallbackNodeFinder) {
super(
accountStorage,
codeStorage,
storageStorage,
trieBranchStorage,
trieLogStorage,
fallbackNodeFinder);
}
@Override

@ -259,14 +259,15 @@ public class BonsaiLayeredWorldState implements MutableWorldState, BonsaiWorldVi
public MutableWorldState copy() {
final BonsaiPersistedWorldState bonsaiPersistedWorldState =
((BonsaiPersistedWorldState) archive.getMutable());
return new BonsaiInMemoryWorldState(
archive,
BonsaiInMemoryWorldStateKeyValueStorage bonsaiInMemoryWorldStateKeyValueStorage =
new BonsaiInMemoryWorldStateKeyValueStorage(
bonsaiPersistedWorldState.getWorldStateStorage().accountStorage,
bonsaiPersistedWorldState.getWorldStateStorage().codeStorage,
bonsaiPersistedWorldState.getWorldStateStorage().storageStorage,
bonsaiPersistedWorldState.getWorldStateStorage().trieBranchStorage,
bonsaiPersistedWorldState.getWorldStateStorage().trieLogStorage));
bonsaiPersistedWorldState.getWorldStateStorage().trieLogStorage,
bonsaiPersistedWorldState.getWorldStateStorage().getMaybeFallbackNodeFinder());
return new BonsaiInMemoryWorldState(archive, bonsaiInMemoryWorldStateKeyValueStorage);
}
@Override

@ -75,14 +75,16 @@ public class BonsaiPersistedWorldState implements MutableWorldState, BonsaiWorld
@Override
public MutableWorldState copy() {
return new BonsaiInMemoryWorldState(
archive,
BonsaiInMemoryWorldStateKeyValueStorage bonsaiInMemoryWorldStateKeyValueStorage =
new BonsaiInMemoryWorldStateKeyValueStorage(
worldStateStorage.accountStorage,
worldStateStorage.codeStorage,
worldStateStorage.storageStorage,
worldStateStorage.trieBranchStorage,
worldStateStorage.trieLogStorage));
worldStateStorage.trieLogStorage,
getWorldStateStorage().getMaybeFallbackNodeFinder());
return new BonsaiInMemoryWorldState(archive, bonsaiInMemoryWorldStateKeyValueStorage);
}
@Override

@ -16,6 +16,7 @@
package org.hyperledger.besu.ethereum.bonsai;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.datatypes.Hash.fromPlugin;
import org.hyperledger.besu.datatypes.Address;
@ -26,6 +27,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.proof.WorldStateProof;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.evm.worldstate.WorldState;
@ -243,4 +245,9 @@ public class BonsaiWorldStateArchive implements WorldStateArchive {
// FIXME we can do proofs for layered tries and the persisted trie
return Optional.empty();
}
public void useFallbackNodeFinder(final Optional<PeerTrieNodeFinder> fallbackNodeFinder) {
checkNotNull(fallbackNodeFinder);
worldStateStorage.useFallbackNodeFinder(fallbackNodeFinder);
}
}

@ -14,12 +14,15 @@
*/
package org.hyperledger.besu.ethereum.bonsai;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredNodeFactory;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
@ -46,16 +49,16 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage {
protected final KeyValueStorage trieBranchStorage;
protected final KeyValueStorage trieLogStorage;
private Optional<PeerTrieNodeFinder> maybeFallbackNodeFinder;
public BonsaiWorldStateKeyValueStorage(final StorageProvider provider) {
accountStorage =
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.ACCOUNT_INFO_STATE);
codeStorage = provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.CODE_STORAGE);
storageStorage =
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.ACCOUNT_STORAGE_STORAGE);
trieBranchStorage =
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_BRANCH_STORAGE);
trieLogStorage =
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_LOG_STORAGE);
this(
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.ACCOUNT_INFO_STATE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.CODE_STORAGE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.ACCOUNT_STORAGE_STORAGE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_BRANCH_STORAGE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_LOG_STORAGE),
Optional.empty());
}
public BonsaiWorldStateKeyValueStorage(
@ -64,11 +67,28 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage {
final KeyValueStorage storageStorage,
final KeyValueStorage trieBranchStorage,
final KeyValueStorage trieLogStorage) {
this(
accountStorage,
codeStorage,
storageStorage,
trieBranchStorage,
trieLogStorage,
Optional.empty());
}
public BonsaiWorldStateKeyValueStorage(
final KeyValueStorage accountStorage,
final KeyValueStorage codeStorage,
final KeyValueStorage storageStorage,
final KeyValueStorage trieBranchStorage,
final KeyValueStorage trieLogStorage,
final Optional<PeerTrieNodeFinder> fallbackNodeFinder) {
this.accountStorage = accountStorage;
this.codeStorage = codeStorage;
this.storageStorage = storageStorage;
this.trieBranchStorage = trieBranchStorage;
this.trieLogStorage = trieLogStorage;
this.maybeFallbackNodeFinder = fallbackNodeFinder;
}
@Override
@ -104,7 +124,17 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage {
if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE);
} else {
return trieBranchStorage.get(location.toArrayUnsafe()).map(Bytes::wrap);
final Optional<Bytes> value =
trieBranchStorage.get(location.toArrayUnsafe()).map(Bytes::wrap);
if (value.isPresent()) {
return value
.filter(b -> Hash.hash(b).equals(nodeHash))
.or(
() ->
maybeFallbackNodeFinder.flatMap(
finder -> finder.getAccountStateTrieNode(location, nodeHash)));
}
return Optional.empty();
}
}
@ -114,9 +144,20 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage {
if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE);
} else {
return trieBranchStorage
.get(Bytes.concatenate(accountHash, location).toArrayUnsafe())
.map(Bytes::wrap);
final Optional<Bytes> value =
trieBranchStorage
.get(Bytes.concatenate(accountHash, location).toArrayUnsafe())
.map(Bytes::wrap);
if (value.isPresent()) {
return value
.filter(b -> Hash.hash(b).equals(nodeHash))
.or(
() ->
maybeFallbackNodeFinder.flatMap(
finder ->
finder.getAccountStorageTrieNode(accountHash, location, nodeHash)));
}
return Optional.empty();
}
}
@ -218,6 +259,15 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage {
throw new RuntimeException("removeNodeAddedListener not available");
}
public Optional<PeerTrieNodeFinder> getMaybeFallbackNodeFinder() {
return maybeFallbackNodeFinder;
}
public void useFallbackNodeFinder(final Optional<PeerTrieNodeFinder> maybeFallbackNodeFinder) {
checkNotNull(maybeFallbackNodeFinder);
this.maybeFallbackNodeFinder = maybeFallbackNodeFinder;
}
public static class Updater implements WorldStateStorage.Updater {
private final KeyValueStorageTransaction accountStorageTransaction;

@ -0,0 +1,29 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.worldstate;
import org.hyperledger.besu.datatypes.Hash;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
public interface PeerTrieNodeFinder {
Optional<Bytes> getAccountStateTrieNode(final Bytes location, final Bytes32 nodeHash);
Optional<Bytes> getAccountStorageTrieNode(Hash accountHash, Bytes location, Bytes32 nodeHash);
}

@ -18,9 +18,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage.WORLD_ROOT_HASH_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@ -30,8 +32,10 @@ import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StorageEntriesCollector;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.tuweni.bytes.Bytes;
@ -291,6 +295,62 @@ public class BonsaiWorldStateKeyValueStorageTest {
assertThat(storage.isWorldStateAvailable(Bytes32.wrap(nodeHashKey), Hash.EMPTY)).isTrue();
}
@Test
public void getAccountStateTrieNode_callFallbackMechanismForInvalidNode() {
PeerTrieNodeFinder peerTrieNodeFinder = mock(PeerTrieNodeFinder.class);
final Bytes location = Bytes.fromHexString("0x01");
final Bytes bytesInDB = Bytes.fromHexString("0x123456");
final Hash hashToFind = Hash.hash(Bytes.of(1));
final Bytes bytesToFind = Bytes.fromHexString("0x123457");
final BonsaiWorldStateKeyValueStorage storage = emptyStorage();
when(peerTrieNodeFinder.getAccountStateTrieNode(location, hashToFind))
.thenReturn(Optional.of(bytesToFind));
storage.useFallbackNodeFinder(Optional.of(peerTrieNodeFinder));
storage.updater().putAccountStateTrieNode(location, Hash.hash(bytesInDB), bytesInDB).commit();
Optional<Bytes> accountStateTrieNodeResult =
storage.getAccountStateTrieNode(location, hashToFind);
verify(peerTrieNodeFinder).getAccountStateTrieNode(location, hashToFind);
assertThat(accountStateTrieNodeResult).contains(bytesToFind);
}
@Test
public void getAccountStorageTrieNode_callFallbackMechanismForInvalidNode() {
PeerTrieNodeFinder peerTrieNodeFinder = mock(PeerTrieNodeFinder.class);
final Hash account = Hash.hash(Bytes32.ZERO);
final Bytes location = Bytes.fromHexString("0x01");
final Bytes bytesInDB = Bytes.fromHexString("0x123456");
final Hash hashToFind = Hash.hash(Bytes.of(1));
final Bytes bytesToFind = Bytes.fromHexString("0x123457");
final BonsaiWorldStateKeyValueStorage storage = emptyStorage();
when(peerTrieNodeFinder.getAccountStorageTrieNode(account, location, hashToFind))
.thenReturn(Optional.of(bytesToFind));
storage.useFallbackNodeFinder(Optional.of(peerTrieNodeFinder));
storage
.updater()
.putAccountStorageTrieNode(account, location, Hash.hash(bytesInDB), bytesInDB)
.commit();
Optional<Bytes> accountStateTrieNodeResult =
storage.getAccountStorageTrieNode(account, location, hashToFind);
verify(peerTrieNodeFinder).getAccountStorageTrieNode(account, location, hashToFind);
assertThat(accountStateTrieNodeResult).contains(bytesToFind);
}
private BonsaiWorldStateKeyValueStorage emptyStorage() {
return new BonsaiWorldStateKeyValueStorage(new InMemoryKeyValueStorageProvider());
}

@ -19,6 +19,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateArchive;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory;
@ -30,7 +32,9 @@ import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStatePeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.Pruner;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -57,7 +61,10 @@ public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListe
private final Optional<BlockPropagationManager> blockPropagationManager;
private final Optional<FastSyncDownloader<?>> fastSyncDownloader;
private final Optional<FullSyncDownloader> fullSyncDownloader;
private final EthContext ethContext;
private final ProtocolContext protocolContext;
private final WorldStateStorage worldStateStorage;
private final MetricsSystem metricsSystem;
private final PivotBlockSelector pivotBlockSelector;
private final SyncTerminationCondition terminationCondition;
@ -78,7 +85,10 @@ public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListe
this.maybePruner = maybePruner;
this.syncState = syncState;
this.pivotBlockSelector = pivotBlockSelector;
this.ethContext = ethContext;
this.protocolContext = protocolContext;
this.worldStateStorage = worldStateStorage;
this.metricsSystem = metricsSystem;
this.terminationCondition = terminationCondition;
ChainHeadTracker.trackChainHeadForPeers(
@ -193,6 +203,7 @@ public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListe
} else {
syncState.markInitialSyncPhaseAsDone();
enableFallbackNodeFinder();
future = startFullSync();
}
return future.thenApply(this::finalizeSync);
@ -241,6 +252,8 @@ public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListe
pivotBlockSelector.close();
syncState.markInitialSyncPhaseAsDone();
enableFallbackNodeFinder();
if (terminationCondition.shouldContinueDownload()) {
return startFullSync();
} else {
@ -249,6 +262,19 @@ public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListe
}
}
private void enableFallbackNodeFinder() {
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
final Optional<PeerTrieNodeFinder> fallbackNodeFinder =
Optional.of(
new WorldStatePeerTrieNodeFinder(
ethContext, protocolContext.getBlockchain(), metricsSystem));
((BonsaiWorldStateArchive) protocolContext.getWorldStateArchive())
.useFallbackNodeFinder(fallbackNodeFinder);
((BonsaiWorldStateKeyValueStorage) worldStateStorage)
.useFallbackNodeFinder(fallbackNodeFinder);
}
}
private CompletableFuture<Void> startFullSync() {
maybePruner.ifPresent(Pruner::start);
return fullSyncDownloader

@ -65,8 +65,7 @@ public class AccountTrieNodeDataRequest extends TrieNodeDataRequest {
public Optional<Bytes> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage
.getAccountStateTrieNode(getLocation(), getNodeHash())
.filter(data -> !getLocation().isEmpty())
.filter(data -> Hash.hash(data).equals(getNodeHash()));
.filter(data -> !getLocation().isEmpty());
}
@Override

@ -52,9 +52,8 @@ public class StorageTrieNodeDataRequest extends TrieNodeDataRequest {
@Override
public Optional<Bytes> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage
.getAccountStorageTrieNode(getAccountHash(), getLocation(), getNodeHash())
.filter(data -> Hash.hash(data).equals(getNodeHash()));
return worldStateStorage.getAccountStorageTrieNode(
getAccountHash(), getLocation(), getNodeHash());
}
@Override

@ -0,0 +1,156 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.worldstate;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetTrieNodeFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetNodeDataFromPeerTask;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** This class is used to retrieve missing nodes in the trie by querying the peers */
public class WorldStatePeerTrieNodeFinder implements PeerTrieNodeFinder {
private static final Logger LOG = LoggerFactory.getLogger(WorldStatePeerTrieNodeFinder.class);
private final Cache<Bytes32, Bytes> foundNodes =
CacheBuilder.newBuilder().maximumSize(10_000).expireAfterWrite(5, TimeUnit.MINUTES).build();
private static final long TIMEOUT_SECONDS = 1;
final EthContext ethContext;
final Blockchain blockchain;
final MetricsSystem metricsSystem;
public WorldStatePeerTrieNodeFinder(
final EthContext ethContext, final Blockchain blockchain, final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.blockchain = blockchain;
this.metricsSystem = metricsSystem;
}
@Override
public Optional<Bytes> getAccountStateTrieNode(final Bytes location, final Bytes32 nodeHash) {
Optional<Bytes> cachedValue = Optional.ofNullable(foundNodes.getIfPresent(nodeHash));
if (cachedValue.isPresent()) {
return cachedValue;
}
final Optional<Bytes> response =
findByGetNodeData(Hash.wrap(nodeHash))
.or(() -> findByGetTrieNodeData(Hash.wrap(nodeHash), Optional.empty(), location));
response.ifPresent(
bytes -> {
LOG.debug(
"Fixed missing account state trie node for location {} and hash {}",
location,
nodeHash);
foundNodes.put(nodeHash, bytes);
});
return response;
}
@Override
public Optional<Bytes> getAccountStorageTrieNode(
final Hash accountHash, final Bytes location, final Bytes32 nodeHash) {
Optional<Bytes> cachedValue = Optional.ofNullable(foundNodes.getIfPresent(nodeHash));
if (cachedValue.isPresent()) {
return cachedValue;
}
final Optional<Bytes> response =
findByGetNodeData(Hash.wrap(nodeHash))
.or(
() ->
findByGetTrieNodeData(Hash.wrap(nodeHash), Optional.of(accountHash), location));
response.ifPresent(
bytes -> {
LOG.debug(
"Fixed missing storage state trie node for location {} and hash {}",
location,
nodeHash);
foundNodes.put(nodeHash, bytes);
});
return response;
}
public Optional<Bytes> findByGetNodeData(final Hash nodeHash) {
final BlockHeader chainHead = blockchain.getChainHeadHeader();
final RetryingGetNodeDataFromPeerTask retryingGetNodeDataFromPeerTask =
RetryingGetNodeDataFromPeerTask.forHashes(
ethContext, List.of(nodeHash), chainHead.getNumber(), metricsSystem);
try {
final Map<Hash, Bytes> response =
retryingGetNodeDataFromPeerTask.run().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (response.containsKey(nodeHash)) {
LOG.debug("Found node {} with getNodeData request", nodeHash);
return Optional.of(response.get(nodeHash));
} else {
LOG.debug("Found invalid node {} with getNodeData request", nodeHash);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.debug("Error when trying to find node {} with getNodeData request", nodeHash);
}
return Optional.empty();
}
public Optional<Bytes> findByGetTrieNodeData(
final Hash nodeHash, final Optional<Bytes32> accountHash, final Bytes location) {
final BlockHeader chainHead = blockchain.getChainHeadHeader();
final Map<Bytes, List<Bytes>> request = new HashMap<>();
if (accountHash.isPresent()) {
request.put(accountHash.get(), List.of(CompactEncoding.encode(location)));
} else {
request.put(CompactEncoding.encode(location), new ArrayList<>());
}
final Bytes path = CompactEncoding.encode(location);
final EthTask<Map<Bytes, Bytes>> getTrieNodeFromPeerTask =
RetryingGetTrieNodeFromPeerTask.forTrieNodes(ethContext, request, chainHead, metricsSystem);
try {
final Map<Bytes, Bytes> response =
getTrieNodeFromPeerTask.run().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
final Bytes nodeValue =
response.get(Bytes.concatenate(accountHash.map(Bytes::wrap).orElse(Bytes.EMPTY), path));
if (nodeValue != null && Hash.hash(nodeValue).equals(nodeHash)) {
LOG.debug("Found node {} with getTrieNode request", nodeHash);
return Optional.of(nodeValue);
} else {
LOG.debug("Found invalid node {} with getTrieNode request", nodeHash);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.debug("Error when trying to find node {} with getTrieNode request", nodeHash);
}
return Optional.empty();
}
}

@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
@ -68,14 +69,17 @@ public class RespondingEthPeer {
private final EthPeer ethPeer;
private final BlockingQueue<OutgoingMessage> outgoingMessages;
private final EthProtocolManager ethProtocolManager;
private final Optional<SnapProtocolManager> snapProtocolManager;
private final MockPeerConnection peerConnection;
private RespondingEthPeer(
final EthProtocolManager ethProtocolManager,
final Optional<SnapProtocolManager> snapProtocolManager,
final MockPeerConnection peerConnection,
final EthPeer ethPeer,
final BlockingQueue<OutgoingMessage> outgoingMessages) {
this.ethProtocolManager = ethProtocolManager;
this.snapProtocolManager = snapProtocolManager;
this.peerConnection = peerConnection;
this.ethPeer = ethPeer;
this.outgoingMessages = outgoingMessages;
@ -113,6 +117,7 @@ public class RespondingEthPeer {
private static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Optional<SnapProtocolManager> snapProtocolManager,
final Hash chainHeadHash,
final Difficulty totalDifficulty,
final OptionalLong estimatedHeight,
@ -130,7 +135,8 @@ public class RespondingEthPeer {
estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height));
peer.registerStatusSent();
return new RespondingEthPeer(ethProtocolManager, peerConnection, peer, outgoingMessages);
return new RespondingEthPeer(
ethProtocolManager, snapProtocolManager, peerConnection, peer, outgoingMessages);
}
public EthPeer getEthPeer() {
@ -197,9 +203,16 @@ public class RespondingEthPeer {
private void respondToMessage(final Responder responder, final OutgoingMessage msg) {
final Optional<MessageData> maybeResponse = responder.respond(msg.capability, msg.messageData);
maybeResponse.ifPresent(
(response) ->
(response) -> {
if (ethProtocolManager.getSupportedCapabilities().contains(msg.capability)) {
ethProtocolManager.processMessage(
msg.capability, new DefaultMessage(peerConnection, response)));
msg.capability, new DefaultMessage(peerConnection, response));
} else
snapProtocolManager.ifPresent(
protocolManager ->
protocolManager.processMessage(
msg.capability, new DefaultMessage(peerConnection, response)));
});
}
public Optional<MessageData> peekNextOutgoingRequest() {
@ -376,6 +389,7 @@ public class RespondingEthPeer {
public static class Builder {
private EthProtocolManager ethProtocolManager;
private Optional<SnapProtocolManager> snapProtocolManager = Optional.empty();
private Hash chainHeadHash = gen.hash();
private Difficulty totalDifficulty = Difficulty.of(1000L);
private OptionalLong estimatedHeight = OptionalLong.of(1000L);
@ -385,7 +399,12 @@ public class RespondingEthPeer {
checkNotNull(ethProtocolManager, "Must configure EthProtocolManager");
return RespondingEthPeer.create(
ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight, peerValidators);
ethProtocolManager,
snapProtocolManager,
chainHeadHash,
totalDifficulty,
estimatedHeight,
peerValidators);
}
public Builder ethProtocolManager(final EthProtocolManager ethProtocolManager) {
@ -394,6 +413,12 @@ public class RespondingEthPeer {
return this;
}
public Builder snapProtocolManager(final SnapProtocolManager snapProtocolManager) {
checkNotNull(snapProtocolManager);
this.snapProtocolManager = Optional.of(snapProtocolManager);
return this;
}
public Builder chainHeadHash(final Hash chainHeadHash) {
checkNotNull(chainHeadHash);
this.chainHeadHash = chainHeadHash;

@ -0,0 +1,61 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.Collections;
public class SnapProtocolManagerTestUtil {
public static SnapProtocolManager create(final EthPeers ethPeers) {
return create(
BlockchainSetupUtil.forTesting(DataStorageFormat.FOREST).getWorldArchive(), ethPeers);
}
public static SnapProtocolManager create(
final WorldStateArchive worldStateArchive, final EthPeers ethPeers) {
EthMessages messages = new EthMessages();
return new SnapProtocolManager(Collections.emptyList(), ethPeers, messages, worldStateArchive);
}
public static SnapProtocolManager create(
final WorldStateArchive worldStateArchive,
final EthPeers ethPeers,
final EthMessages snapMessages) {
return new SnapProtocolManager(
Collections.emptyList(), ethPeers, snapMessages, worldStateArchive);
}
public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final SnapProtocolManager snapProtocolManager,
final long estimatedHeight) {
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.snapProtocolManager(snapProtocolManager)
.estimatedHeight(estimatedHeight)
.build();
}
}

@ -0,0 +1,261 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.worldstate;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.task.SnapProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@SuppressWarnings({"FieldCanBeLocal", "unused"})
@RunWith(MockitoJUnitRunner.class)
public class WorldStatePeerTrieNodeFinderTest {
WorldStatePeerTrieNodeFinder worldStatePeerTrieNodeFinder;
private final BlockHeaderTestFixture blockHeaderBuilder = new BlockHeaderTestFixture();
@Mock private Blockchain blockchain;
private EthProtocolManager ethProtocolManager;
private SnapProtocolManager snapProtocolManager;
private EthPeers ethPeers;
private final PeerRequest peerRequest = mock(PeerRequest.class);
private final RequestManager.ResponseStream responseStream =
mock(RequestManager.ResponseStream.class);
@Before
public void setup() throws Exception {
ethProtocolManager = EthProtocolManagerTestUtil.create();
ethPeers = ethProtocolManager.ethContext().getEthPeers();
snapProtocolManager = SnapProtocolManagerTestUtil.create(ethPeers);
worldStatePeerTrieNodeFinder =
new WorldStatePeerTrieNodeFinder(
ethProtocolManager.ethContext(), blockchain, new NoOpMetricsSystem());
}
private RespondingEthPeer.Responder respondToGetNodeDataRequest(
final RespondingEthPeer peer, final Bytes32 nodeValue) {
return RespondingEthPeer.targetedResponder(
(cap, msg) -> {
if (msg.getCode() != EthPV63.GET_NODE_DATA) {
return false;
}
return true;
},
(cap, msg) -> NodeDataMessage.create(List.of(nodeValue)));
}
private RespondingEthPeer.Responder respondToGetTrieNodeRequest(
final RespondingEthPeer peer, final Bytes32 nodeValue) {
return RespondingEthPeer.targetedResponder(
(cap, msg) -> {
if (msg.getCode() != SnapV1.GET_TRIE_NODES) {
return false;
}
return true;
},
(cap, msg) -> TrieNodesMessage.create(Optional.of(BigInteger.ZERO), List.of(nodeValue)));
}
@Test
public void getAccountStateTrieNodeShouldReturnValueFromGetNodeDataRequest() {
BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetNodeDataRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStateTrieNode(Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStateTrieNodeShouldReturnValueFromGetTrieNodeRequest() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
SnapProtocolManagerTestUtil.createPeer(
ethProtocolManager, snapProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetTrieNodeRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStateTrieNode(Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStateTrieNodeShouldReturnEmptyWhenFoundNothing() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStateTrieNode(Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).isEmpty();
}
@Test
public void getAccountStorageTrieNodeShouldReturnValueFromGetNodeDataRequest() {
BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
final Hash accountHash = Hash.wrap(Bytes32.random());
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetNodeDataRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(accountHash, Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStorageTrieNodeShouldReturnValueFromGetTrieNodeRequest() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
final Hash accountHash = Hash.wrap(Bytes32.random());
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
SnapProtocolManagerTestUtil.createPeer(
ethProtocolManager, snapProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetTrieNodeRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(accountHash, Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStorageTrieNodeShouldReturnEmptyWhenFoundNothing() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
final Hash accountHash = Hash.wrap(Bytes32.random());
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(accountHash, Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).isEmpty();
}
}
Loading…
Cancel
Save