Snap server rebase (#6640)

* initial snap server implementation

Signed-off-by: garyschulte <garyschulte@gmail.com>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
pull/6856/head 24.3.1
garyschulte 8 months ago committed by GitHub
parent deaea9b34d
commit 34fc5eed58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 2
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 16
      besu/src/main/java/org/hyperledger/besu/cli/ConfigurationOverviewBuilder.java
  4. 23
      besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java
  5. 28
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  6. 3
      datatypes/src/main/java/org/hyperledger/besu/datatypes/Hash.java
  7. 1
      ethereum/core/build.gradle
  8. 30
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/proof/WorldStateProofProvider.java
  9. 2
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/cache/NoOpBonsaiCachedWorldStorageManager.java
  10. 4
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/BonsaiWorldStateKeyValueStorage.java
  11. 2
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/DiffBasedWorldStateProvider.java
  12. 50
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/cache/DiffBasedCachedWorldStorageManager.java
  13. 26
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java
  14. 101
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategy.java
  15. 2
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategyProvider.java
  16. 3
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateArchive.java
  17. 5
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateStorageCoordinator.java
  18. 8
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/BonsaiSnapshotIsolationTests.java
  19. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/EthProtocolConfiguration.java
  20. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java
  21. 626
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServer.java
  22. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetAccountRangeMessage.java
  23. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetStorageRangeMessage.java
  24. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  25. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncConfiguration.java
  26. 65
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java
  27. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java
  28. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java
  29. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageFlatDatabaseHealingRangeRequest.java
  30. 629
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServerTest.java
  31. 61
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/SnapProtocolManagerTestUtil.java
  32. 32
      ethereum/trie/src/main/java/org/hyperledger/besu/ethereum/trie/CompactEncoding.java
  33. 10
      services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java
  34. 119
      services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java
  35. 52
      services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java

@ -31,6 +31,7 @@
- Dedicated log marker for invalid txs removed from the txpool [#6826](https://github.com/hyperledger/besu/pull/6826) - Dedicated log marker for invalid txs removed from the txpool [#6826](https://github.com/hyperledger/besu/pull/6826)
- Prevent startup with BONSAI and privacy enabled [#6809](https://github.com/hyperledger/besu/pull/6809) - Prevent startup with BONSAI and privacy enabled [#6809](https://github.com/hyperledger/besu/pull/6809)
- Remove deprecated Forest pruning [#6810](https://github.com/hyperledger/besu/pull/6810) - Remove deprecated Forest pruning [#6810](https://github.com/hyperledger/besu/pull/6810)
- Experimental Snap Sync Server [#6640](https://github.com/hyperledger/besu/pull/6640)
### Bug fixes ### Bug fixes
- Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665) - Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665)

@ -2732,6 +2732,8 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
getDataStorageConfiguration().getUnstable().getBonsaiTrieLogPruningWindowSize()); getDataStorageConfiguration().getUnstable().getBonsaiTrieLogPruningWindowSize());
} }
builder.setSnapServerEnabled(this.unstableSynchronizerOptions.isSnapsyncServerEnabled());
builder.setTxPoolImplementation(buildTransactionPoolConfiguration().getTxPoolImplementation()); builder.setTxPoolImplementation(buildTransactionPoolConfiguration().getTxPoolImplementation());
builder.setWorldStateUpdateMode(unstableEvmOptions.toDomainObject().worldUpdaterMode()); builder.setWorldStateUpdateMode(unstableEvmOptions.toDomainObject().worldUpdaterMode());

@ -55,6 +55,7 @@ public class ConfigurationOverviewBuilder {
private boolean isBonsaiLimitTrieLogsEnabled = false; private boolean isBonsaiLimitTrieLogsEnabled = false;
private long trieLogRetentionLimit = 0; private long trieLogRetentionLimit = 0;
private Integer trieLogsPruningWindowSize = null; private Integer trieLogsPruningWindowSize = null;
private boolean isSnapServerEnabled = false;
private TransactionPoolConfiguration.Implementation txPoolImplementation; private TransactionPoolConfiguration.Implementation txPoolImplementation;
private EvmConfiguration.WorldUpdaterMode worldStateUpdateMode; private EvmConfiguration.WorldUpdaterMode worldStateUpdateMode;
private Map<String, String> environment; private Map<String, String> environment;
@ -219,6 +220,17 @@ public class ConfigurationOverviewBuilder {
return this; return this;
} }
/**
* Sets snap server enabled/disabled
*
* @param snapServerEnabled bool to indicate if snap server is enabled
* @return the builder
*/
public ConfigurationOverviewBuilder setSnapServerEnabled(final boolean snapServerEnabled) {
isSnapServerEnabled = snapServerEnabled;
return this;
}
/** /**
* Sets trie logs pruning window size * Sets trie logs pruning window size
* *
@ -339,6 +351,10 @@ public class ConfigurationOverviewBuilder {
lines.add("Using " + worldStateUpdateMode + " worldstate update mode"); lines.add("Using " + worldStateUpdateMode + " worldstate update mode");
if (isSnapServerEnabled) {
lines.add("Experimental Snap Sync server enabled");
}
if (isBonsaiLimitTrieLogsEnabled) { if (isBonsaiLimitTrieLogsEnabled) {
final StringBuilder trieLogPruningString = new StringBuilder(); final StringBuilder trieLogPruningString = new StringBuilder();
trieLogPruningString trieLogPruningString

@ -82,6 +82,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
private static final String SNAP_FLAT_DB_HEALING_ENABLED_FLAG = private static final String SNAP_FLAT_DB_HEALING_ENABLED_FLAG =
"--Xsnapsync-synchronizer-flat-db-healing-enabled"; "--Xsnapsync-synchronizer-flat-db-healing-enabled";
private static final String SNAP_SERVER_ENABLED_FLAG = "--Xsnapsync-server-enabled";
private static final String CHECKPOINT_POST_MERGE_FLAG = "--Xcheckpoint-post-merge-enabled"; private static final String CHECKPOINT_POST_MERGE_FLAG = "--Xcheckpoint-post-merge-enabled";
/** /**
@ -296,6 +298,13 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
private Boolean snapsyncFlatDbHealingEnabled = private Boolean snapsyncFlatDbHealingEnabled =
SnapSyncConfiguration.DEFAULT_IS_FLAT_DB_HEALING_ENABLED; SnapSyncConfiguration.DEFAULT_IS_FLAT_DB_HEALING_ENABLED;
@CommandLine.Option(
names = SNAP_SERVER_ENABLED_FLAG,
hidden = true,
paramLabel = "<Boolean>",
description = "Snap sync server enabled (default: ${DEFAULT-VALUE})")
private Boolean snapsyncServerEnabled = SnapSyncConfiguration.DEFAULT_SNAP_SERVER_ENABLED;
@CommandLine.Option( @CommandLine.Option(
names = {CHECKPOINT_POST_MERGE_FLAG}, names = {CHECKPOINT_POST_MERGE_FLAG},
hidden = true, hidden = true,
@ -314,6 +323,15 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
return snapsyncFlatDbHealingEnabled; return snapsyncFlatDbHealingEnabled;
} }
/**
* Flag to know whether the Snap sync server feature is enabled or disabled.
*
* @return true if snap sync server is enabled
*/
public boolean isSnapsyncServerEnabled() {
return snapsyncServerEnabled;
}
/** /**
* Create synchronizer options. * Create synchronizer options.
* *
@ -398,6 +416,7 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
.localFlatAccountCountToHealPerRequest(snapsyncFlatAccountHealedCountPerRequest) .localFlatAccountCountToHealPerRequest(snapsyncFlatAccountHealedCountPerRequest)
.localFlatStorageCountToHealPerRequest(snapsyncFlatStorageHealedCountPerRequest) .localFlatStorageCountToHealPerRequest(snapsyncFlatStorageHealedCountPerRequest)
.isFlatDbHealingEnabled(snapsyncFlatDbHealingEnabled) .isFlatDbHealingEnabled(snapsyncFlatDbHealingEnabled)
.isSnapServerEnabled(snapsyncServerEnabled)
.build()); .build());
builder.checkpointPostMergeEnabled(checkpointPostMergeSyncEnabled); builder.checkpointPostMergeEnabled(checkpointPostMergeSyncEnabled);
@ -456,7 +475,9 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
SNAP_FLAT_ACCOUNT_HEALED_COUNT_PER_REQUEST_FLAG, SNAP_FLAT_ACCOUNT_HEALED_COUNT_PER_REQUEST_FLAG,
OptionParser.format(snapsyncFlatAccountHealedCountPerRequest), OptionParser.format(snapsyncFlatAccountHealedCountPerRequest),
SNAP_FLAT_STORAGE_HEALED_COUNT_PER_REQUEST_FLAG, SNAP_FLAT_STORAGE_HEALED_COUNT_PER_REQUEST_FLAG,
OptionParser.format(snapsyncFlatStorageHealedCountPerRequest))); OptionParser.format(snapsyncFlatStorageHealedCountPerRequest),
SNAP_SERVER_ENABLED_FLAG,
OptionParser.format(snapsyncServerEnabled)));
} }
return value; return value;
} }

@ -652,9 +652,6 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
peerValidators, peerValidators,
Optional.empty()); Optional.empty());
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive);
final PivotBlockSelector pivotBlockSelector = final PivotBlockSelector pivotBlockSelector =
createPivotSelector( createPivotSelector(
protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
@ -671,6 +668,10 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
protocolContext.setSynchronizer(Optional.of(synchronizer)); protocolContext.setSynchronizer(Optional.of(synchronizer));
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(
protocolContext, worldStateStorageCoordinator, ethPeers, snapMessages);
final MiningCoordinator miningCoordinator = final MiningCoordinator miningCoordinator =
createMiningCoordinator( createMiningCoordinator(
protocolSchedule, protocolSchedule,
@ -986,12 +987,23 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
} }
private Optional<SnapProtocolManager> createSnapProtocolManager( private Optional<SnapProtocolManager> createSnapProtocolManager(
final List<PeerValidator> peerValidators, final ProtocolContext protocolContext,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final EthPeers ethPeers, final EthPeers ethPeers,
final EthMessages snapMessages, final EthMessages snapMessages) {
final WorldStateArchive worldStateArchive) { if (Optional.ofNullable(syncConfig.getSnapSyncConfiguration())
return Optional.of( .map(snapConfig -> snapConfig.isSnapServerEnabled())
new SnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive)); .orElse(false)) {
return Optional.of(
new SnapProtocolManager(
worldStateStorageCoordinator,
syncConfig.getSnapSyncConfiguration(),
ethPeers,
snapMessages,
protocolContext));
} else {
return Optional.empty();
}
} }
WorldStateArchive createWorldStateArchive( WorldStateArchive createWorldStateArchive(

@ -29,6 +29,9 @@ public class Hash extends DelegatingBytes32 {
/** The constant ZERO. */ /** The constant ZERO. */
public static final Hash ZERO = new Hash(Bytes32.ZERO); public static final Hash ZERO = new Hash(Bytes32.ZERO);
/** Last hash */
public static final Hash LAST = new Hash(Bytes32.fromHexString("F".repeat(64)));
/** /**
* Hash of an RLP encoded trie hash with no content, or * Hash of an RLP encoded trie hash with no content, or
* "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421" * "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"

@ -48,6 +48,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.google.guava:guava' implementation 'com.google.guava:guava'
implementation 'com.github.ben-manes.caffeine:caffeine'
implementation 'com.google.dagger:dagger' implementation 'com.google.dagger:dagger'
implementation 'org.apache.maven:maven-artifact' implementation 'org.apache.maven:maven-artifact'
annotationProcessor 'com.google.dagger:dagger-compiler' annotationProcessor 'com.google.dagger:dagger-compiler'

@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue; import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -41,6 +42,8 @@ import com.google.common.collect.Ordering;
import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256; import org.apache.tuweni.units.bigints.UInt256;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* The WorldStateProofProvider class is responsible for providing proofs for world state entries. It * The WorldStateProofProvider class is responsible for providing proofs for world state entries. It
@ -49,6 +52,7 @@ import org.apache.tuweni.units.bigints.UInt256;
public class WorldStateProofProvider { public class WorldStateProofProvider {
private final WorldStateStorageCoordinator worldStateStorageCoordinator; private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private static final Logger LOG = LoggerFactory.getLogger(WorldStateProofProvider.class);
public WorldStateProofProvider(final WorldStateStorageCoordinator worldStateStorageCoordinator) { public WorldStateProofProvider(final WorldStateStorageCoordinator worldStateStorageCoordinator) {
this.worldStateStorageCoordinator = worldStateStorageCoordinator; this.worldStateStorageCoordinator = worldStateStorageCoordinator;
@ -85,7 +89,8 @@ public class WorldStateProofProvider {
final List<UInt256> accountStorageKeys) { final List<UInt256> accountStorageKeys) {
final MerkleTrie<Bytes32, Bytes> storageTrie = final MerkleTrie<Bytes32, Bytes> storageTrie =
newAccountStorageTrie(accountHash, account.getStorageRoot()); newAccountStorageTrie(accountHash, account.getStorageRoot());
final NavigableMap<UInt256, Proof<Bytes>> storageProofs = new TreeMap<>(); final NavigableMap<UInt256, Proof<Bytes>> storageProofs =
new TreeMap<>(Comparator.comparing(Bytes32::toHexString));
accountStorageKeys.forEach( accountStorageKeys.forEach(
key -> storageProofs.put(key, storageTrie.getValueWithProof(Hash.hash(key)))); key -> storageProofs.put(key, storageTrie.getValueWithProof(Hash.hash(key))));
return storageProofs; return storageProofs;
@ -153,19 +158,26 @@ public class WorldStateProofProvider {
final SortedMap<Bytes32, Bytes> keys) { final SortedMap<Bytes32, Bytes> keys) {
// check if it's monotonic increasing // check if it's monotonic increasing
if (!Ordering.natural().isOrdered(keys.keySet())) { if (keys.size() > 1 && !Ordering.natural().isOrdered(keys.keySet())) {
return false; return false;
} }
// when proof is empty we need to have all the keys to reconstruct the trie // when proof is empty and we requested the full range, we should
// have all the keys to reconstruct the trie
if (proofs.isEmpty()) { if (proofs.isEmpty()) {
final MerkleTrie<Bytes, Bytes> trie = new SimpleMerklePatriciaTrie<>(Function.identity()); if (startKeyHash.equals(Bytes32.ZERO)) {
// add the received keys in the trie final MerkleTrie<Bytes, Bytes> trie = new SimpleMerklePatriciaTrie<>(Function.identity());
for (Map.Entry<Bytes32, Bytes> key : keys.entrySet()) { // add the received keys in the trie
trie.put(key.getKey(), key.getValue()); for (Map.Entry<Bytes32, Bytes> key : keys.entrySet()) {
trie.put(key.getKey(), key.getValue());
}
return rootHash.equals(trie.getRootHash());
} else {
// TODO: possibly accept a node loader so we can verify this with already
// completed partial storage requests
LOG.info("failing proof due to incomplete range without proofs");
return false;
} }
return rootHash.equals(trie.getRootHash());
} }
// reconstruct a part of the trie with the proof // reconstruct a part of the trie with the proof

@ -38,7 +38,7 @@ public class NoOpBonsaiCachedWorldStorageManager extends BonsaiCachedWorldStorag
} }
@Override @Override
public boolean containWorldStateStorage(final Hash blockHash) { public boolean contains(final Hash blockHash) {
return false; return false;
} }

@ -127,9 +127,7 @@ public class BonsaiWorldStateKeyValueStorage extends DiffBasedWorldStateKeyValue
} }
public Optional<Bytes> getTrieNodeUnsafe(final Bytes key) { public Optional<Bytes> getTrieNodeUnsafe(final Bytes key) {
return composedWorldStateStorage return composedWorldStateStorage.get(TRIE_BRANCH_STORAGE, key.toArrayUnsafe()).map(Bytes::wrap);
.get(TRIE_BRANCH_STORAGE, Bytes.concatenate(key).toArrayUnsafe())
.map(Bytes::wrap);
} }
public Optional<Bytes> getStorageValueByStorageSlotKey( public Optional<Bytes> getStorageValueByStorageSlotKey(

@ -117,7 +117,7 @@ public abstract class DiffBasedWorldStateProvider implements WorldStateArchive {
@Override @Override
public boolean isWorldStateAvailable(final Hash rootHash, final Hash blockHash) { public boolean isWorldStateAvailable(final Hash rootHash, final Hash blockHash) {
return cachedWorldStorageManager.containWorldStateStorage(blockHash) return cachedWorldStorageManager.contains(blockHash)
|| persistedState.blockHash().equals(blockHash) || persistedState.blockHash().equals(blockHash)
|| worldStateKeyValueStorage.isWorldStateAvailable(rootHash, blockHash); || worldStateKeyValueStorage.isWorldStateAvailable(rootHash, blockHash);
} }

@ -15,7 +15,9 @@
package org.hyperledger.besu.ethereum.trie.diffbased.common.cache; package org.hyperledger.besu.ethereum.trie.diffbased.common.cache;
import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.trie.diffbased.common.DiffBasedWorldStateProvider; import org.hyperledger.besu.ethereum.trie.diffbased.common.DiffBasedWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.diffbased.common.StorageSubscriber; import org.hyperledger.besu.ethereum.trie.diffbased.common.StorageSubscriber;
import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.DiffBasedLayeredWorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.DiffBasedLayeredWorldStateKeyValueStorage;
@ -29,8 +31,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -41,6 +46,11 @@ public abstract class DiffBasedCachedWorldStorageManager implements StorageSubsc
LoggerFactory.getLogger(DiffBasedCachedWorldStorageManager.class); LoggerFactory.getLogger(DiffBasedCachedWorldStorageManager.class);
private final DiffBasedWorldStateProvider archive; private final DiffBasedWorldStateProvider archive;
private final EvmConfiguration evmConfiguration; private final EvmConfiguration evmConfiguration;
private final Cache<Hash, BlockHeader> stateRootToBlockHeaderCache =
Caffeine.newBuilder()
.maximumSize(RETAINED_LAYERS)
.expireAfterWrite(100, TimeUnit.MINUTES)
.build();
private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage; private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage;
private final Map<Bytes32, DiffBasedCachedWorldView> cachedWorldStatesByHash; private final Map<Bytes32, DiffBasedCachedWorldView> cachedWorldStatesByHash;
@ -104,6 +114,8 @@ public abstract class DiffBasedCachedWorldStorageManager implements StorageSubsc
((DiffBasedLayeredWorldStateKeyValueStorage) forWorldState.getWorldStateStorage()) ((DiffBasedLayeredWorldStateKeyValueStorage) forWorldState.getWorldStateStorage())
.clone())); .clone()));
} }
// add stateroot -> blockHeader cache entry
stateRootToBlockHeaderCache.put(blockHeader.getStateRoot(), blockHeader);
} }
scrubCachedLayers(blockHeader.getNumber()); scrubCachedLayers(blockHeader.getNumber());
} }
@ -192,7 +204,7 @@ public abstract class DiffBasedCachedWorldStorageManager implements StorageSubsc
}); });
} }
public boolean containWorldStateStorage(final Hash blockHash) { public boolean contains(final Hash blockHash) {
return cachedWorldStatesByHash.containsKey(blockHash); return cachedWorldStatesByHash.containsKey(blockHash);
} }
@ -200,6 +212,42 @@ public abstract class DiffBasedCachedWorldStorageManager implements StorageSubsc
this.cachedWorldStatesByHash.clear(); this.cachedWorldStatesByHash.clear();
} }
public void primeRootToBlockHashCache(final Blockchain blockchain, final int numEntries) {
// prime the stateroot-to-blockhash cache
long head = blockchain.getChainHeadHeader().getNumber();
for (long i = head; i > Math.max(0, head - numEntries); i--) {
blockchain
.getBlockHeader(i)
.ifPresent(header -> stateRootToBlockHeaderCache.put(header.getStateRoot(), header));
}
}
/**
* Returns the worldstate for the supplied root hash. If the worldstate is not already in cache,
* this method will attempt to fetch it and add it to the cache. synchronized to prevent
* concurrent loads/adds to the cache of the same root hash.
*
* @param rootHash rootHash to supply worldstate storage for
* @return Optional worldstate storage
*/
public synchronized Optional<DiffBasedWorldStateKeyValueStorage> getStorageByRootHash(
final Hash rootHash) {
return Optional.ofNullable(stateRootToBlockHeaderCache.getIfPresent(rootHash))
.flatMap(
header ->
Optional.ofNullable(cachedWorldStatesByHash.get(header.getHash()))
.map(DiffBasedCachedWorldView::getWorldStateStorage)
.or(
() -> {
// if not cached already, maybe fetch and cache this worldstate
var maybeWorldState =
archive.getMutable(header, false).map(BonsaiWorldState.class::cast);
maybeWorldState.ifPresent(
ws -> addCachedLayer(header, header.getStateRoot(), ws));
return maybeWorldState.map(BonsaiWorldState::getWorldStateStorage);
}));
}
@Override @Override
public void onClearStorage() { public void onClearStorage() {
this.cachedWorldStatesByHash.clear(); this.cachedWorldStatesByHash.clear();

@ -35,11 +35,13 @@ import org.hyperledger.besu.util.Subscribers;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.NavigableMap;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Stream; import java.util.stream.Stream;
import kotlin.Pair;
import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -120,19 +122,37 @@ public abstract class DiffBasedWorldStateKeyValueStorage
.map(Hash::wrap); .map(Hash::wrap);
} }
public Map<Bytes32, Bytes> streamFlatAccounts( public NavigableMap<Bytes32, Bytes> streamFlatAccounts(
final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) { final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) {
return getFlatDbStrategy() return getFlatDbStrategy()
.streamAccountFlatDatabase(composedWorldStateStorage, startKeyHash, endKeyHash, max); .streamAccountFlatDatabase(composedWorldStateStorage, startKeyHash, endKeyHash, max);
} }
public Map<Bytes32, Bytes> streamFlatStorages( public NavigableMap<Bytes32, Bytes> streamFlatAccounts(
final Bytes startKeyHash,
final Bytes32 endKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
return getFlatDbStrategy()
.streamAccountFlatDatabase(composedWorldStateStorage, startKeyHash, endKeyHash, takeWhile);
}
public NavigableMap<Bytes32, Bytes> streamFlatStorages(
final Hash accountHash, final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) { final Hash accountHash, final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) {
return getFlatDbStrategy() return getFlatDbStrategy()
.streamStorageFlatDatabase( .streamStorageFlatDatabase(
composedWorldStateStorage, accountHash, startKeyHash, endKeyHash, max); composedWorldStateStorage, accountHash, startKeyHash, endKeyHash, max);
} }
public NavigableMap<Bytes32, Bytes> streamFlatStorages(
final Hash accountHash,
final Bytes startKeyHash,
final Bytes32 endKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
return getFlatDbStrategy()
.streamStorageFlatDatabase(
composedWorldStateStorage, accountHash, startKeyHash, endKeyHash, takeWhile);
}
public boolean isWorldStateAvailable(final Bytes32 rootHash, final Hash blockHash) { public boolean isWorldStateAvailable(final Bytes32 rootHash, final Hash blockHash) {
return composedWorldStateStorage return composedWorldStateStorage
.get(TRIE_BRANCH_STORAGE, WORLD_ROOT_HASH_KEY) .get(TRIE_BRANCH_STORAGE, WORLD_ROOT_HASH_KEY)

@ -28,9 +28,12 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction;
import java.util.Map; import java.util.Comparator;
import java.util.NavigableMap;
import java.util.Optional; import java.util.Optional;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -46,7 +49,6 @@ import org.apache.tuweni.rlp.RLP;
* data, and storage data from the corresponding KeyValueStorage. * data, and storage data from the corresponding KeyValueStorage.
*/ */
public abstract class FlatDbStrategy { public abstract class FlatDbStrategy {
protected final MetricsSystem metricsSystem; protected final MetricsSystem metricsSystem;
protected final Counter getAccountCounter; protected final Counter getAccountCounter;
protected final Counter getAccountFoundInFlatDatabaseCounter; protected final Counter getAccountFoundInFlatDatabaseCounter;
@ -106,6 +108,10 @@ public abstract class FlatDbStrategy {
StorageSlotKey storageSlotKey, StorageSlotKey storageSlotKey,
SegmentedKeyValueStorage storageStorage); SegmentedKeyValueStorage storageStorage);
public boolean isCodeByCodeHash() {
return codeStorageStrategy instanceof CodeHashCodeStorageStrategy;
}
/* /*
* Retrieves the code data for the given code hash and account hash. * Retrieves the code data for the given code hash and account hash.
*/ */
@ -190,47 +196,86 @@ public abstract class FlatDbStrategy {
storage.clear(ACCOUNT_STORAGE_STORAGE); storage.clear(ACCOUNT_STORAGE_STORAGE);
} }
public Map<Bytes32, Bytes> streamAccountFlatDatabase( public NavigableMap<Bytes32, Bytes> streamAccountFlatDatabase(
final SegmentedKeyValueStorage storage, final SegmentedKeyValueStorage storage,
final Bytes startKeyHash, final Bytes startKeyHash,
final Bytes32 endKeyHash, final Bytes32 endKeyHash,
final long max) { final long max) {
final Stream<Pair<Bytes32, Bytes>> pairStream =
storage
.streamFromKey(
ACCOUNT_INFO_STATE, startKeyHash.toArrayUnsafe(), endKeyHash.toArrayUnsafe())
.limit(max)
.map(pair -> new Pair<>(Bytes32.wrap(pair.getKey()), Bytes.wrap(pair.getValue())));
final TreeMap<Bytes32, Bytes> collected = return toNavigableMap(accountsToPairStream(storage, startKeyHash, endKeyHash).limit(max));
pairStream.collect( }
Collectors.toMap(Pair::getFirst, Pair::getSecond, (v1, v2) -> v1, TreeMap::new));
pairStream.close(); public NavigableMap<Bytes32, Bytes> streamAccountFlatDatabase(
return collected; final SegmentedKeyValueStorage storage,
final Bytes startKeyHash,
final Bytes32 endKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
return toNavigableMap(
accountsToPairStream(storage, startKeyHash, endKeyHash).takeWhile(takeWhile));
} }
public Map<Bytes32, Bytes> streamStorageFlatDatabase( /** streams RLP encoded storage values using a specified stream limit. */
public NavigableMap<Bytes32, Bytes> streamStorageFlatDatabase(
final SegmentedKeyValueStorage storage, final SegmentedKeyValueStorage storage,
final Hash accountHash, final Hash accountHash,
final Bytes startKeyHash, final Bytes startKeyHash,
final Bytes32 endKeyHash, final Bytes32 endKeyHash,
final long max) { final long max) {
final Stream<Pair<Bytes32, Bytes>> pairStream =
storage
.streamFromKey(
ACCOUNT_STORAGE_STORAGE,
Bytes.concatenate(accountHash, startKeyHash).toArrayUnsafe(),
Bytes.concatenate(accountHash, endKeyHash).toArrayUnsafe())
.limit(max)
.map(
pair ->
new Pair<>(
Bytes32.wrap(Bytes.wrap(pair.getKey()).slice(Hash.SIZE)),
RLP.encodeValue(Bytes.wrap(pair.getValue()).trimLeadingZeros())));
return toNavigableMap(
storageToPairStream(storage, accountHash, startKeyHash, endKeyHash, RLP::encodeValue)
.limit(max));
}
/** streams raw storage Bytes using a specified predicate filter and value mapper. */
public NavigableMap<Bytes32, Bytes> streamStorageFlatDatabase(
final SegmentedKeyValueStorage storage,
final Hash accountHash,
final Bytes startKeyHash,
final Bytes32 endKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
return toNavigableMap(
storageToPairStream(storage, accountHash, startKeyHash, endKeyHash, RLP::encodeValue)
.takeWhile(takeWhile));
}
/**
 * Streams an account's flat-storage entries as (slot hash, mapped value) pairs.
 *
 * <p>Keys in ACCOUNT_STORAGE_STORAGE are {@code accountHash ++ slotHash}; the account-hash
 * prefix is stripped from each streamed key before it is returned. Values are
 * leading-zero-trimmed and then transformed by {@code valueMapper}.
 *
 * @param storage the segmented key-value storage to scan
 * @param accountHash account whose storage slots are streamed
 * @param startKeyHash inclusive lower bound of the slot-hash range
 * @param endKeyHash inclusive upper bound of the slot-hash range
 * @param valueMapper transformation applied to each trimmed raw value (e.g. RLP encoding)
 * @return a stream of (slot hash, mapped value) pairs; the caller is responsible for closing it
 */
private static Stream<Pair<Bytes32, Bytes>> storageToPairStream(
    final SegmentedKeyValueStorage storage,
    final Hash accountHash,
    final Bytes startKeyHash,
    final Bytes32 endKeyHash,
    final Function<Bytes, Bytes> valueMapper) {
  // build absolute scan bounds by prefixing the slot-hash range with the account hash
  final byte[] lowBound = Bytes.concatenate(accountHash, startKeyHash).toArrayUnsafe();
  final byte[] highBound = Bytes.concatenate(accountHash, endKeyHash).toArrayUnsafe();
  return storage
      .streamFromKey(ACCOUNT_STORAGE_STORAGE, lowBound, highBound)
      .map(
          entry -> {
            // drop the account-hash prefix to recover the bare slot hash
            final Bytes32 slotHash = Bytes32.wrap(Bytes.wrap(entry.getKey()).slice(Hash.SIZE));
            final Bytes mappedValue =
                valueMapper.apply(Bytes.wrap(entry.getValue()).trimLeadingZeros());
            return new Pair<>(slotHash, mappedValue);
          });
}
/**
 * Streams flat-account entries in the given key-hash range as (account hash, raw value) pairs.
 *
 * @param storage the segmented key-value storage to scan
 * @param startKeyHash inclusive lower bound of the account-hash range
 * @param endKeyHash inclusive upper bound of the account-hash range
 * @return a stream of (account hash, RLP account value) pairs; caller must close the stream
 */
private static Stream<Pair<Bytes32, Bytes>> accountsToPairStream(
    final SegmentedKeyValueStorage storage, final Bytes startKeyHash, final Bytes32 endKeyHash) {
  final var rawEntries =
      storage.streamFromKey(
          ACCOUNT_INFO_STATE, startKeyHash.toArrayUnsafe(), endKeyHash.toArrayUnsafe());
  return rawEntries.map(
      entry -> new Pair<>(Bytes32.wrap(entry.getKey()), Bytes.wrap(entry.getValue())));
}
private static NavigableMap<Bytes32, Bytes> toNavigableMap(
final Stream<Pair<Bytes32, Bytes>> pairStream) {
final TreeMap<Bytes32, Bytes> collected = final TreeMap<Bytes32, Bytes> collected =
pairStream.collect( pairStream.collect(
Collectors.toMap(Pair::getFirst, Pair::getSecond, (v1, v2) -> v1, TreeMap::new)); Collectors.toMap(
Pair::getFirst,
Pair::getSecond,
(v1, v2) -> v1,
() -> new TreeMap<>(Comparator.comparing(Bytes::toHexString))));
pairStream.close(); pairStream.close();
return collected; return collected;
} }

@ -82,7 +82,7 @@ public class FlatDbStrategyProvider {
return flatDbMode; return flatDbMode;
} }
private boolean deriveUseCodeStorageByHash( protected boolean deriveUseCodeStorageByHash(
final SegmentedKeyValueStorage composedWorldStateStorage) { final SegmentedKeyValueStorage composedWorldStateStorage) {
final boolean configCodeUsingHash = final boolean configCodeUsingHash =
dataStorageConfiguration.getUnstable().getBonsaiCodeStoredByCodeHashEnabled(); dataStorageConfiguration.getUnstable().getBonsaiCodeStoredByCodeHashEnabled();

@ -19,7 +19,6 @@ import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState; import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.proof.WorldStateProof; import org.hyperledger.besu.ethereum.proof.WorldStateProof;
import org.hyperledger.besu.ethereum.trie.MerkleTrie;
import org.hyperledger.besu.evm.worldstate.WorldState; import org.hyperledger.besu.evm.worldstate.WorldState;
import java.io.Closeable; import java.io.Closeable;
@ -31,8 +30,6 @@ import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.units.bigints.UInt256; import org.apache.tuweni.units.bigints.UInt256;
public interface WorldStateArchive extends Closeable { public interface WorldStateArchive extends Closeable {
Hash EMPTY_ROOT_HASH = Hash.wrap(MerkleTrie.EMPTY_TRIE_NODE_HASH);
Optional<WorldState> get(Hash rootHash, Hash blockHash); Optional<WorldState> get(Hash rootHash, Hash blockHash);
boolean isWorldStateAvailable(Hash rootHash, Hash blockHash); boolean isWorldStateAvailable(Hash rootHash, Hash blockHash);

@ -62,6 +62,11 @@ public class WorldStateStorageCoordinator {
forest -> forest.getAccountStorageTrieNode(nodeHash)); forest -> forest.getAccountStorageTrieNode(nodeHash));
} }
public Optional<Bytes> getCode(final Hash codeHash, final Hash accountHash) {
return applyForStrategy(
bonsai -> bonsai.getCode(codeHash, accountHash), forest -> forest.getCode(codeHash));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <STRATEGY extends WorldStateKeyValueStorage> STRATEGY getStrategy( public <STRATEGY extends WorldStateKeyValueStorage> STRATEGY getStrategy(
final Class<STRATEGY> strategyClass) { final Class<STRATEGY> strategyClass) {

@ -61,12 +61,8 @@ public class BonsaiSnapshotIsolationTests extends AbstractIsolationTests {
assertThat(res.isSuccessful()).isTrue(); assertThat(res.isSuccessful()).isTrue();
assertThat(res2.isSuccessful()).isTrue(); assertThat(res2.isSuccessful()).isTrue();
assertThat( assertThat(archive.getCachedWorldStorageManager().contains(firstBlock.getHash())).isTrue();
archive.getCachedWorldStorageManager().containWorldStateStorage(firstBlock.getHash())) assertThat(archive.getCachedWorldStorageManager().contains(secondBlock.getHash())).isTrue();
.isTrue();
assertThat(
archive.getCachedWorldStorageManager().containWorldStateStorage(secondBlock.getHash()))
.isTrue();
assertThat(archive.getMutable().get(testAddress)).isNotNull(); assertThat(archive.getMutable().get(testAddress)).isNotNull();
assertThat(archive.getMutable().get(testAddress).getBalance()) assertThat(archive.getMutable().get(testAddress).getBalance())

@ -24,7 +24,7 @@ import com.google.common.base.MoreObjects;
public class EthProtocolConfiguration { public class EthProtocolConfiguration {
public static final int DEFAULT_MAX_MESSAGE_SIZE = 10 * ByteUnits.MEGABYTE; public static final int DEFAULT_MAX_MESSAGE_SIZE = 10 * ByteUnits.MEGABYTE;
public static final int DEFAULT_MAX_GET_BLOCK_HEADERS = 192; public static final int DEFAULT_MAX_GET_BLOCK_HEADERS = 512;
public static final int DEFAULT_MAX_GET_BLOCK_BODIES = 128; public static final int DEFAULT_MAX_GET_BLOCK_BODIES = 128;
public static final int DEFAULT_MAX_GET_RECEIPTS = 256; public static final int DEFAULT_MAX_GET_RECEIPTS = 256;
public static final int DEFAULT_MAX_GET_NODE_DATA = 384; public static final int DEFAULT_MAX_GET_NODE_DATA = 384;

@ -14,12 +14,13 @@
*/ */
package org.hyperledger.besu.ethereum.eth.manager.snap; package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage; import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager; import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
@ -29,7 +30,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException; import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.Comparator; import java.util.Comparator;
@ -49,14 +50,15 @@ public class SnapProtocolManager implements ProtocolManager {
private final EthMessages snapMessages; private final EthMessages snapMessages;
public SnapProtocolManager( public SnapProtocolManager(
final List<PeerValidator> peerValidators, final WorldStateStorageCoordinator worldStateStorageCoordinator,
final SnapSyncConfiguration snapConfig,
final EthPeers ethPeers, final EthPeers ethPeers,
final EthMessages snapMessages, final EthMessages snapMessages,
final WorldStateArchive worldStateArchive) { final ProtocolContext protocolContext) {
this.ethPeers = ethPeers; this.ethPeers = ethPeers;
this.snapMessages = snapMessages; this.snapMessages = snapMessages;
this.supportedCapabilities = calculateCapabilities(); this.supportedCapabilities = calculateCapabilities();
new SnapServer(snapMessages, worldStateArchive); new SnapServer(snapConfig, snapMessages, worldStateStorageCoordinator, protocolContext);
} }
private List<Capability> calculateCapabilities() { private List<Capability> calculateCapabilities() {

@ -14,66 +14,632 @@
*/ */
package org.hyperledger.besu.ethereum.eth.manager.snap; package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetAccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetStorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1; import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import kotlin.Pair;
import kotlin.collections.ArrayDeque; import kotlin.collections.ArrayDeque;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("unused") /** See https://github.com/ethereum/devp2p/blob/master/caps/snap.md */
class SnapServer { class SnapServer implements BesuEvents.InitialSyncCompletionListener {
private static final Logger LOGGER = LoggerFactory.getLogger(SnapServer.class);
private static final int PRIME_STATE_ROOT_CACHE_LIMIT = 128;
private static final int MAX_ENTRIES_PER_REQUEST = 100000;
private static final int MAX_RESPONSE_SIZE = 2 * 1024 * 1024;
private static final int MAX_CODE_LOOKUPS_PER_REQUEST = 1024;
private static final int MAX_TRIE_LOOKUPS_PER_REQUEST = 1024;
private static final AccountRangeMessage EMPTY_ACCOUNT_RANGE =
AccountRangeMessage.create(new HashMap<>(), new ArrayDeque<>());
private static final StorageRangeMessage EMPTY_STORAGE_RANGE =
StorageRangeMessage.create(new ArrayDeque<>(), Collections.emptyList());
private static final TrieNodesMessage EMPTY_TRIE_NODES_MESSAGE =
TrieNodesMessage.create(new ArrayList<>());
private static final ByteCodesMessage EMPTY_BYTE_CODES_MESSAGE =
ByteCodesMessage.create(new ArrayDeque<>());
static final Hash HASH_LAST = Hash.wrap(Bytes32.leftPad(Bytes.fromHexString("FF"), (byte) 0xFF));
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AtomicLong listenerId = new AtomicLong();
private final EthMessages snapMessages; private final EthMessages snapMessages;
private final WorldStateArchive worldStateArchive;
SnapServer(final EthMessages snapMessages, final WorldStateArchive worldStateArchive) { private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private final Optional<ProtocolContext> protocolContext;
// whether snap server is enabled
private final boolean snapServerEnabled;
// provide worldstate storage by root hash
private Function<Hash, Optional<BonsaiWorldStateKeyValueStorage>> worldStateStorageProvider =
__ -> Optional.empty();
/**
 * Creates a snap server wired to the live protocol context.
 *
 * <p>Registers the snap/1 response constructors immediately, then subscribes to initial-sync
 * completion events (via the {@code DefaultSynchronizer}, when present) so the server is
 * started only once the node has a complete worldstate to serve, and stopped on sync restart.
 *
 * @param snapConfig snap sync configuration; gates whether serving is enabled at all
 * @param snapMessages message registry the response constructors are installed into
 * @param worldStateStorageCoordinator access to the underlying worldstate storage
 * @param protocolContext live protocol context used to reach the synchronizer and archive
 */
SnapServer(
    final SnapSyncConfiguration snapConfig,
    final EthMessages snapMessages,
    final WorldStateStorageCoordinator worldStateStorageCoordinator,
    final ProtocolContext protocolContext) {
  this.snapServerEnabled = snapConfig.isSnapServerEnabled();
  this.snapMessages = snapMessages;
  this.worldStateStorageCoordinator = worldStateStorageCoordinator;
  this.protocolContext = Optional.of(protocolContext);
  registerResponseConstructors();
  // subscribe to initial sync completed events to start/stop snap server:
  this.protocolContext
      .flatMap(ProtocolContext::getSynchronizer)
      .filter(z -> z instanceof DefaultSynchronizer)
      .map(DefaultSynchronizer.class::cast)
      .ifPresentOrElse(
          z -> this.listenerId.set(z.subscribeInitialSync(this)),
          // without a synchronizer reference the server can never auto-start:
          () -> LOGGER.warn("SnapServer created without reference to sync status"));
}
/**
* Create a snap server without registering a listener for worldstate initial sync events or
* priming worldstates by root hash. Used by unit tests.
*/
@VisibleForTesting
SnapServer(
final EthMessages snapMessages,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final Function<Hash, Optional<BonsaiWorldStateKeyValueStorage>> worldStateStorageProvider) {
this.snapServerEnabled = true;
this.snapMessages = snapMessages; this.snapMessages = snapMessages;
this.worldStateArchive = worldStateArchive; this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.registerResponseConstructors(); this.worldStateStorageProvider = worldStateStorageProvider;
this.protocolContext = Optional.empty();
}
/** Starts serving snap requests once this node's own initial sync has completed. */
@Override
public void onInitialSyncCompleted() {
  start();
}
/** Stops serving snap requests while this node re-runs its initial sync. */
@Override
public void onInitialSyncRestart() {
  stop();
}
/**
 * Starts the snap server if it is enabled and the backing storage can support serving.
 *
 * <p>Serving is only activated when the worldstate is Bonsai-format AND the flat database is
 * in FULL mode: {@code isStarted} is set inside that branch, so on any other storage
 * configuration this method is a no-op and requests keep receiving empty responses. On start,
 * the worldstate-by-root-hash provider is wired to the cached world storage manager and the
 * recent root-hash cache is primed.
 *
 * @return this server, for chaining
 */
public synchronized SnapServer start() {
  if (!isStarted.get() && snapServerEnabled) {
    // if we are bonsai and full flat, we can provide a worldstate storage:
    var worldStateKeyValueStorage = worldStateStorageCoordinator.worldStateKeyValueStorage();
    if (worldStateKeyValueStorage.getDataStorageFormat().equals(DataStorageFormat.BONSAI)
        && worldStateStorageCoordinator.isMatchingFlatMode(FlatDbMode.FULL)) {
      LOGGER.debug("Starting SnapServer with Bonsai full flat db");
      var bonsaiArchive =
          protocolContext
              .map(ProtocolContext::getWorldStateArchive)
              .map(BonsaiWorldStateProvider.class::cast);
      var cachedStorageManagerOpt =
          bonsaiArchive.map(archive -> archive.getCachedWorldStorageManager());
      if (cachedStorageManagerOpt.isPresent()) {
        var cachedStorageManager = cachedStorageManagerOpt.get();
        // resolve request root hashes against the cached (recent) worldstates:
        this.worldStateStorageProvider =
            rootHash ->
                cachedStorageManager
                    .getStorageByRootHash(rootHash)
                    .map(BonsaiWorldStateKeyValueStorage.class::cast);
        // when we start we need to build the cache of latest 128 worldstates
        // trielogs-to-root-hash:
        var blockchain = protocolContext.map(ProtocolContext::getBlockchain).orElse(null);
        // at startup, prime the latest worldstates by roothash:
        cachedStorageManager.primeRootToBlockHashCache(blockchain, PRIME_STATE_ROOT_CACHE_LIMIT);
        // bytecode serving relies on code being keyed by code hash; warn if it is not:
        var flatDbStrategy =
            ((BonsaiWorldStateKeyValueStorage)
                    worldStateStorageCoordinator.worldStateKeyValueStorage())
                .getFlatDbStrategy();
        if (!flatDbStrategy.isCodeByCodeHash()) {
          LOGGER.warn("SnapServer requires code stored by codehash, but it is not enabled");
        }
      } else {
        LOGGER.warn(
            "SnapServer started without cached storage manager, this should only happen in tests");
      }
      isStarted.set(true);
    }
  }
  return this;
}
public synchronized SnapServer stop() {
isStarted.set(false);
return this;
} }
private void registerResponseConstructors() { private void registerResponseConstructors() {
snapMessages.registerResponseConstructor( snapMessages.registerResponseConstructor(
SnapV1.GET_ACCOUNT_RANGE, SnapV1.GET_ACCOUNT_RANGE, messageData -> constructGetAccountRangeResponse(messageData));
messageData -> constructGetAccountRangeResponse(worldStateArchive, messageData));
snapMessages.registerResponseConstructor( snapMessages.registerResponseConstructor(
SnapV1.GET_STORAGE_RANGE, SnapV1.GET_STORAGE_RANGE, messageData -> constructGetStorageRangeResponse(messageData));
messageData -> constructGetStorageRangeResponse(worldStateArchive, messageData));
snapMessages.registerResponseConstructor( snapMessages.registerResponseConstructor(
SnapV1.GET_BYTECODES, SnapV1.GET_BYTECODES, messageData -> constructGetBytecodesResponse(messageData));
messageData -> constructGetBytecodesResponse(worldStateArchive, messageData));
snapMessages.registerResponseConstructor( snapMessages.registerResponseConstructor(
SnapV1.GET_TRIE_NODES, SnapV1.GET_TRIE_NODES, messageData -> constructGetTrieNodesResponse(messageData));
messageData -> constructGetTrieNodesResponse(worldStateArchive, messageData)); }
MessageData constructGetAccountRangeResponse(final MessageData message) {
if (!isStarted.get()) {
return EMPTY_ACCOUNT_RANGE;
}
StopWatch stopWatch = StopWatch.createStarted();
final GetAccountRangeMessage getAccountRangeMessage = GetAccountRangeMessage.readFrom(message);
final GetAccountRangeMessage.Range range = getAccountRangeMessage.range(true);
final int maxResponseBytes = Math.min(range.responseBytes().intValue(), MAX_RESPONSE_SIZE);
LOGGER
.atTrace()
.setMessage("Received getAccountRangeMessage for {} from {} to {}")
.addArgument(() -> asLogHash(range.worldStateRootHash()))
.addArgument(() -> asLogHash(range.startKeyHash()))
.addArgument(() -> asLogHash(range.endKeyHash()))
.log();
try {
return worldStateStorageProvider
.apply(range.worldStateRootHash())
.map(
storage -> {
LOGGER.trace("obtained worldstate in {}", stopWatch);
StatefulPredicate shouldContinuePredicate =
new StatefulPredicate(
"account",
stopWatch,
maxResponseBytes,
(pair) -> {
var rlpOutput = new BytesValueRLPOutput();
rlpOutput.startList();
rlpOutput.writeBytes(pair.getFirst());
rlpOutput.writeRLPBytes(pair.getSecond());
rlpOutput.endList();
return rlpOutput.encodedSize();
});
NavigableMap<Bytes32, Bytes> accounts =
storage.streamFlatAccounts(
range.startKeyHash(), range.endKeyHash(), shouldContinuePredicate);
if (accounts.isEmpty() && shouldContinuePredicate.shouldContinue.get()) {
// fetch next account after range, if it exists
LOGGER.debug(
"found no accounts in range, taking first value starting from {}",
asLogHash(range.endKeyHash()));
accounts = storage.streamFlatAccounts(range.endKeyHash(), UInt256.MAX_VALUE, 1L);
}
final var worldStateProof =
new WorldStateProofProvider(new WorldStateStorageCoordinator(storage));
final List<Bytes> proof =
worldStateProof.getAccountProofRelatedNodes(
range.worldStateRootHash(), Hash.wrap(range.startKeyHash()));
if (!accounts.isEmpty()) {
proof.addAll(
worldStateProof.getAccountProofRelatedNodes(
range.worldStateRootHash(), Hash.wrap(accounts.lastKey())));
}
var resp = AccountRangeMessage.create(accounts, proof);
if (accounts.isEmpty()) {
LOGGER.debug(
"returned empty account range message for {} to {}, proof count {}",
asLogHash(range.startKeyHash()),
asLogHash(range.endKeyHash()),
proof.size());
}
LOGGER.debug(
"returned in {} account range {} to {} with {} accounts and {} proofs, resp size {} of max {}",
stopWatch,
asLogHash(range.startKeyHash()),
asLogHash(range.endKeyHash()),
accounts.size(),
proof.size(),
resp.getSize(),
maxResponseBytes);
return resp;
})
.orElseGet(
() -> {
LOGGER.debug("returned empty account range due to worldstate not present");
return EMPTY_ACCOUNT_RANGE;
});
} catch (Exception ex) {
LOGGER.error("Unexpected exception serving account range request", ex);
}
return EMPTY_ACCOUNT_RANGE;
} }
private MessageData constructGetAccountRangeResponse( MessageData constructGetStorageRangeResponse(final MessageData message) {
final WorldStateArchive worldStateArchive, final MessageData message) { if (!isStarted.get()) {
// TODO implement return EMPTY_STORAGE_RANGE;
return AccountRangeMessage.create(new HashMap<>(), new ArrayDeque<>()); }
StopWatch stopWatch = StopWatch.createStarted();
final GetStorageRangeMessage getStorageRangeMessage = GetStorageRangeMessage.readFrom(message);
final GetStorageRangeMessage.StorageRange range = getStorageRangeMessage.range(true);
final int maxResponseBytes = Math.min(range.responseBytes().intValue(), MAX_RESPONSE_SIZE);
LOGGER
.atTrace()
.setMessage("Receive get storage range message size {} from {} to {} for {}")
.addArgument(message::getSize)
.addArgument(() -> asLogHash(range.startKeyHash()))
.addArgument(
() -> Optional.ofNullable(range.endKeyHash()).map(SnapServer::asLogHash).orElse("''"))
.addArgument(
() ->
range.hashes().stream()
.map(SnapServer::asLogHash)
.collect(Collectors.joining(",", "[", "]")))
.log();
try {
return worldStateStorageProvider
.apply(range.worldStateRootHash())
.map(
storage -> {
LOGGER.trace("obtained worldstate in {}", stopWatch);
// reusable predicate to limit by rec count and bytes:
var statefulPredicate =
new StatefulPredicate(
"storage",
stopWatch,
maxResponseBytes,
(pair) -> {
var slotRlpOutput = new BytesValueRLPOutput();
slotRlpOutput.startList();
slotRlpOutput.writeBytes(pair.getFirst());
slotRlpOutput.writeBytes(pair.getSecond());
slotRlpOutput.endList();
return slotRlpOutput.encodedSize();
});
// only honor start and end hash if request is for a single account's storage:
Bytes32 startKeyBytes, endKeyBytes;
boolean isPartialRange = false;
if (range.hashes().size() > 1) {
startKeyBytes = Bytes32.ZERO;
endKeyBytes = HASH_LAST;
} else {
startKeyBytes = range.startKeyHash();
endKeyBytes = range.endKeyHash();
isPartialRange =
!(startKeyBytes.equals(Hash.ZERO) && endKeyBytes.equals(HASH_LAST));
}
ArrayDeque<NavigableMap<Bytes32, Bytes>> collectedStorages = new ArrayDeque<>();
List<Bytes> proofNodes = new ArrayList<>();
final var worldStateProof =
new WorldStateProofProvider(new WorldStateStorageCoordinator(storage));
for (var forAccountHash : range.hashes()) {
var accountStorages =
storage.streamFlatStorages(
Hash.wrap(forAccountHash), startKeyBytes, endKeyBytes, statefulPredicate);
//// address partial range queries that return empty
if (accountStorages.isEmpty() && isPartialRange) {
// fetch next slot after range, if it exists
LOGGER.debug(
"found no slots in range, taking first value starting from {}",
asLogHash(range.endKeyHash()));
accountStorages =
storage.streamFlatStorages(
Hash.wrap(forAccountHash), range.endKeyHash(), UInt256.MAX_VALUE, 1L);
}
// don't send empty storage ranges
if (!accountStorages.isEmpty()) {
collectedStorages.add(accountStorages);
}
// if a partial storage range was requested, or we interrupted storage due to
// request limits, send proofs:
if (isPartialRange || !statefulPredicate.shouldGetMore()) {
// send a proof for the left side range origin
proofNodes.addAll(
worldStateProof.getStorageProofRelatedNodes(
getAccountStorageRoot(forAccountHash, storage),
forAccountHash,
Hash.wrap(startKeyBytes)));
if (!accountStorages.isEmpty()) {
// send a proof for the last key on the right
proofNodes.addAll(
worldStateProof.getStorageProofRelatedNodes(
getAccountStorageRoot(forAccountHash, storage),
forAccountHash,
Hash.wrap(accountStorages.lastKey())));
}
}
if (!statefulPredicate.shouldGetMore()) {
break;
}
}
var resp = StorageRangeMessage.create(collectedStorages, proofNodes);
LOGGER.debug(
"returned in {} storage {} to {} range {} to {} with {} storages and {} proofs, resp size {} of max {}",
stopWatch,
asLogHash(range.hashes().first()),
asLogHash(range.hashes().last()),
asLogHash(range.startKeyHash()),
asLogHash(range.endKeyHash()),
collectedStorages.size(),
proofNodes.size(),
resp.getSize(),
maxResponseBytes);
return resp;
})
.orElseGet(
() -> {
LOGGER.debug("returned empty storage range due to missing worldstate");
return EMPTY_STORAGE_RANGE;
});
} catch (Exception ex) {
LOGGER.error("Unexpected exception serving storage range request", ex);
return EMPTY_STORAGE_RANGE;
}
}
/**
 * Serves a snap/1 GetByteCodes request.
 *
 * <p>Looks up each requested code hash (capped at {@code MAX_CODE_LOOKUPS_PER_REQUEST}) and
 * returns the corresponding bytecode, stopping once the accumulated response size or the
 * per-request time budget would be exceeded. Returns an empty message when the server is not
 * started or on any unexpected failure.
 *
 * @param message the raw GetByteCodesMessage to answer
 * @return a ByteCodesMessage response, possibly empty
 */
MessageData constructGetBytecodesResponse(final MessageData message) {
  if (!isStarted.get()) {
    // refuse to serve until initial sync has completed and start() has run
    return EMPTY_BYTE_CODES_MESSAGE;
  }
  StopWatch stopWatch = StopWatch.createStarted();
  final GetByteCodesMessage getByteCodesMessage = GetByteCodesMessage.readFrom(message);
  final GetByteCodesMessage.CodeHashes codeHashes = getByteCodesMessage.codeHashes(true);
  final int maxResponseBytes = Math.min(codeHashes.responseBytes().intValue(), MAX_RESPONSE_SIZE);
  LOGGER
      .atTrace()
      .setMessage("Received get bytecodes message for {} hashes")
      .addArgument(codeHashes.hashes()::size)
      .log();
  try {
    // NOTE: kotlin.collections.ArrayDeque implements List, unlike java.util.ArrayDeque
    List<Bytes> codeBytes = new ArrayDeque<>();
    // cap the number of code lookups honored per request:
    var codeHashList =
        (codeHashes.hashes().size() < MAX_CODE_LOOKUPS_PER_REQUEST)
            ? codeHashes.hashes()
            : codeHashes.hashes().subList(0, MAX_CODE_LOOKUPS_PER_REQUEST);
    for (Bytes32 codeHash : codeHashList) {
      if (Hash.EMPTY.equals(codeHash)) {
        // empty code hash => empty code, no storage lookup needed
        codeBytes.add(Bytes.EMPTY);
      } else {
        // accountHash is passed as null; presumably the lookup is keyed by code hash
        // alone in this configuration — TODO confirm against getCode implementations
        Optional<Bytes> optCode = worldStateStorageCoordinator.getCode(Hash.wrap(codeHash), null);
        if (optCode.isPresent()) {
          // NOTE(review): sumListBytes rescans the whole list on every iteration,
          // making this loop O(n^2) in the number of returned codes
          if (sumListBytes(codeBytes) + optCode.get().size() > maxResponseBytes
              || stopWatch.getTime() > StatefulPredicate.MAX_MILLIS_PER_REQUEST) {
            break;
          }
          codeBytes.add(optCode.get());
        }
      }
    }
    var resp = ByteCodesMessage.create(codeBytes);
    LOGGER.debug(
        "returned in {} code bytes message with {} entries, resp size {} of max {}",
        stopWatch,
        codeBytes.size(),
        resp.getSize(),
        maxResponseBytes);
    return resp;
  } catch (Exception ex) {
    // never propagate: a malformed or failing request must not kill the message handler
    LOGGER.error("Unexpected exception serving bytecodes request", ex);
    return EMPTY_BYTE_CODES_MESSAGE;
  }
}
/**
 * Serves a snap/1 GetTrieNodes request.
 *
 * <p>Each requested path list is either a single compact-encoded account path, or an account
 * hash followed by compact-encoded storage paths under that account. Lookups stop once the
 * accumulated response size or the per-request time budget would be exceeded (the same limits
 * the other range handlers apply). Returns an empty message when the server is not started,
 * the requested worldstate is unavailable, or on any unexpected failure.
 *
 * <p>Fixes vs. previous revision: the summary log now reports {@code resp.getSize()} (it
 * previously logged {@code resp.getCode()} while claiming to print the response size), and the
 * storage-path branch now honors the same {@code MAX_MILLIS_PER_REQUEST} time budget as the
 * account-path branch, so slow storage-heavy requests are also time-bounded.
 *
 * @param message the raw GetTrieNodesMessage to answer
 * @return a TrieNodesMessage response, possibly empty
 */
MessageData constructGetTrieNodesResponse(final MessageData message) {
  if (!isStarted.get()) {
    // refuse to serve until initial sync has completed and start() has run
    return EMPTY_TRIE_NODES_MESSAGE;
  }
  StopWatch stopWatch = StopWatch.createStarted();
  final GetTrieNodesMessage getTrieNodesMessage = GetTrieNodesMessage.readFrom(message);
  final GetTrieNodesMessage.TrieNodesPaths triePaths = getTrieNodesMessage.paths(true);
  final int maxResponseBytes = Math.min(triePaths.responseBytes().intValue(), MAX_RESPONSE_SIZE);
  LOGGER
      .atTrace()
      .setMessage("Received get trie nodes message of size {}")
      .addArgument(() -> triePaths.paths().size())
      .log();
  try {
    return worldStateStorageProvider
        .apply(triePaths.worldStateRootHash())
        .map(
            storage -> {
              LOGGER.trace("obtained worldstate in {}", stopWatch);
              ArrayList<Bytes> trieNodes = new ArrayList<>();
              // cap the number of trie lookups honored per request:
              var triePathList =
                  triePaths.paths().size() < MAX_TRIE_LOOKUPS_PER_REQUEST
                      ? triePaths.paths()
                      : triePaths.paths().subList(0, MAX_TRIE_LOOKUPS_PER_REQUEST);
              for (var triePath : triePathList) {
                // first element in paths is account
                if (triePath.size() == 1) {
                  // if there is only one path, presume it should be compact encoded account path
                  var optStorage =
                      storage.getTrieNodeUnsafe(CompactEncoding.decode(triePath.get(0)));
                  if (optStorage.isPresent()) {
                    if (sumListBytes(trieNodes) + optStorage.get().size() > maxResponseBytes
                        || stopWatch.getTime() > StatefulPredicate.MAX_MILLIS_PER_REQUEST) {
                      break;
                    }
                    trieNodes.add(optStorage.get());
                  }
                } else {
                  // otherwise the first element should be account hash, and subsequent paths
                  // are compact encoded account storage paths
                  final Bytes accountPrefix = triePath.get(0);
                  List<Bytes> storagePaths = triePath.subList(1, triePath.size());
                  for (var path : storagePaths) {
                    var optStorage =
                        storage.getTrieNodeUnsafe(
                            Bytes.concatenate(accountPrefix, CompactEncoding.decode(path)));
                    if (optStorage.isPresent()) {
                      // apply the same byte and time budget as the account-path branch:
                      if (sumListBytes(trieNodes) + optStorage.get().size() > maxResponseBytes
                          || stopWatch.getTime() > StatefulPredicate.MAX_MILLIS_PER_REQUEST) {
                        break;
                      }
                      trieNodes.add(optStorage.get());
                    }
                  }
                }
              }
              var resp = TrieNodesMessage.create(trieNodes);
              LOGGER.debug(
                  "returned in {} trie nodes message with {} entries, resp size {} of max {}",
                  stopWatch,
                  trieNodes.size(),
                  // fix: log the response size, not the message code
                  resp.getSize(),
                  maxResponseBytes);
              return resp;
            })
        .orElseGet(
            () -> {
              LOGGER.debug("returned empty trie nodes message due to missing worldstate");
              return EMPTY_TRIE_NODES_MESSAGE;
            });
  } catch (Exception ex) {
    // never propagate: a malformed or failing request must not kill the message handler
    LOGGER.error("Unexpected exception serving trienodes request", ex);
    return EMPTY_TRIE_NODES_MESSAGE;
  }
}
static class StatefulPredicate implements Predicate<Pair<Bytes32, Bytes>> {
// default to a max of 4 seconds per request
static final long MAX_MILLIS_PER_REQUEST = 4000;
final AtomicInteger byteLimit = new AtomicInteger(0);
final AtomicInteger recordLimit = new AtomicInteger(0);
final AtomicBoolean shouldContinue = new AtomicBoolean(true);
final Function<Pair<Bytes32, Bytes>, Integer> encodingSizeAccumulator;
final StopWatch stopWatch;
final int maxResponseBytes;
// TODO: remove this hack, 10% is a fudge factor to account for the proof node size
final int maxResponseBytesFudgeFactor;
final String forWhat;
/**
 * Builds a streaming cut-off predicate for a single request.
 *
 * @param forWhat label used in log messages ("account", "storage", ...)
 * @param stopWatch the request's running stopwatch, used for the time budget
 * @param maxResponseBytes hard byte budget for this response
 * @param encodingSizeAccumulator computes the encoded size contribution of one pair
 */
StatefulPredicate(
    final String forWhat,
    final StopWatch stopWatch,
    final int maxResponseBytes,
    final Function<Pair<Bytes32, Bytes>, Integer> encodingSizeAccumulator) {
  this.stopWatch = stopWatch;
  this.maxResponseBytes = maxResponseBytes;
  // reserve ~10% headroom for proof-node overhead (see class TODO)
  this.maxResponseBytesFudgeFactor = maxResponseBytes * 9 / 10;
  this.forWhat = forWhat;
  this.encodingSizeAccumulator = encodingSizeAccumulator;
}
/** True while no limit (records, bytes, time) has been tripped by a prior {@code test} call. */
public boolean shouldGetMore() {
  return shouldContinue.get();
}
/**
 * Accepts the next streamed pair while the request stays under its record count, byte, and
 * time budgets; once any limit trips, latches {@code shouldContinue} to false so callers can
 * distinguish "range exhausted" from "stream cut short".
 *
 * <p>NOTE: this predicate mutates its counters on every call, so it must only be used on a
 * sequential stream and a fresh instance is needed per request.
 */
@Override
public boolean test(final Pair<Bytes32, Bytes> pair) {
  LOGGER
      .atTrace()
      .setMessage("{} pre-accumulate limits, bytes: {} , stream count: {}")
      .addArgument(() -> forWhat)
      .addArgument(byteLimit::get)
      .addArgument(recordLimit::get)
      .log();
  // time budget check first: abandon the stream outright if the request is too slow
  if (stopWatch.getTime() > MAX_MILLIS_PER_REQUEST) {
    shouldContinue.set(false);
    LOGGER.warn(
        "{} took too long, stopped at {} ms with {} records and {} bytes",
        forWhat,
        stopWatch.formatTime(),
        recordLimit.get(),
        byteLimit.get());
    return false;
  }
  var underRecordLimit = recordLimit.addAndGet(1) <= MAX_ENTRIES_PER_REQUEST;
  // accumulateAndGet is used purely to add this pair's encoded size to the running total;
  // the supplied 0 operand is ignored (single-threaded use, so re-application is not a risk)
  var underByteLimit =
      byteLimit.accumulateAndGet(0, (cur, __) -> cur + encodingSizeAccumulator.apply(pair))
          < maxResponseBytesFudgeFactor;
  if (underRecordLimit && underByteLimit) {
    return true;
  } else {
    shouldContinue.set(false);
    LOGGER
        .atDebug()
        .setMessage("{} post-accumulate limits, bytes: {} , stream count: {}")
        .addArgument(() -> forWhat)
        .addArgument(byteLimit::get)
        .addArgument(recordLimit::get)
        .log();
    return false;
  }
}
} }
private MessageData constructGetStorageRangeResponse( Hash getAccountStorageRoot(
final WorldStateArchive worldStateArchive, final MessageData message) { final Bytes32 accountHash, final BonsaiWorldStateKeyValueStorage storage) {
// TODO implement return storage
return StorageRangeMessage.create(new ArrayDeque<>(), new ArrayDeque<>()); .getTrieNodeUnsafe(Bytes.concatenate(accountHash, Bytes.EMPTY))
.map(Hash::hash)
.orElse(Hash.EMPTY_TRIE_HASH);
} }
private MessageData constructGetBytecodesResponse( private static int sumListBytes(final List<Bytes> listOfBytes) {
final WorldStateArchive worldStateArchive, final MessageData message) { // TODO: remove hack, 10% is a fudge factor to account for the overhead of rlp encoding
// TODO implement return listOfBytes.stream().map(Bytes::size).reduce((a, b) -> a + b).orElse(0) * 11 / 10;
return ByteCodesMessage.create(new ArrayDeque<>());
} }
private MessageData constructGetTrieNodesResponse( private static String asLogHash(final Bytes32 hash) {
final WorldStateArchive worldStateArchive, final MessageData message) { var str = hash.toHexString();
return TrieNodesMessage.create(new ArrayDeque<>()); return str.substring(0, 4) + ".." + str.substring(59, 63);
} }
} }

@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput; import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.bytes.Bytes32;
@ -95,6 +96,8 @@ public final class GetAccountRangeMessage extends AbstractSnapMessageData {
@Value.Immutable @Value.Immutable
public interface Range { public interface Range {
Optional<BigInteger> requestId();
Hash worldStateRootHash(); Hash worldStateRootHash();
Hash startKeyHash(); Hash startKeyHash();

@ -116,7 +116,7 @@ public final class GetStorageRangeMessage extends AbstractSnapMessageData {
} }
if (input.nextIsNull()) { if (input.nextIsNull()) {
input.skipNext(); input.skipNext();
range.endKeyHash(Hash.ZERO); range.endKeyHash(Hash.LAST);
} else { } else {
range.endKeyHash(Hash.wrap(Bytes32.wrap(input.readBytes32()))); range.endKeyHash(Hash.wrap(Bytes32.wrap(input.readBytes32())));
} }

@ -38,6 +38,7 @@ import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvi
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.log.FramedLogMessage; import org.hyperledger.besu.util.log.FramedLogMessage;
@ -362,6 +363,14 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
return syncState.unsubscribeSyncStatus(listenerId); return syncState.unsubscribeSyncStatus(listenerId);
} }
public long subscribeInitialSync(final BesuEvents.InitialSyncCompletionListener listener) {
return syncState.subscribeCompletionReached(listener);
}
public boolean unsubscribeInitialSync(final long listenerId) {
return syncState.unsubscribeInitialConditionReached(listenerId);
}
private Void finalizeSync(final Void unused) { private Void finalizeSync(final Void unused) {
LOG.info("Stopping block propagation."); LOG.info("Stopping block propagation.");
blockPropagationManager.ifPresent(BlockPropagationManager::stop); blockPropagationManager.ifPresent(BlockPropagationManager::stop);

@ -38,6 +38,8 @@ public class SnapSyncConfiguration {
public static final Boolean DEFAULT_IS_FLAT_DB_HEALING_ENABLED = Boolean.FALSE; public static final Boolean DEFAULT_IS_FLAT_DB_HEALING_ENABLED = Boolean.FALSE;
public static final Boolean DEFAULT_SNAP_SERVER_ENABLED = Boolean.FALSE;
public static SnapSyncConfiguration getDefault() { public static SnapSyncConfiguration getDefault() {
return ImmutableSnapSyncConfiguration.builder().build(); return ImmutableSnapSyncConfiguration.builder().build();
} }
@ -81,4 +83,9 @@ public class SnapSyncConfiguration {
public Boolean isFlatDbHealingEnabled() { public Boolean isFlatDbHealingEnabled() {
return DEFAULT_IS_FLAT_DB_HEALING_ENABLED; return DEFAULT_IS_FLAT_DB_HEALING_ENABLED;
} }
@Value.Default
public Boolean isSnapServerEnabled() {
return DEFAULT_SNAP_SERVER_ENABLED;
}
} }

@ -123,38 +123,41 @@ public class StackTrie {
proofsEntries.put(Hash.hash(proof), proof); proofsEntries.put(Hash.hash(proof), proof);
} }
final InnerNodeDiscoveryManager<Bytes> snapStoredNodeFactory = if (!keys.isEmpty()) {
new InnerNodeDiscoveryManager<>( final InnerNodeDiscoveryManager<Bytes> snapStoredNodeFactory =
(location, hash) -> Optional.ofNullable(proofsEntries.get(hash)), new InnerNodeDiscoveryManager<>(
Function.identity(), (location, hash) -> Optional.ofNullable(proofsEntries.get(hash)),
Function.identity(), Function.identity(),
startKeyHash, Function.identity(),
proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey(), startKeyHash,
true); proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey(),
true);
final MerkleTrie<Bytes, Bytes> trie =
new StoredMerklePatriciaTrie<>( final MerkleTrie<Bytes, Bytes> trie =
snapStoredNodeFactory, proofs.isEmpty() ? MerkleTrie.EMPTY_TRIE_NODE_HASH : rootHash); new StoredMerklePatriciaTrie<>(
snapStoredNodeFactory,
for (Map.Entry<Bytes32, Bytes> entry : keys.entrySet()) { proofs.isEmpty() ? MerkleTrie.EMPTY_TRIE_NODE_HASH : rootHash);
trie.put(entry.getKey(), entry.getValue());
} for (Map.Entry<Bytes32, Bytes> entry : keys.entrySet()) {
trie.put(entry.getKey(), entry.getValue());
keys.forEach(flatDatabaseUpdater::update); }
trie.commit( keys.forEach(flatDatabaseUpdater::update);
nodeUpdater,
(new SnapCommitVisitor<>( trie.commit(
nodeUpdater, nodeUpdater,
startKeyHash, (new SnapCommitVisitor<>(
proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey()) { nodeUpdater,
@Override startKeyHash,
public void maybeStoreNode(final Bytes location, final Node<Bytes> node) { proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey()) {
if (!node.isHealNeeded()) { @Override
super.maybeStoreNode(location, node); public void maybeStoreNode(final Bytes location, final Node<Bytes> node) {
if (!node.isHealNeeded()) {
super.maybeStoreNode(location, node);
}
} }
} }));
})); }
} }
} }

@ -160,6 +160,12 @@ public class AccountRangeDataRequest extends SnapDataRequest {
if (!accounts.isEmpty() || !proofs.isEmpty()) { if (!accounts.isEmpty() || !proofs.isEmpty()) {
if (!worldStateProofProvider.isValidRangeProof( if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, getRootHash(), proofs, accounts)) { startKeyHash, endKeyHash, getRootHash(), proofs, accounts)) {
// this happens on repivot and on bad proofs
LOG.atTrace()
.setMessage("invalid range proof received for account range {} {}")
.addArgument(accounts.firstKey())
.addArgument(accounts.lastKey())
.log();
isProofValid = Optional.of(false); isProofValid = Optional.of(false);
} else { } else {
stackTrie.addElement(startKeyHash, proofs, accounts); stackTrie.addElement(startKeyHash, proofs, accounts);

@ -136,6 +136,13 @@ public class StorageRangeDataRequest extends SnapDataRequest {
startKeyHash, endKeyHash, storageRoot, proofs, slots)) { startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
// If the proof is invalid, it means that the storage will be a mix of several blocks. // If the proof is invalid, it means that the storage will be a mix of several blocks.
// Therefore, it will be necessary to heal the account's storage subsequently // Therefore, it will be necessary to heal the account's storage subsequently
LOG.atDebug()
.setMessage("invalid storage range proof received for account hash {} range {} {}")
.addArgument(() -> accountHash)
.addArgument(() -> slots.isEmpty() ? "none" : slots.firstKey())
.addArgument(() -> slots.isEmpty() ? "none" : slots.lastKey())
.log();
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash)); downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
// We will request the new storage root of the account because it is apparently no longer // We will request the new storage root of the account because it is apparently no longer
// valid with the new pivot block. // valid with the new pivot block.

@ -124,7 +124,7 @@ public class StorageFlatDatabaseHealingRangeRequest extends SnapDataRequest {
final NavigableMap<Bytes32, Bytes> slots, final NavigableMap<Bytes32, Bytes> slots,
final ArrayDeque<Bytes> proofs) { final ArrayDeque<Bytes> proofs) {
if (!slots.isEmpty() && !proofs.isEmpty()) { if (!slots.isEmpty() && !proofs.isEmpty()) {
// very proof in order to check if the local flat database is valid or not // verify proof in order to check if the local flat database is valid or not
isProofValid = isProofValid =
worldStateProofProvider.isValidRangeProof( worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, storageRoot, proofs, slots); startKeyHash, endKeyHash, storageRoot, proofs, slots);

@ -0,0 +1,629 @@
/*
* Copyright Hyperledger Besu Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.snap;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.hyperledger.besu.ethereum.eth.manager.snap.SnapServer.HASH_LAST;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetAccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetStorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.trie.MerkleTrie;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.flat.FlatDbStrategyProvider;
import org.hyperledger.besu.ethereum.trie.patricia.SimpleMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedInMemoryKeyValueStorage;
import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
 * Unit tests for the snap/1 server: account-range, storage-range, trie-node and bytecode
 * request handling, including response-size limiting and range-proof validity, backed by an
 * in-memory full-flat-db Bonsai world state.
 */
public class SnapServerTest {
  static Random rand = new Random();

  /** Test fixture bundling an account hash, its state value, storage trie, and code. */
  record SnapTestAccount(
      Hash addressHash,
      StateTrieAccountValue accountValue,
      MerkleTrie<Bytes32, Bytes> storage,
      Bytes code) {
    /** RLP encoding of the account's state value, as stored in the account trie. */
    Bytes accountRLP() {
      return RLP.encode(accountValue::writeTo);
    }
  }

  static final ObservableMetricsSystem noopMetrics = new NoOpMetricsSystem();
  final SegmentedInMemoryKeyValueStorage storage = new SegmentedInMemoryKeyValueStorage();

  // force a full flat db with code stored by code hash:
  final BonsaiWorldStateKeyValueStorage inMemoryStorage =
      new BonsaiWorldStateKeyValueStorage(
          new FlatDbStrategyProvider(noopMetrics, DataStorageConfiguration.DEFAULT_BONSAI_CONFIG) {
            @Override
            public FlatDbMode getFlatDbMode() {
              return FlatDbMode.FULL;
            }

            @Override
            protected boolean deriveUseCodeStorageByHash(
                final SegmentedKeyValueStorage composedWorldStateStorage) {
              return true;
            }
          },
          storage,
          new InMemoryKeyValueStorage());

  final WorldStateStorageCoordinator storageCoordinator =
      new WorldStateStorageCoordinator(inMemoryStorage);

  // Account trie rooted in the in-memory storage; doubles as the world state root for requests.
  final StoredMerklePatriciaTrie<Bytes, Bytes> storageTrie =
      new StoredMerklePatriciaTrie<>(
          inMemoryStorage::getAccountStateTrieNode, Function.identity(), Function.identity());

  final WorldStateProofProvider proofProvider = new WorldStateProofProvider(storageCoordinator);

  // Spied worldstate provider so tests can verify the server does NOT touch the worldstate
  // when it is stopped (see assertNoStartNoOp).
  final Function<Hash, Optional<BonsaiWorldStateKeyValueStorage>> spyProvider =
      spy(
          new Function<Hash, Optional<BonsaiWorldStateKeyValueStorage>>() {
            // explicit non-final class is necessary for Mockito to spy:
            @Override
            public Optional<BonsaiWorldStateKeyValueStorage> apply(final Hash hash) {
              return Optional.of(inMemoryStorage);
            }
          });

  final SnapServer snapServer =
      new SnapServer(new EthMessages(), storageCoordinator, spyProvider).start();

  final SnapTestAccount acct1 = createTestAccount("10");
  final SnapTestAccount acct2 = createTestAccount("20");
  final SnapTestAccount acct3 = createTestContractAccount("30", inMemoryStorage);
  final SnapTestAccount acct4 = createTestContractAccount("40", inMemoryStorage);

  @BeforeEach
  public void setup() {
    // NOTE(review): the field initializer above already calls start(); this re-start is
    // presumably to undo a stop() from a prior test (e.g. assertNoStartNoOp) — confirm.
    snapServer.start();
  }

  @Test
  public void assertNoStartNoOp() {
    // account found at startHash
    insertTestAccounts(acct4, acct3, acct1, acct2);

    // stop snap server so that we should not be processing snap requests
    snapServer.stop();

    var rangeData = requestAccountRange(acct1.addressHash, acct4.addressHash).accountData(false);

    // assert empty account response and no attempt to fetch worldstate
    assertThat(rangeData.accounts().isEmpty()).isTrue();
    assertThat(rangeData.proofs().isEmpty()).isTrue();
    verify(spyProvider, never()).apply(any());

    // assert empty storage response and no attempt to fetch worldstate
    var storageRange =
        requestStorageRange(List.of(acct3.addressHash), Hash.ZERO, HASH_LAST).slotsData(false);
    assertThat(storageRange.slots().isEmpty()).isTrue();
    assertThat(storageRange.proofs().isEmpty()).isTrue();
    verify(spyProvider, never()).apply(any());

    // assert empty trie nodes response and no attempt to fetch worldstate
    var trieNodes =
        requestTrieNodes(storageTrie.getRootHash(), List.of(List.of(Bytes.fromHexString("0x01"))))
            .nodes(false);
    assertThat(trieNodes.isEmpty()).isTrue();
    verify(spyProvider, never()).apply(any());

    // assert empty code response and no attempt to fetch worldstate
    var codes =
        requestByteCodes(List.of(acct3.accountValue.getCodeHash())).bytecodes(false).codes();
    assertThat(codes.isEmpty()).isTrue();
    verify(spyProvider, never()).apply(any());
  }

  @Test
  public void assertEmptyRangeLeftProofOfExclusionAndNextAccount() {
    // for a range request that returns empty, we should return just a proof of exclusion on the
    // left and the next account after the limit hash
    insertTestAccounts(acct1, acct4);
    var rangeData =
        getAndVerifyAccountRangeData(requestAccountRange(acct2.addressHash, acct3.addressHash), 1);

    // expect to find only one value acct4, outside the requested range
    var outOfRangeVal = rangeData.accounts().entrySet().stream().findFirst();
    assertThat(outOfRangeVal).isPresent();
    assertThat(outOfRangeVal.get().getKey()).isEqualTo(acct4.addressHash());

    // assert proofs are valid for the requested range
    assertThat(assertIsValidAccountRangeProof(acct2.addressHash, rangeData)).isTrue();
  }

  @Test
  public void assertAccountLimitRangeResponse() {
    // assert we limit the range response according to size
    final int acctCount = 2000;
    final long acctRLPSize = 105;

    // Insert ~4k accounts at shuffled, spread-out addresses under the 0x40... prefix.
    List<Integer> randomLoad = IntStream.range(1, 4096).boxed().collect(Collectors.toList());
    Collections.shuffle(randomLoad);
    randomLoad.stream()
        .forEach(
            i ->
                insertTestAccounts(
                    createTestAccount(
                        Bytes.concatenate(
                                Bytes.fromHexString("0x40"),
                                Bytes.fromHexStringLenient(Integer.toHexString(i * 256)))
                            .toHexString())));

    // Hand-roll a GetAccountRange message with a byte budget of acctCount accounts.
    final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
    tmp.startList();
    tmp.writeBytes(storageTrie.getRootHash());
    tmp.writeBytes(Hash.ZERO);
    tmp.writeBytes(HASH_LAST);
    tmp.writeBigIntegerScalar(BigInteger.valueOf(acctRLPSize * acctCount));
    tmp.endList();
    var tinyRangeLimit = new GetAccountRangeMessage(tmp.encoded()).wrapMessageData(BigInteger.ONE);

    var rangeData =
        getAndVerifyAccountRangeData(
            (AccountRangeMessage) snapServer.constructGetAccountRangeResponse(tinyRangeLimit),
            // TODO: after sorting out the request fudge factor, adjust this assertion to match
            acctCount * 90 / 100 - 1);

    // assert proofs are valid for the requested range
    assertThat(assertIsValidAccountRangeProof(Hash.ZERO, rangeData)).isTrue();
  }

  @Test
  public void assertLastEmptyRange() {
    // When our final range request is empty, no next account is possible,
    // and we should return just a proof of exclusion of the right
    insertTestAccounts(acct1, acct2);
    var rangeData =
        getAndVerifyAccountRangeData(requestAccountRange(acct3.addressHash, acct4.addressHash), 0);

    // assert proofs are valid for the requested range
    assertThat(assertIsValidAccountRangeProof(acct3.addressHash, rangeData)).isTrue();
  }

  @Test
  public void assertAccountFoundAtStartHashProof() {
    // account found at startHash
    insertTestAccounts(acct4, acct3, acct1, acct2);
    var rangeData =
        getAndVerifyAccountRangeData(requestAccountRange(acct1.addressHash, acct4.addressHash), 4);

    // assert proofs are valid for requested range
    assertThat(assertIsValidAccountRangeProof(acct1.addressHash, rangeData)).isTrue();
  }

  @Test
  public void assertCompleteStorageForSingleAccount() {
    insertTestAccounts(acct1, acct2, acct3, acct4);
    var rangeData = requestStorageRange(List.of(acct3.addressHash), Hash.ZERO, HASH_LAST);
    assertThat(rangeData).isNotNull();
    var slotsData = rangeData.slotsData(false);
    assertThat(slotsData).isNotNull();
    assertThat(slotsData.slots()).isNotNull();
    assertThat(slotsData.slots().size()).isEqualTo(1);
    var firstAccountStorages = slotsData.slots().first();
    // acct3 is created with 10 storage slots (see createTestContractAccount)
    assertThat(firstAccountStorages.size()).isEqualTo(10);
    // no proofs for complete storage range:
    assertThat(slotsData.proofs().size()).isEqualTo(0);

    assertThat(
            assertIsValidStorageProof(acct3, Hash.ZERO, firstAccountStorages, slotsData.proofs()))
        .isTrue();
  }

  @Test
  public void assertPartialStorageForSingleAccountEmptyRange() {
    insertTestAccounts(acct3);
    var rangeData =
        requestStorageRange(
            List.of(acct3.addressHash), Hash.ZERO, Hash.fromHexStringLenient("0x00ff"));
    assertThat(rangeData).isNotNull();
    var slotsData = rangeData.slotsData(false);
    assertThat(slotsData).isNotNull();
    assertThat(slotsData.slots()).isNotNull();
    // expect 1 slot PAST the requested empty range
    assertThat(slotsData.slots().size()).isEqualTo(1);
    // expect left and right proofs for empty storage range:
    assertThat(slotsData.proofs().size()).isGreaterThan(0);
    // assert proofs are valid for the requested range
    assertThat(
            assertIsValidStorageProof(
                acct3, Hash.ZERO, slotsData.slots().first(), slotsData.proofs()))
        .isTrue();
  }

  @Test
  public void assertLastEmptyPartialStorageForSingleAccount() {
    // When our final range request is empty, no next account is possible,
    // and we should return just a proof of exclusion of the right
    insertTestAccounts(acct3);
    var rangeData = requestStorageRange(List.of(acct3.addressHash), HASH_LAST, HASH_LAST);
    assertThat(rangeData).isNotNull();
    var slotsData = rangeData.slotsData(false);
    assertThat(slotsData).isNotNull();
    assertThat(slotsData.slots()).isNotNull();
    // expect no slots PAST the requested empty range
    assertThat(slotsData.slots().size()).isEqualTo(0);
    // expect left and right proofs for empty storage range:
    assertThat(slotsData.proofs().size()).isGreaterThan(0);
    // assert proofs are valid for the requested range
    // NOTE(review): proof is validated from 0xFF although the request started at HASH_LAST —
    // presumably any in-range start works for an exclusion proof; confirm.
    assertThat(
            assertIsValidStorageProof(
                acct3,
                Hash.fromHexStringLenient("0xFF"),
                Collections.emptyNavigableMap(),
                slotsData.proofs()))
        .isTrue();
  }

  @Test
  public void assertStorageLimitRangeResponse() {
    // assert we limit the range response according to bytessize
    final int storageSlotSize = 70;
    final int storageSlotCount = 16;
    insertTestAccounts(acct1, acct2, acct3, acct4);

    // Hand-roll a GetStorageRange for acct3+acct4 with a ~16-slot byte budget.
    final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
    tmp.startList();
    tmp.writeBigIntegerScalar(BigInteger.ONE);
    tmp.writeBytes(storageTrie.getRootHash());
    tmp.writeList(
        List.of(acct3.addressHash, acct4.addressHash),
        (hash, rlpOutput) -> rlpOutput.writeBytes(hash));
    tmp.writeBytes(Hash.ZERO);
    tmp.writeBytes(HASH_LAST);
    tmp.writeBigIntegerScalar(BigInteger.valueOf(storageSlotCount * storageSlotSize));
    tmp.endList();
    var tinyRangeLimit = new GetStorageRangeMessage(tmp.encoded());

    var rangeData =
        (StorageRangeMessage) snapServer.constructGetStorageRangeResponse(tinyRangeLimit);

    // assert proofs are valid for the requested range
    assertThat(rangeData).isNotNull();
    var slotsData = rangeData.slotsData(false);
    assertThat(slotsData).isNotNull();
    assertThat(slotsData.slots()).isNotNull();
    assertThat(slotsData.slots().size()).isEqualTo(2);
    var firstAccountStorages = slotsData.slots().first();
    // expecting to see complete 10 slot storage for acct3
    assertThat(firstAccountStorages.size()).isEqualTo(10);
    var secondAccountStorages = slotsData.slots().last();
    // expecting to see only 6 since request was limited to 16 slots
    // TODO: after sorting out the request fudge factor, adjust this assertion to match
    assertThat(secondAccountStorages.size()).isEqualTo(6 * 90 / 100 - 1);
    // proofs required for interrupted storage range:
    assertThat(slotsData.proofs().size()).isNotEqualTo(0);

    assertThat(
            assertIsValidStorageProof(acct4, Hash.ZERO, secondAccountStorages, slotsData.proofs()))
        .isTrue();
  }

  @Test
  public void assertAccountTriePathRequest() {
    insertTestAccounts(acct1, acct2, acct3, acct4);
    var partialPathToAcct2 = CompactEncoding.bytesToPath(acct2.addressHash).slice(0, 1);
    var partialPathToAcct1 = Bytes.fromHexString("0x01"); // first nibble is 1
    var trieNodeRequest =
        requestTrieNodes(
            storageTrie.getRootHash(),
            List.of(List.of(partialPathToAcct2), List.of(partialPathToAcct1)));
    assertThat(trieNodeRequest).isNotNull();
    List<Bytes> trieNodes = trieNodeRequest.nodes(false);
    assertThat(trieNodes).isNotNull();
    assertThat(trieNodes.size()).isEqualTo(2);
  }

  @Test
  public void assertAccountTrieLimitRequest() {
    insertTestAccounts(acct1, acct2, acct3, acct4);
    final int accountNodeSize = 147;
    final int accountNodeLimit = 3;

    var partialPathToAcct1 = Bytes.fromHexString("0x01"); // first nibble is 1
    var partialPathToAcct2 = CompactEncoding.bytesToPath(acct2.addressHash).slice(0, 1);
    var partialPathToAcct3 = Bytes.fromHexString("0x03"); // first nibble is 1
    var partialPathToAcct4 = Bytes.fromHexString("0x04"); // first nibble is 1

    // Hand-roll a GetTrieNodes message with a 3-node byte budget.
    final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
    tmp.startList();
    tmp.writeBigIntegerScalar(BigInteger.ONE);
    tmp.writeBytes(storageTrie.getRootHash());
    tmp.writeList(
        List.of(
            List.of(partialPathToAcct4),
            List.of(partialPathToAcct3),
            List.of(partialPathToAcct2),
            List.of(partialPathToAcct1)),
        (path, rlpOutput) ->
            rlpOutput.writeList(path, (b, subRlpOutput) -> subRlpOutput.writeBytes(b)));
    tmp.writeBigIntegerScalar(BigInteger.valueOf(accountNodeLimit * accountNodeSize));
    tmp.endList();

    var trieNodeRequest =
        (TrieNodesMessage)
            snapServer.constructGetTrieNodesResponse(new GetTrieNodesMessage(tmp.encoded()));

    assertThat(trieNodeRequest).isNotNull();
    List<Bytes> trieNodes = trieNodeRequest.nodes(false);
    assertThat(trieNodes).isNotNull();
    // TODO: adjust this assertion after sorting out the request fudge factor
    assertThat(trieNodes.size()).isEqualTo(accountNodeLimit * 90 / 100);
  }

  @Test
  public void assertStorageTriePathRequest() {
    insertTestAccounts(acct1, acct2, acct3, acct4);
    var pathToSlot11 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0101"));
    var pathToSlot12 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0102"));
    var pathToSlot1a = CompactEncoding.encode(Bytes.fromHexStringLenient("0x010A")); // not present
    var trieNodeRequest =
        requestTrieNodes(
            storageTrie.getRootHash(),
            List.of(
                List.of(acct3.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a),
                List.of(acct4.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a)));
    assertThat(trieNodeRequest).isNotNull();
    List<Bytes> trieNodes = trieNodeRequest.nodes(false);
    assertThat(trieNodes).isNotNull();
    assertThat(trieNodes.size()).isEqualTo(4);
  }

  @Test
  public void assertStorageTrieLimitRequest() {
    insertTestAccounts(acct1, acct2, acct3, acct4);
    final int trieNodeSize = 69;
    final int trieNodeLimit = 3;

    var pathToSlot11 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0101"));
    var pathToSlot12 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0102"));
    var pathToSlot1a = CompactEncoding.encode(Bytes.fromHexStringLenient("0x010A")); // not present

    // Hand-roll a GetTrieNodes message with a 3-node byte budget.
    final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
    tmp.startList();
    tmp.writeBigIntegerScalar(BigInteger.ONE);
    tmp.writeBytes(storageTrie.getRootHash());
    tmp.writeList(
        List.of(
            List.of(acct3.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a),
            List.of(acct4.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a)),
        (path, rlpOutput) ->
            rlpOutput.writeList(path, (b, subRlpOutput) -> subRlpOutput.writeBytes(b)));
    tmp.writeBigIntegerScalar(BigInteger.valueOf(trieNodeLimit * trieNodeSize));
    tmp.endList();

    var trieNodeRequest =
        (TrieNodesMessage)
            snapServer.constructGetTrieNodesResponse(new GetTrieNodesMessage(tmp.encoded()));

    assertThat(trieNodeRequest).isNotNull();
    List<Bytes> trieNodes = trieNodeRequest.nodes(false);
    assertThat(trieNodes).isNotNull();
    // TODO: adjust this assertion after sorting out the request fudge factor
    assertThat(trieNodes.size()).isEqualTo(trieNodeLimit * 90 / 100);
  }

  @Test
  public void assertCodePresent() {
    insertTestAccounts(acct1, acct2, acct3, acct4);
    var codeRequest =
        requestByteCodes(
            List.of(acct3.accountValue.getCodeHash(), acct4.accountValue.getCodeHash()));
    assertThat(codeRequest).isNotNull();
    ByteCodesMessage.ByteCodes codes = codeRequest.bytecodes(false);
    assertThat(codes).isNotNull();
    assertThat(codes.codes().size()).isEqualTo(2);
  }

  @Test
  public void assertCodeLimitRequest() {
    insertTestAccounts(acct1, acct2, acct3, acct4);
    final int codeSize = 32;
    final int codeLimit = 2;

    // Hand-roll a GetByteCodes message with a 2-code byte budget.
    final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
    tmp.startList();
    tmp.writeBigIntegerScalar(BigInteger.ONE);
    tmp.writeList(
        List.of(acct3.accountValue.getCodeHash(), acct4.accountValue.getCodeHash()),
        (hash, rlpOutput) -> rlpOutput.writeBytes(hash));
    tmp.writeBigIntegerScalar(BigInteger.valueOf(codeSize * codeLimit));
    tmp.endList();

    var codeRequest =
        (ByteCodesMessage)
            snapServer.constructGetBytecodesResponse(new GetByteCodesMessage(tmp.encoded()));

    assertThat(codeRequest).isNotNull();
    ByteCodesMessage.ByteCodes codes = codeRequest.bytecodes(false);
    assertThat(codes).isNotNull();
    // TODO adjust this assertion after sorting out the request fudge factor
    assertThat(codes.codes().size()).isEqualTo(codeLimit * 90 / 100);
  }

  /**
   * Builds a storage-less, code-less account at the right-padded hash of {@code hexAddr}.
   *
   * <p>NOTE(review): {@code nextInt(0, 1)} and {@code nextLong(0L, 1L)} have exclusive upper
   * bounds, so nonce and balance are always 0 — presumably intentional to keep the account RLP
   * size fixed (see acctRLPSize above); confirm.
   */
  static SnapTestAccount createTestAccount(final String hexAddr) {
    return new SnapTestAccount(
        Hash.wrap(Bytes32.rightPad(Bytes.fromHexString(hexAddr))),
        new StateTrieAccountValue(
            rand.nextInt(0, 1), Wei.of(rand.nextLong(0L, 1L)), Hash.EMPTY_TRIE_HASH, Hash.EMPTY),
        new SimpleMerklePatriciaTrie<>(a -> a),
        Bytes.EMPTY);
  }

  /**
   * Builds a contract account with random 32-byte code and 10 storage slots (keys 10..19
   * right-padded), persisting code, trie nodes, and flat-db slot values into {@code storage}.
   */
  static SnapTestAccount createTestContractAccount(
      final String hexAddr, final BonsaiWorldStateKeyValueStorage storage) {
    Hash acctHash = Hash.wrap(Bytes32.rightPad(Bytes.fromHexString(hexAddr)));
    MerkleTrie<Bytes32, Bytes> trie =
        new StoredMerklePatriciaTrie<>(
            (loc, hash) -> storage.getAccountStorageTrieNode(acctHash, loc, hash),
            Hash.EMPTY_TRIE_HASH,
            a -> a,
            a -> a);
    Bytes32 mockCode = Bytes32.random();

    // mock some storage data
    var flatdb = storage.getFlatDbStrategy();
    var updater = storage.updater();
    updater.putCode(Hash.hash(mockCode), mockCode);
    IntStream.range(10, 20)
        .boxed()
        .forEach(
            i -> {
              Bytes32 mockBytes32 = Bytes32.rightPad(Bytes.fromHexString(i.toString()));
              var rlpOut = new BytesValueRLPOutput();
              rlpOut.writeBytes(mockBytes32);
              // write each slot to both the storage trie and the flat database
              trie.put(mockBytes32, rlpOut.encoded());
              flatdb.putFlatAccountStorageValueByStorageSlotHash(
                  updater.getWorldStateTransaction(),
                  acctHash,
                  Hash.wrap(mockBytes32),
                  mockBytes32);
            });
    trie.commit(
        (location, key, value) ->
            updater.putAccountStorageTrieNode(acctHash, location, key, value));
    updater.commit();
    return new SnapTestAccount(
        acctHash,
        new StateTrieAccountValue(
            rand.nextInt(0, 1), Wei.of(rand.nextLong(0L, 1L)),
            Hash.wrap(trie.getRootHash()), Hash.hash(mockCode)),
        trie,
        mockCode);
  }

  /** Writes the given accounts into flat storage and the account trie, then commits both. */
  void insertTestAccounts(final SnapTestAccount... accounts) {
    final var updater = inMemoryStorage.updater();
    for (SnapTestAccount account : accounts) {
      updater.putAccountInfoState(account.addressHash(), account.accountRLP());
      storageTrie.put(account.addressHash(), account.accountRLP());
    }
    storageTrie.commit(updater::putAccountStateTrieNode);
    updater.commit();
  }

  /**
   * Validates an account range proof from {@code startHash} to the response's last key (or
   * {@code startHash} itself when the response is empty, i.e. an exclusion proof).
   */
  boolean assertIsValidAccountRangeProof(
      final Hash startHash, final AccountRangeMessage.AccountRangeData accountRange) {
    Bytes32 lastKey =
        Optional.of(accountRange.accounts())
            .filter(z -> z.size() > 0)
            .map(NavigableMap::lastKey)
            .orElse(startHash);

    return proofProvider.isValidRangeProof(
        startHash,
        lastKey,
        storageTrie.getRootHash(),
        accountRange.proofs(),
        accountRange.accounts());
  }

  /** Validates a storage range proof against a single account's storage root. */
  boolean assertIsValidStorageProof(
      final SnapTestAccount account,
      final Hash startHash,
      final NavigableMap<Bytes32, Bytes> slotRangeData,
      final List<Bytes> proofs) {

    Bytes32 lastKey =
        Optional.of(slotRangeData)
            .filter(z -> z.size() > 0)
            .map(NavigableMap::lastKey)
            .orElse(startHash);

    // this is only working for single account ranges for now
    return proofProvider.isValidRangeProof(
        startHash, lastKey, account.accountValue.getStorageRoot(), proofs, slotRangeData);
  }

  /** Issues a GetAccountRange request against the current account trie root. */
  AccountRangeMessage requestAccountRange(final Hash startHash, final Hash limitHash) {
    return (AccountRangeMessage)
        snapServer.constructGetAccountRangeResponse(
            GetAccountRangeMessage.create(
                    Hash.wrap(storageTrie.getRootHash()), startHash, limitHash)
                .wrapMessageData(BigInteger.ONE));
  }

  /** Issues a GetStorageRange request for the given accounts over [startHash, limitHash]. */
  StorageRangeMessage requestStorageRange(
      final List<Bytes32> accountHashes, final Hash startHash, final Hash limitHash) {
    return (StorageRangeMessage)
        snapServer.constructGetStorageRangeResponse(
            GetStorageRangeMessage.create(
                    Hash.wrap(storageTrie.getRootHash()), accountHashes, startHash, limitHash)
                .wrapMessageData(BigInteger.ONE));
  }

  /** Issues a GetTrieNodes request for the given trie paths. */
  TrieNodesMessage requestTrieNodes(final Bytes32 rootHash, final List<List<Bytes>> trieNodesList) {
    return (TrieNodesMessage)
        snapServer.constructGetTrieNodesResponse(
            GetTrieNodesMessage.create(Hash.wrap(rootHash), trieNodesList)
                .wrapMessageData(BigInteger.ONE));
  }

  /** Issues a GetByteCodes request for the given code hashes. */
  ByteCodesMessage requestByteCodes(final List<Bytes32> codeHashes) {
    return (ByteCodesMessage)
        snapServer.constructGetBytecodesResponse(
            GetByteCodesMessage.create(codeHashes).wrapMessageData(BigInteger.ONE));
  }

  /** Asserts the range response is non-null and contains exactly {@code expectedSize} accounts. */
  AccountRangeMessage.AccountRangeData getAndVerifyAccountRangeData(
      final AccountRangeMessage range, final int expectedSize) {
    assertThat(range).isNotNull();
    var accountData = range.accountData(false);
    assertThat(accountData).isNotNull();
    assertThat(accountData.accounts().size()).isEqualTo(expectedSize);
    return accountData;
  }
}

@ -1,61 +0,0 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import java.util.Collections;
/** Test helpers for constructing {@link SnapProtocolManager} instances and responding peers. */
public class SnapProtocolManagerTestUtil {

  // Utility class: prevent instantiation.
  private SnapProtocolManagerTestUtil() {}

  /**
   * Creates a snap protocol manager backed by a fresh FOREST test world state archive.
   *
   * @param ethPeers the peer set to register the manager with
   * @return a new snap protocol manager
   */
  public static SnapProtocolManager create(final EthPeers ethPeers) {
    return create(
        BlockchainSetupUtil.forTesting(DataStorageFormat.FOREST).getWorldArchive(), ethPeers);
  }

  /**
   * Creates a snap protocol manager with a fresh {@link EthMessages} instance.
   *
   * @param worldStateArchive the world state archive backing the manager
   * @param ethPeers the peer set to register the manager with
   * @return a new snap protocol manager
   */
  public static SnapProtocolManager create(
      final WorldStateArchive worldStateArchive, final EthPeers ethPeers) {
    // Delegate to the fully-parameterized factory rather than duplicating construction.
    return create(worldStateArchive, ethPeers, new EthMessages());
  }

  /**
   * Creates a snap protocol manager with explicit snap message handling.
   *
   * @param worldStateArchive the world state archive backing the manager
   * @param ethPeers the peer set to register the manager with
   * @param snapMessages the snap message dispatcher to use
   * @return a new snap protocol manager
   */
  public static SnapProtocolManager create(
      final WorldStateArchive worldStateArchive,
      final EthPeers ethPeers,
      final EthMessages snapMessages) {
    return new SnapProtocolManager(
        Collections.emptyList(), ethPeers, snapMessages, worldStateArchive);
  }

  /**
   * Creates a responding test peer wired to both the eth and snap protocol managers.
   *
   * @param ethProtocolManager the eth protocol manager
   * @param snapProtocolManager the snap protocol manager
   * @param estimatedHeight the chain height the peer reports
   * @return a new responding peer
   */
  public static RespondingEthPeer createPeer(
      final EthProtocolManager ethProtocolManager,
      final SnapProtocolManager snapProtocolManager,
      final long estimatedHeight) {
    return RespondingEthPeer.builder()
        .ethProtocolManager(ethProtocolManager)
        .snapProtocolManager(snapProtocolManager)
        .estimatedHeight(estimatedHeight)
        .build();
  }
}

@ -24,6 +24,13 @@ public abstract class CompactEncoding {
public static final byte LEAF_TERMINATOR = 0x10; public static final byte LEAF_TERMINATOR = 0x10;
/**
* Converts a byte sequence into a path by splitting each byte into two nibbles. The resulting
* path is terminated with a leaf terminator.
*
* @param bytes the byte sequence to convert into a path
* @return the resulting path
*/
public static Bytes bytesToPath(final Bytes bytes) { public static Bytes bytesToPath(final Bytes bytes) {
final MutableBytes path = MutableBytes.create(bytes.size() * 2 + 1); final MutableBytes path = MutableBytes.create(bytes.size() * 2 + 1);
int j = 0; int j = 0;
@ -36,6 +43,15 @@ public abstract class CompactEncoding {
return path; return path;
} }
/**
* Converts a path into a byte sequence by combining each pair of nibbles into a byte. The path
* must be a leaf path, i.e., it must be terminated with a leaf terminator.
*
* @param path the path to convert into a byte sequence
* @return the resulting byte sequence
* @throws IllegalArgumentException if the path is empty or not a leaf path, or if it contains
* elements larger than a nibble
*/
public static Bytes pathToBytes(final Bytes path) { public static Bytes pathToBytes(final Bytes path) {
checkArgument(!path.isEmpty(), "Path must not be empty"); checkArgument(!path.isEmpty(), "Path must not be empty");
checkArgument(path.get(path.size() - 1) == LEAF_TERMINATOR, "Path must be a leaf path"); checkArgument(path.get(path.size() - 1) == LEAF_TERMINATOR, "Path must be a leaf path");
@ -52,6 +68,14 @@ public abstract class CompactEncoding {
return bytes; return bytes;
} }
/**
* Encodes a path into a compact form. The encoding includes a metadata byte that indicates
* whether the path is a leaf path and whether its length is odd or even.
*
* @param path the path to encode
* @return the encoded path
* @throws IllegalArgumentException if the path contains elements larger than a nibble
*/
public static Bytes encode(final Bytes path) { public static Bytes encode(final Bytes path) {
int size = path.size(); int size = path.size();
final boolean isLeaf = size > 0 && path.get(size - 1) == LEAF_TERMINATOR; final boolean isLeaf = size > 0 && path.get(size - 1) == LEAF_TERMINATOR;
@ -88,6 +112,14 @@ public abstract class CompactEncoding {
return encoded; return encoded;
} }
/**
* Decodes a path from its compact form. The decoding process takes into account the metadata byte
* that indicates whether the path is a leaf path and whether its length is odd or even.
*
* @param encoded the encoded path to decode
* @return the decoded path
* @throws IllegalArgumentException if the encoded path is empty or its metadata byte is invalid
*/
public static Bytes decode(final Bytes encoded) { public static Bytes decode(final Bytes encoded) {
final int size = encoded.size(); final int size = encoded.size();
checkArgument(size > 0); checkArgument(size > 0);

@ -18,7 +18,7 @@ import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.NavigableMap;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -57,9 +57,9 @@ public class InMemoryKeyValueStorage extends SegmentedKeyValueStorageAdapter {
} }
}; };
private static ConcurrentMap<SegmentIdentifier, Map<Bytes, Optional<byte[]>>> asSegmentMap( private static ConcurrentMap<SegmentIdentifier, NavigableMap<Bytes, Optional<byte[]>>>
final Map<Bytes, Optional<byte[]>> initialMap) { asSegmentMap(final NavigableMap<Bytes, Optional<byte[]>> initialMap) {
final ConcurrentMap<SegmentIdentifier, Map<Bytes, Optional<byte[]>>> segmentMap = final ConcurrentMap<SegmentIdentifier, NavigableMap<Bytes, Optional<byte[]>>> segmentMap =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
segmentMap.put(SEGMENT_IDENTIFIER, initialMap); segmentMap.put(SEGMENT_IDENTIFIER, initialMap);
return segmentMap; return segmentMap;
@ -78,7 +78,7 @@ public class InMemoryKeyValueStorage extends SegmentedKeyValueStorageAdapter {
* *
* @param initialMap the initial map * @param initialMap the initial map
*/ */
public InMemoryKeyValueStorage(final Map<Bytes, Optional<byte[]>> initialMap) { public InMemoryKeyValueStorage(final NavigableMap<Bytes, Optional<byte[]>> initialMap) {
super(SEGMENT_IDENTIFIER, new SegmentedInMemoryKeyValueStorage(asSegmentMap(initialMap))); super(SEGMENT_IDENTIFIER, new SegmentedInMemoryKeyValueStorage(asSegmentMap(initialMap)));
rwLock = ((SegmentedInMemoryKeyValueStorage) storage).rwLock; rwLock = ((SegmentedInMemoryKeyValueStorage) storage).rwLock;
} }

@ -15,6 +15,10 @@
*/ */
package org.hyperledger.besu.services.kvstore; package org.hyperledger.besu.services.kvstore;
import static java.util.Spliterator.DISTINCT;
import static java.util.Spliterator.ORDERED;
import static java.util.Spliterator.SORTED;
import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage;
@ -22,13 +26,17 @@ import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTran
import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional; import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -60,7 +68,7 @@ public class LayeredKeyValueStorage extends SegmentedInMemoryKeyValueStorage
* @param parent the parent key value storage for this layered storage. * @param parent the parent key value storage for this layered storage.
*/ */
public LayeredKeyValueStorage( public LayeredKeyValueStorage(
final ConcurrentMap<SegmentIdentifier, Map<Bytes, Optional<byte[]>>> map, final ConcurrentMap<SegmentIdentifier, NavigableMap<Bytes, Optional<byte[]>>> map,
final SegmentedKeyValueStorage parent) { final SegmentedKeyValueStorage parent) {
super(map); super(map);
this.parent = parent; this.parent = parent;
@ -82,7 +90,7 @@ public class LayeredKeyValueStorage extends SegmentedInMemoryKeyValueStorage
try { try {
Bytes wrapKey = Bytes.wrap(key); Bytes wrapKey = Bytes.wrap(key);
final Optional<byte[]> foundKey = final Optional<byte[]> foundKey =
hashValueStore.computeIfAbsent(segmentId, __ -> new HashMap<>()).get(wrapKey); hashValueStore.computeIfAbsent(segmentId, __ -> newSegmentMap()).get(wrapKey);
if (foundKey == null) { if (foundKey == null) {
return parent.get(segmentId, key); return parent.get(segmentId, key);
} else { } else {
@ -116,28 +124,63 @@ public class LayeredKeyValueStorage extends SegmentedInMemoryKeyValueStorage
@Override @Override
public Stream<Pair<byte[], byte[]>> stream(final SegmentIdentifier segmentId) { public Stream<Pair<byte[], byte[]>> stream(final SegmentIdentifier segmentId) {
throwIfClosed(); throwIfClosed();
var ourLayerState = hashValueStore.computeIfAbsent(segmentId, s -> newSegmentMap());
final Lock lock = rwLock.readLock(); // otherwise, interleave the sorted streams:
lock.lock(); final PeekingIterator<Map.Entry<Bytes, Optional<byte[]>>> ourIterator =
try { new PeekingIterator<>(
// copy of our in memory store to use for streaming and filtering: ourLayerState.entrySet().stream()
var ourLayerState = .filter(entry -> entry.getValue().isPresent())
Optional.ofNullable(hashValueStore.get(segmentId)) .iterator());
.map(HashMap::new)
.orElse(new HashMap<>());
return Streams.concat( final PeekingIterator<Pair<byte[], byte[]>> parentIterator =
ourLayerState.entrySet().stream() new PeekingIterator<>(parent.stream(segmentId).iterator());
.filter(entry -> entry.getValue().isPresent())
.map( return StreamSupport.stream(
bytesEntry -> Spliterators.spliteratorUnknownSize(
Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue().get())) new Iterator<>() {
// since we are layered, concat a parent stream filtered by our map entries: @Override
, public boolean hasNext() {
parent.stream(segmentId).filter(e -> !ourLayerState.containsKey(Bytes.of(e.getLeft())))); return ourIterator.hasNext() || parentIterator.hasNext();
} finally { }
lock.unlock();
} private Pair<byte[], byte[]> mapEntryToPair(
final Map.Entry<Bytes, Optional<byte[]>> entry) {
return Optional.of(entry)
.map(
e ->
Pair.of(
e.getKey().toArrayUnsafe(),
e.getValue().orElseGet(() -> new byte[0])))
.get();
}
@Override
public Pair<byte[], byte[]> next() {
var ourPeek = ourIterator.peek();
var parentPeek = parentIterator.peek();
if (ourPeek == null || parentPeek == null) {
return ourPeek == null
? parentIterator.next()
: mapEntryToPair(ourIterator.next());
}
// otherwise compare:
int comparison = ourPeek.getKey().compareTo(Bytes.wrap(parentPeek.getKey()));
if (comparison < 0) {
return mapEntryToPair(ourIterator.next());
} else if (comparison == 0) {
// skip dupe key from parent, return ours:
parentIterator.next();
return mapEntryToPair(ourIterator.next());
} else {
return parentIterator.next();
}
}
},
ORDERED | SORTED | DISTINCT),
false);
} }
@Override @Override
@ -186,7 +229,7 @@ public class LayeredKeyValueStorage extends SegmentedInMemoryKeyValueStorage
@Override @Override
public boolean tryDelete(final SegmentIdentifier segmentId, final byte[] key) { public boolean tryDelete(final SegmentIdentifier segmentId, final byte[] key) {
hashValueStore hashValueStore
.computeIfAbsent(segmentId, __ -> new HashMap<>()) .computeIfAbsent(segmentId, __ -> newSegmentMap())
.put(Bytes.wrap(key), Optional.empty()); .put(Bytes.wrap(key), Optional.empty());
return true; return true;
} }
@ -206,7 +249,7 @@ public class LayeredKeyValueStorage extends SegmentedInMemoryKeyValueStorage
.forEach( .forEach(
entry -> entry ->
hashValueStore hashValueStore
.computeIfAbsent(entry.getKey(), __ -> new HashMap<>()) .computeIfAbsent(entry.getKey(), __ -> newSegmentMap())
.putAll(entry.getValue())); .putAll(entry.getValue()));
// put empty rather than remove in order to not ask parent in case of deletion // put empty rather than remove in order to not ask parent in case of deletion
@ -214,7 +257,7 @@ public class LayeredKeyValueStorage extends SegmentedInMemoryKeyValueStorage
.forEach( .forEach(
segmentEntry -> segmentEntry ->
hashValueStore hashValueStore
.computeIfAbsent(segmentEntry.getKey(), __ -> new HashMap<>()) .computeIfAbsent(segmentEntry.getKey(), __ -> newSegmentMap())
.putAll( .putAll(
segmentEntry.getValue().stream() segmentEntry.getValue().stream()
.collect( .collect(
@ -246,4 +289,30 @@ public class LayeredKeyValueStorage extends SegmentedInMemoryKeyValueStorage
throw new StorageException("Storage has been closed"); throw new StorageException("Storage has been closed");
} }
} }
/**
 * Minimal single-element look-ahead iterator. Buffers one element so callers can
 * inspect the next value via {@link #peek()} without consuming it. Null elements
 * are not supported: a buffered null is treated as end-of-iteration. Not thread-safe.
 *
 * @param <E> the element type
 */
private static class PeekingIterator<E> implements Iterator<E> {
  private final Iterator<E> iterator;
  // Look-ahead buffer; null means the underlying iterator is exhausted.
  private E next;

  public PeekingIterator(final Iterator<E> iterator) {
    this.iterator = iterator;
    this.next = iterator.hasNext() ? iterator.next() : null;
  }

  /**
   * Returns the next element without consuming it.
   *
   * @return the buffered next element, or null when exhausted
   */
  public E peek() {
    return next;
  }

  @Override
  public boolean hasNext() {
    return next != null;
  }

  @Override
  public E next() {
    // Honor the Iterator contract: fail loudly rather than silently yielding null.
    if (next == null) {
      throw new java.util.NoSuchElementException();
    }
    final E oldNext = next;
    next = iterator.hasNext() ? iterator.next() : null;
    return oldNext;
  }
}
} }

@ -24,15 +24,18 @@ import org.hyperledger.besu.plugin.services.storage.SnappableKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -49,11 +52,35 @@ import org.apache.tuweni.bytes.Bytes;
public class SegmentedInMemoryKeyValueStorage public class SegmentedInMemoryKeyValueStorage
implements SnappedKeyValueStorage, SnappableKeyValueStorage, SegmentedKeyValueStorage { implements SnappedKeyValueStorage, SnappableKeyValueStorage, SegmentedKeyValueStorage {
/** protected access for the backing hash map. */ /** protected access for the backing hash map. */
final ConcurrentMap<SegmentIdentifier, Map<Bytes, Optional<byte[]>>> hashValueStore; final ConcurrentMap<SegmentIdentifier, NavigableMap<Bytes, Optional<byte[]>>> hashValueStore;
/** protected access to the rw lock. */ /** protected access to the rw lock. */
protected final ReadWriteLock rwLock = new ReentrantReadWriteLock(); protected final ReadWriteLock rwLock = new ReentrantReadWriteLock();
/**
 * Builds an empty navigable segment map whose comparator is safe for Bytes keys.
 *
 * @return a new, empty segment map
 */
protected static NavigableMap<Bytes, Optional<byte[]>> newSegmentMap() {
  return newSegmentMap(Map.of());
}
/**
 * Builds a navigable segment map with a Bytes-safe comparator and copies the given
 * entries into it.
 *
 * @param sourceMap entries used to seed the new segment map
 * @return a new segment map containing the source entries
 */
protected static NavigableMap<Bytes, Optional<byte[]>> newSegmentMap(
    final Map<Bytes, Optional<byte[]>> sourceMap) {
  // Order keys by their hex form so leading zero bytes remain significant
  // (the default Bytes comparator would collapse them).
  final Comparator<Bytes> byHexString = Comparator.comparing(Bytes::toHexString);
  final NavigableMap<Bytes, Optional<byte[]>> segmentMap =
      new ConcurrentSkipListMap<>(byHexString);
  segmentMap.putAll(sourceMap);
  return segmentMap;
}
/** Instantiates a new In memory key value storage. */ /** Instantiates a new In memory key value storage. */
public SegmentedInMemoryKeyValueStorage() { public SegmentedInMemoryKeyValueStorage() {
this(new ConcurrentHashMap<>()); this(new ConcurrentHashMap<>());
@ -65,7 +92,8 @@ public class SegmentedInMemoryKeyValueStorage
* @param hashValueStore the hash value store * @param hashValueStore the hash value store
*/ */
protected SegmentedInMemoryKeyValueStorage( protected SegmentedInMemoryKeyValueStorage(
final ConcurrentMap<SegmentIdentifier, Map<Bytes, Optional<byte[]>>> hashValueStore) { final ConcurrentMap<SegmentIdentifier, NavigableMap<Bytes, Optional<byte[]>>>
hashValueStore) {
this.hashValueStore = hashValueStore; this.hashValueStore = hashValueStore;
} }
@ -79,8 +107,8 @@ public class SegmentedInMemoryKeyValueStorage
segments.stream() segments.stream()
.collect( .collect(
Collectors Collectors
.<SegmentIdentifier, SegmentIdentifier, Map<Bytes, Optional<byte[]>>> .<SegmentIdentifier, SegmentIdentifier, NavigableMap<Bytes, Optional<byte[]>>>
toConcurrentMap(s -> s, s -> new ConcurrentHashMap<>()))); toConcurrentMap(s -> s, s -> newSegmentMap())));
} }
@Override @Override
@ -107,7 +135,7 @@ public class SegmentedInMemoryKeyValueStorage
lock.lock(); lock.lock();
try { try {
return hashValueStore return hashValueStore
.computeIfAbsent(segmentIdentifier, s -> new HashMap<>()) .computeIfAbsent(segmentIdentifier, s -> newSegmentMap())
.getOrDefault(Bytes.wrap(key), Optional.empty()); .getOrDefault(Bytes.wrap(key), Optional.empty());
} finally { } finally {
lock.unlock(); lock.unlock();
@ -127,7 +155,7 @@ public class SegmentedInMemoryKeyValueStorage
(Map.Entry<Bytes, Optional<byte[]>> a) -> a.getKey().commonPrefixLength(key)) (Map.Entry<Bytes, Optional<byte[]>> a) -> a.getKey().commonPrefixLength(key))
.thenComparing(Map.Entry.comparingByKey()); .thenComparing(Map.Entry.comparingByKey());
return this.hashValueStore return this.hashValueStore
.computeIfAbsent(segmentIdentifier, s -> new HashMap<>()) .computeIfAbsent(segmentIdentifier, s -> newSegmentMap())
.entrySet() .entrySet()
.stream() .stream()
// only return keys equal to or less than // only return keys equal to or less than
@ -164,9 +192,10 @@ public class SegmentedInMemoryKeyValueStorage
lock.lock(); lock.lock();
try { try {
return ImmutableSet.copyOf( return ImmutableSet.copyOf(
hashValueStore.computeIfAbsent(segmentIdentifier, s -> new HashMap<>()).entrySet()) hashValueStore.computeIfAbsent(segmentIdentifier, s -> newSegmentMap()).entrySet())
.stream() .stream()
.filter(bytesEntry -> bytesEntry.getValue().isPresent()) .filter(bytesEntry -> bytesEntry.getValue().isPresent())
.sorted(Map.Entry.comparingByKey())
.map( .map(
bytesEntry -> bytesEntry ->
Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue().get())); Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue().get()));
@ -199,7 +228,7 @@ public class SegmentedInMemoryKeyValueStorage
lock.lock(); lock.lock();
try { try {
return ImmutableMap.copyOf( return ImmutableMap.copyOf(
hashValueStore.computeIfAbsent(segmentIdentifier, s -> new HashMap<>())) hashValueStore.computeIfAbsent(segmentIdentifier, s -> newSegmentMap()))
.entrySet() .entrySet()
.stream() .stream()
.filter(bytesEntry -> bytesEntry.getValue().isPresent()) .filter(bytesEntry -> bytesEntry.getValue().isPresent())
@ -244,8 +273,7 @@ public class SegmentedInMemoryKeyValueStorage
return new SegmentedInMemoryKeyValueStorage( return new SegmentedInMemoryKeyValueStorage(
hashValueStore.entrySet().stream() hashValueStore.entrySet().stream()
.collect( .collect(
Collectors.toConcurrentMap( Collectors.toConcurrentMap(Map.Entry::getKey, e -> newSegmentMap(e.getValue()))));
Map.Entry::getKey, e -> new ConcurrentHashMap<>(e.getValue()))));
} }
@Override @Override
@ -287,7 +315,7 @@ public class SegmentedInMemoryKeyValueStorage
.forEach( .forEach(
entry -> entry ->
hashValueStore hashValueStore
.computeIfAbsent(entry.getKey(), __ -> new HashMap<>()) .computeIfAbsent(entry.getKey(), __ -> newSegmentMap())
.putAll(entry.getValue())); .putAll(entry.getValue()));
removedKeys.entrySet().stream() removedKeys.entrySet().stream()
@ -295,7 +323,7 @@ public class SegmentedInMemoryKeyValueStorage
entry -> { entry -> {
var keyset = var keyset =
hashValueStore hashValueStore
.computeIfAbsent(entry.getKey(), __ -> new HashMap<>()) .computeIfAbsent(entry.getKey(), __ -> newSegmentMap())
.keySet(); .keySet();
keyset.removeAll(entry.getValue()); keyset.removeAll(entry.getValue());
}); });

Loading…
Cancel
Save