Snapsync persist state (#4381)

This PR avoids restarting the download of the world state from scratch when restarting Besu

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/4586/head
matkt authored 2 years ago, committed by GitHub
parent 6f20060182
commit da9b10767a
  1. CHANGELOG.md (1 line changed)
  2. besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java (1 line changed)
  3. ethereum/core/build.gradle (1 line changed)
  4. ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java (4 lines changed)
  5. ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java (4 lines changed)
  6. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java (5 lines changed)
  7. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java (17 lines changed)
  8. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java (4 lines changed)
  9. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java (3 lines changed)
  10. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapPersistedContext.java (138 lines changed)
  11. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java (42 lines changed)
  12. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java (31 lines changed)
  13. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java (23 lines changed)
  14. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java (2 lines changed)
  15. ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java (2 lines changed)
  16. plugin-api/build.gradle (3 lines changed)
  17. plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java (12 lines changed)
  18. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbIterator.java (59 lines changed)
  19. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java (14 lines changed)
  20. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java (26 lines changed)
  21. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java (11 lines changed)
  22. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java (25 lines changed)
  23. plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java (11 lines changed)
  24. services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java (29 lines changed)
  25. services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java (29 lines changed)
  26. services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java (7 lines changed)
  27. services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java (13 lines changed)
  28. services/tasks/src/main/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollection.java (290 lines changed)
  29. services/tasks/src/main/java/org/hyperledger/besu/services/tasks/InMemoryTaskQueue.java (6 lines changed)
  30. services/tasks/src/test/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollectionTest.java (90 lines changed)
  31. testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java (3 lines changed)

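At a high level, the diffs below add a SnapPersistedContext that snapshots snap-sync progress (the account ranges still to download, plus the accounts flagged as inconsistent) into two new RocksDB segments, and teach SnapWorldStateDownloader to resume from that snapshot at startup. A condensed sketch of the startup decision, using the names from the SnapWorldStateDownloader diff (simplified, not the verbatim method):

// Condensed from the SnapWorldStateDownloader hunk below; metrics and logging elided.
final List<AccountRangeDataRequest> persistedTasks = snapContext.getPersistedTasks();
final HashSet<Bytes> inconsistentAccounts = snapContext.getInconsistentAccounts();
if (!persistedTasks.isEmpty()) {
  // Ranges were still downloading: re-enqueue them and carry over the flagged accounts.
  newDownloadState.setInconsistentAccounts(inconsistentAccounts);
  persistedTasks.forEach(newDownloadState::enqueueRequest);
} else if (!inconsistentAccounts.isEmpty()) {
  // The range download had finished but healing had not: restart only the heal step.
  snapSyncState.setHealStatus(true);
  newDownloadState.setInconsistentAccounts(inconsistentAccounts);
  newDownloadState.enqueueRequest(
      SnapDataRequest.createAccountTrieNodeDataRequest(stateRoot, Bytes.EMPTY, inconsistentAccounts));
} else {
  // Nothing was persisted: only in this case is the world state cleared and rebuilt.
  worldStateStorage.clear();
  ranges.forEach(
      (start, end) ->
          newDownloadState.enqueueRequest(createAccountRangeDataRequest(stateRoot, start, end)));
}
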
@@ -6,6 +6,7 @@
### Additions and Improvements
- Updated jackson-databind library to version 2.13.4.2 addressing [CVE-2022-42003](https://nvd.nist.gov/vuln/detail/CVE-2022-42003)
- Gradle task allows custom docker image configs e.g. `./gradlew distDocker -PdockerImageName=my/besu -PdockerVariants=openjdk-17,openjdk-19`
- Update snapsync feature to avoid restarting the download of the world state from scratch when restarting Besu [#4381](https://github.com/hyperledger/besu/pull/4381)
### Bug Fixes
- Fixed default fromBlock value and improved parameter interpretation in eth_getLogs RPC handler [#4513](https://github.com/hyperledger/besu/pull/4513)

@@ -487,6 +487,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethContext,
syncState,
dataDirectory,
storageProvider,
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()),

@@ -48,6 +48,7 @@ dependencies {
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.vertx:vertx-core'
implementation 'net.java.dev.jna:jna'
implementation 'org.apache.commons:commons-lang3'
implementation 'org.apache.tuweni:tuweni-bytes'
implementation 'org.apache.tuweni:tuweni-concurrent'
implementation 'org.apache.tuweni:tuweni-units'

@@ -33,7 +33,9 @@ public enum KeyValueSegmentIdentifier implements SegmentIdentifier {
GOQUORUM_PRIVATE_STORAGE(new byte[] {12}),
BACKWARD_SYNC_HEADERS(new byte[] {13}),
BACKWARD_SYNC_BLOCKS(new byte[] {14}),
BACKWARD_SYNC_CHAIN(new byte[] {15});
BACKWARD_SYNC_CHAIN(new byte[] {15}),
SNAPSYNC_MISSING_ACCOUNT_RANGE(new byte[] {16}),
SNAPSYNC_ACCOUNT_TO_FIX(new byte[] {17});
private final byte[] id;
private final int[] versionList;

@@ -110,8 +110,8 @@ public class WorldStateKeyValueStorage implements WorldStateStorage {
@Override
public long prune(final Predicate<byte[]> inUseCheck) {
final AtomicInteger prunedKeys = new AtomicInteger(0);
try (final Stream<byte[]> keys = keyValueStorage.streamKeys()) {
keys.forEach(
try (final Stream<byte[]> entry = keyValueStorage.streamKeys()) {
entry.forEach(
key -> {
lock.lock();
try {

@@ -30,10 +30,12 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloader
import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapPersistedContext;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStatePeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.Pruner;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
@@ -78,6 +80,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
final EthContext ethContext,
final SyncState syncState,
final Path dataDirectory,
final StorageProvider storageProvider,
final Clock clock,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
@@ -141,6 +144,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
} else if (SyncMode.X_CHECKPOINT.equals(syncConfig.getSyncMode())) {
this.fastSyncDownloader =
CheckpointDownloaderFactory.createCheckpointDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
syncConfig,
dataDirectory,
@@ -154,6 +158,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
} else {
this.fastSyncDownloader =
SnapDownloaderFactory.createSnapDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
syncConfig,
dataDirectory,

@@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
@@ -22,6 +23,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
public class GenericKeyValueStorageFacade<K, V> implements Closeable {
protected final KeyValueStorage storage;
@@ -57,6 +61,13 @@ public class GenericKeyValueStorageFacade<K, V> implements Closeable {
keyValueStorageTransaction.commit();
}
public void putAll(
final Consumer<KeyValueStorageTransaction> keyValueStorageTransactionConsumer) {
final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction();
keyValueStorageTransactionConsumer.accept(keyValueStorageTransaction);
keyValueStorageTransaction.commit();
}
public void drop(final K key) {
storage.tryDelete(keyConvertor.toBytes(key));
}
@@ -78,4 +89,10 @@ public class GenericKeyValueStorageFacade<K, V> implements Closeable {
}
keyValueStorageTransaction.commit();
}
public Stream<V> streamValuesFromKeysThat(final Predicate<byte[]> returnCondition)
throws StorageException {
return storage.getAllValuesFromKeysThat(returnCondition).stream()
.map(valueConvertor::fromBytes);
}
}

@@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncStateStorage;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapPersistedContext;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldStateDownloader;
@@ -49,6 +50,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointDownloaderFactory.class);
public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
final SnapPersistedContext snapContext,
final PivotBlockSelector pivotBlockSelector,
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
@@ -119,13 +121,13 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
new SnapSyncState(
fastSyncStateStorage.loadState(
ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)));
worldStateStorage.clear();
final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection =
createSnapWorldStateDownloaderTaskCollection();
final WorldStateDownloader snapWorldStateDownloader =
new SnapWorldStateDownloader(
ethContext,
snapContext,
protocolContext,
worldStateStorage,
snapTaskCollection,

@@ -46,6 +46,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(SnapDownloaderFactory.class);
public static Optional<FastSyncDownloader<?>> createSnapDownloader(
final SnapPersistedContext snapContext,
final PivotBlockSelector pivotBlockSelector,
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
@@ -86,13 +87,13 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
new SnapSyncState(
fastSyncStateStorage.loadState(
ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)));
worldStateStorage.clear();
final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection =
createSnapWorldStateDownloaderTaskCollection();
final WorldStateDownloader snapWorldStateDownloader =
new SnapWorldStateDownloader(
ethContext,
snapContext,
protocolContext,
worldStateStorage,
snapTaskCollection,

@@ -0,0 +1,138 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.GenericKeyValueStorageFacade;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.ValueConvertor;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.tuweni.bytes.Bytes;
public class SnapPersistedContext {
private final byte[] SNAP_INCONSISTENT_ACCOUNT_INDEX =
"snapInconsistentAccountsStorageIndex".getBytes(StandardCharsets.UTF_8);
private final GenericKeyValueStorageFacade<BigInteger, AccountRangeDataRequest>
accountRangeToDownload;
private final GenericKeyValueStorageFacade<BigInteger, Bytes> inconsistentAccounts;
public SnapPersistedContext(final StorageProvider storageProvider) {
this.accountRangeToDownload =
new GenericKeyValueStorageFacade<>(
BigInteger::toByteArray,
new ValueConvertor<>() {
@Override
public AccountRangeDataRequest fromBytes(final byte[] bytes) {
return AccountRangeDataRequest.deserialize(
new BytesValueRLPInput(Bytes.of(bytes), false));
}
@Override
public byte[] toBytes(final AccountRangeDataRequest value) {
return value.serialize().toArrayUnsafe();
}
},
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.SNAPSYNC_MISSING_ACCOUNT_RANGE));
this.inconsistentAccounts =
new GenericKeyValueStorageFacade<>(
BigInteger::toByteArray,
new ValueConvertor<>() {
@Override
public Bytes fromBytes(final byte[] bytes) {
return Bytes.of(bytes);
}
@Override
public byte[] toBytes(final Bytes value) {
return value.toArrayUnsafe();
}
},
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.SNAPSYNC_ACCOUNT_TO_FIX));
}
public void updatePersistedTasks(final List<? extends SnapDataRequest> accountRangeDataRequests) {
accountRangeToDownload.clear();
accountRangeToDownload.putAll(
keyValueStorageTransaction ->
IntStream.range(0, accountRangeDataRequests.size())
.forEach(
index ->
keyValueStorageTransaction.put(
BigInteger.valueOf(index).toByteArray(),
((AccountRangeDataRequest) accountRangeDataRequests.get(index))
.serialize()
.toArrayUnsafe())));
}
public void addInconsistentAccount(final Bytes inconsistentAccount) {
final BigInteger index =
inconsistentAccounts
.get(SNAP_INCONSISTENT_ACCOUNT_INDEX)
.map(bytes -> new BigInteger(bytes.toArrayUnsafe()).add(BigInteger.ONE))
.orElse(BigInteger.ZERO);
inconsistentAccounts.putAll(
keyValueStorageTransaction -> {
keyValueStorageTransaction.put(SNAP_INCONSISTENT_ACCOUNT_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), inconsistentAccount.toArrayUnsafe());
});
}
public List<AccountRangeDataRequest> getPersistedTasks() {
return accountRangeToDownload
.streamValuesFromKeysThat(bytes -> true)
.collect(Collectors.toList());
}
public HashSet<Bytes> getInconsistentAccounts() {
return inconsistentAccounts
.streamValuesFromKeysThat(notEqualsTo(SNAP_INCONSISTENT_ACCOUNT_INDEX))
.collect(Collectors.toCollection(HashSet::new));
}
public void clearAccountRangeTasks() {
accountRangeToDownload.clear();
}
public void clear() {
accountRangeToDownload.clear();
inconsistentAccounts.clear();
}
public void close() throws IOException {
accountRangeToDownload.close();
inconsistentAccounts.close();
}
private Predicate<byte[]> notEqualsTo(final byte[] name) {
return key -> !Arrays.equals(key, name);
}
}

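For orientation, a minimal usage sketch of the new class. storageProvider, pendingAccountRequests, and accountHash stand in for objects the node already holds; this is an illustration, not code from the PR:

final SnapPersistedContext snapContext = new SnapPersistedContext(storageProvider);

// While downloading: snapshot the account ranges that still have to be fetched.
snapContext.updatePersistedTasks(pendingAccountRequests.asList());

// When an account proves inconsistent, remember it for the heal step.
snapContext.addInconsistentAccount(accountHash);

// After a restart: read everything back to decide where to resume.
final List<AccountRangeDataRequest> toResume = snapContext.getPersistedTasks();
final HashSet<Bytes> toHeal = snapContext.getInconsistentAccounts();

// Once the sync completes, the persisted state is no longer needed.
snapContext.clear();

Note that addInconsistentAccount maintains a monotonically increasing counter under SNAP_INCONSISTENT_ACCOUNT_INDEX so each account is stored under its own key, and getInconsistentAccounts filters that index key back out via notEqualsTo.
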
@@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -57,9 +58,11 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
new InMemoryTaskQueue<>();
protected final InMemoryTasksPriorityQueues<SnapDataRequest> pendingTrieNodeRequests =
new InMemoryTasksPriorityQueues<>();
public final HashSet<Bytes> inconsistentAccounts = new HashSet<>();
public HashSet<Bytes> inconsistentAccounts = new HashSet<>();
private DynamicPivotBlockManager dynamicPivotBlockManager;
private final SnapPersistedContext snapContext;
private final SnapSyncState snapSyncState;
// blockchain
@@ -71,6 +74,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
public SnapWorldDownloadState(
final WorldStateStorage worldStateStorage,
final SnapPersistedContext snapContext,
final Blockchain blockchain,
final SnapSyncState snapSyncState,
final InMemoryTasksPriorityQueues<SnapDataRequest> pendingRequests,
@@ -84,6 +88,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
maxRequestsWithoutProgress,
minMillisBeforeStalling,
clock);
this.snapContext = snapContext;
this.blockchain = blockchain;
this.snapSyncState = snapSyncState;
this.metricsManager = metricsManager;
@@ -156,6 +161,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
metricsManager.notifySnapSyncCompleted();
snapContext.clear();
internalFuture.complete(null);
return true;
}
@@ -175,10 +181,12 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
}
public synchronized void startHeal() {
snapContext.clearAccountRangeTasks();
snapSyncState.setHealStatus(true);
// try to find new pivot block before healing
dynamicPivotBlockManager.switchToNewPivotBlock(
(blockHeader, newPivotBlockFound) -> {
snapContext.clearAccountRangeTasks();
LOG.info(
"Running world state heal process from peers with pivot block {}",
blockHeader.getNumber());
@@ -216,9 +224,16 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
}
}
public void addInconsistentAccount(final Bytes account) {
public synchronized void setInconsistentAccounts(final HashSet<Bytes> inconsistentAccounts) {
this.inconsistentAccounts = inconsistentAccounts;
}
public synchronized void addInconsistentAccount(final Bytes account) {
if (!inconsistentAccounts.contains(account)) {
snapContext.addInconsistentAccount(account);
inconsistentAccounts.add(account);
}
}
@Override
public synchronized void enqueueRequests(final Stream<SnapDataRequest> requests) {
@@ -229,24 +244,29 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
public synchronized Task<SnapDataRequest> dequeueRequestBlocking(
final List<TaskCollection<SnapDataRequest>> queueDependencies,
final List<TaskCollection<SnapDataRequest>> queues) {
final TaskCollection<SnapDataRequest> queue,
final Consumer<Void> unBlocked) {
boolean isWaiting = false;
while (!internalFuture.isDone()) {
while (queueDependencies.stream()
.map(TaskCollection::allTasksCompleted)
.anyMatch(Predicate.isEqual(false))) {
try {
isWaiting = true;
wait();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
for (TaskCollection<SnapDataRequest> queue : queues) {
if (isWaiting) {
unBlocked.accept(null);
}
isWaiting = false;
Task<SnapDataRequest> task = queue.remove();
if (task != null) {
return task;
}
}
try {
wait();
@@ -261,25 +281,27 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
public synchronized Task<SnapDataRequest> dequeueAccountRequestBlocking() {
return dequeueRequestBlocking(
List.of(pendingStorageRequests, pendingBigStorageRequests, pendingCodeRequests),
List.of(pendingAccountRequests));
pendingAccountRequests,
unused -> snapContext.updatePersistedTasks(pendingAccountRequests.asList()));
}
public synchronized Task<SnapDataRequest> dequeueBigStorageRequestBlocking() {
return dequeueRequestBlocking(Collections.emptyList(), List.of(pendingBigStorageRequests));
return dequeueRequestBlocking(Collections.emptyList(), pendingBigStorageRequests, __ -> {});
}
public synchronized Task<SnapDataRequest> dequeueStorageRequestBlocking() {
return dequeueRequestBlocking(Collections.emptyList(), List.of(pendingStorageRequests));
return dequeueRequestBlocking(Collections.emptyList(), pendingStorageRequests, __ -> {});
}
public synchronized Task<SnapDataRequest> dequeueCodeRequestBlocking() {
return dequeueRequestBlocking(List.of(pendingStorageRequests), List.of(pendingCodeRequests));
return dequeueRequestBlocking(List.of(pendingStorageRequests), pendingCodeRequests, __ -> {});
}
public synchronized Task<SnapDataRequest> dequeueTrieNodeRequestBlocking() {
return dequeueRequestBlocking(
List.of(pendingAccountRequests, pendingStorageRequests, pendingBigStorageRequests),
List.of(pendingTrieNodeRequests));
pendingTrieNodeRequests,
__ -> {});
}
public SnapsyncMetricsManager getMetricsManager() {

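The refactor above changes dequeueRequestBlocking from draining a list of queues to serving a single queue plus an unBlocked callback that fires once after the thread has been woken from a wait. dequeueAccountRequestBlocking uses that hook to re-snapshot the still-pending account ranges into SnapPersistedContext, so a restart in the middle of the range download loses nothing already queued. Reassembled from the hunks above, the method is roughly:

public synchronized Task<SnapDataRequest> dequeueRequestBlocking(
    final List<TaskCollection<SnapDataRequest>> queueDependencies,
    final TaskCollection<SnapDataRequest> queue,
    final Consumer<Void> unBlocked) {
  boolean isWaiting = false;
  while (!internalFuture.isDone()) {
    // Block while any dependency queue still has incomplete tasks.
    while (queueDependencies.stream().anyMatch(q -> !q.allTasksCompleted())) {
      try {
        isWaiting = true;
        wait();
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
    }
    if (isWaiting) {
      // Fired once per wake-up; the account variant persists its pending ranges here.
      unBlocked.accept(null);
    }
    isWaiting = false;
    final Task<SnapDataRequest> task = queue.remove();
    if (task != null) {
      return task;
    }
    try {
      wait(); // the queue is empty: wait for new work to arrive
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }
  return null;
}
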
@@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
@@ -30,6 +31,8 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.time.Clock;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.IntSupplier;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +53,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
private final MetricsSystem metricsSystem;
private final EthContext ethContext;
private final SnapPersistedContext snapContext;
private final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection;
private final SnapSyncConfiguration snapSyncConfiguration;
private final int maxOutstandingRequests;
@@ -60,6 +65,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
public SnapWorldStateDownloader(
final EthContext ethContext,
final SnapPersistedContext snapContext,
final ProtocolContext protocolContext,
final WorldStateStorage worldStateStorage,
final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection,
@@ -72,6 +78,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
this.ethContext = ethContext;
this.protocolContext = protocolContext;
this.worldStateStorage = worldStateStorage;
this.snapContext = snapContext;
this.snapTaskCollection = snapTaskCollection;
this.snapSyncConfiguration = snapSyncConfiguration;
this.maxOutstandingRequests = maxOutstandingRequests;
@@ -128,6 +135,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
final SnapWorldDownloadState newDownloadState =
new SnapWorldDownloadState(
worldStateStorage,
snapContext,
protocolContext.getBlockchain(),
snapSyncState,
snapTaskCollection,
@@ -138,10 +146,33 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
snapsyncMetricsManager.initRange(ranges);
final List<AccountRangeDataRequest> persistedTasks = snapContext.getPersistedTasks();
final HashSet<Bytes> inconsistentAccounts = snapContext.getInconsistentAccounts();
if (!persistedTasks.isEmpty()) { // continue to download worldstate ranges
newDownloadState.setInconsistentAccounts(inconsistentAccounts);
snapContext
.getPersistedTasks()
.forEach(
snapDataRequest -> {
snapsyncMetricsManager.notifyStateDownloaded(
snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash());
newDownloadState.enqueueRequest(snapDataRequest);
});
} else if (!inconsistentAccounts.isEmpty()) { // restart only the heal step
snapSyncState.setHealStatus(true);
newDownloadState.setInconsistentAccounts(inconsistentAccounts);
newDownloadState.enqueueRequest(
SnapDataRequest.createAccountTrieNodeDataRequest(
stateRoot, Bytes.EMPTY, snapContext.getInconsistentAccounts()));
} else { // start from scratch
worldStateStorage.clear();
ranges.forEach(
(key, value) ->
newDownloadState.enqueueRequest(
createAccountRangeDataRequest(stateRoot, key, value)));
}
Optional<CompleteTaskStep> maybeCompleteTask =
Optional.of(new CompleteTaskStep(snapSyncState, metricsSystem));

@@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.StackTrie;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.trie.NodeUpdater;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
@@ -197,4 +198,26 @@ public class AccountRangeDataRequest extends SnapDataRequest {
public TreeMap<Bytes32, Bytes> getAccounts() {
return stackTrie.getElement(startKeyHash).keys();
}
public Bytes serialize() {
return RLP.encode(
out -> {
out.startList();
out.writeByte(getRequestType().getValue());
out.writeBytes(getRootHash());
out.writeBytes(getStartKeyHash());
out.writeBytes(getEndKeyHash());
out.endList();
});
}
public static AccountRangeDataRequest deserialize(final RLPInput in) {
in.enterList();
in.skipNext(); // skip request type
final Hash rootHash = Hash.wrap(in.readBytes32());
final Bytes32 startKeyHash = in.readBytes32();
final Bytes32 endKeyHash = in.readBytes32();
in.leaveList();
return createAccountRangeDataRequest(rootHash, startKeyHash, endKeyHash);
}
}

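These two methods are what let SnapPersistedContext round-trip account-range requests through the key-value store. A quick sketch of the round trip (rootHash, startKeyHash, and endKeyHash assumed in scope):

final AccountRangeDataRequest request =
    SnapDataRequest.createAccountRangeDataRequest(rootHash, startKeyHash, endKeyHash);
final Bytes rlp = request.serialize();
final AccountRangeDataRequest restored =
    AccountRangeDataRequest.deserialize(new BytesValueRLPInput(rlp, false));
// Only the request coordinates (type, root hash, key range) are serialized;
// in-memory download state such as the StackTrie is rebuilt after a restart.
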
@@ -34,8 +34,6 @@ import org.apache.tuweni.bytes.Bytes32;
public abstract class SnapDataRequest implements TasksPriorityProvider {
public static final int MAX_CHILD = 16;
protected Optional<TrieNodeDataRequest> possibleParent = Optional.empty();
protected int depth;
protected long priority;

@@ -78,6 +78,7 @@ public class SnapWorldDownloadStateTest {
private final WorldStateDownloadProcess worldStateDownloadProcess =
mock(WorldStateDownloadProcess.class);
private final SnapSyncState snapSyncState = mock(SnapSyncState.class);
private final SnapPersistedContext snapContext = mock(SnapPersistedContext.class);
private final SnapsyncMetricsManager metricsManager = mock(SnapsyncMetricsManager.class);
private final Blockchain blockchain = mock(Blockchain.class);
private final DynamicPivotBlockManager dynamicPivotBlockManager =
@@ -113,6 +114,7 @@ public class SnapWorldDownloadStateTest {
downloadState =
new SnapWorldDownloadState(
worldStateStorage,
snapContext,
blockchain,
snapSyncState,
pendingRequests,

@@ -28,6 +28,7 @@ jar {
}
dependencies {
api 'org.apache.commons:commons-lang3'
api 'org.apache.tuweni:tuweni-bytes'
api 'org.apache.tuweni:tuweni-units'
}
@@ -65,7 +66,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files
knownHash = 't1ECxSKuhCHrMq9uKYC9datxfFqqTpCPc6GFmUfC8Pg='
knownHash = 'QC/7QGfjlWA5tfyfQdf/esATYzLfZbeJ9AnLKkaCy3Q='
}
check.dependsOn('checkAPIChanges')

@@ -23,6 +23,8 @@ import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
/**
* Responsible for storing values against keys.
*
@@ -61,6 +63,14 @@ public interface KeyValueStorage extends Closeable {
*/
Optional<byte[]> get(byte[] key) throws StorageException;
/**
* Returns a stream of all keys and values.
*
* @return A stream of all keys and values in storage.
* @throws StorageException problem encountered during the retrieval attempt.
*/
Stream<Pair<byte[], byte[]>> stream() throws StorageException;
/**
* Returns a stream of all keys.
*
@@ -89,6 +99,8 @@
*/
Set<byte[]> getAllKeysThat(Predicate<byte[]> returnCondition);
Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition);
/**
* Begins a fresh transaction, for sequencing operations for later atomic execution.
*

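The two additions are related: every implementation in this PR derives getAllValuesFromKeysThat from stream() by filtering the pairs on their key and projecting the value. From a caller's perspective (INDEX_KEY and process are hypothetical):

// Fetch the values of every entry except an internal index marker.
final Set<byte[]> values =
    storage.getAllValuesFromKeysThat(key -> !Arrays.equals(key, INDEX_KEY));

// Or walk keys and values together; the stream may hold resources
// (for RocksDB, an open iterator), so close it when done.
try (Stream<Pair<byte[], byte[]>> entries = storage.stream()) {
  entries.forEach(pair -> process(pair.getKey(), pair.getValue()));
}
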
@@ -25,23 +25,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RocksDbKeyIterator.class);
public class RocksDbIterator implements Iterator<Pair<byte[], byte[]>>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RocksDbIterator.class);
private final RocksIterator rocksIterator;
private final AtomicBoolean closed = new AtomicBoolean(false);
private RocksDbKeyIterator(final RocksIterator rocksIterator) {
private RocksDbIterator(final RocksIterator rocksIterator) {
this.rocksIterator = rocksIterator;
}
public static RocksDbKeyIterator create(final RocksIterator rocksIterator) {
return new RocksDbKeyIterator(rocksIterator);
public static RocksDbIterator create(final RocksIterator rocksIterator) {
return new RocksDbIterator(rocksIterator);
}
@Override
@@ -51,7 +52,25 @@ public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable {
}
@Override
public byte[] next() {
public Pair<byte[], byte[]> next() {
assertOpen();
try {
rocksIterator.status();
} catch (final RocksDBException e) {
LOG.error(
String.format("%s encountered a problem while iterating.", getClass().getSimpleName()),
e);
}
if (!hasNext()) {
throw new NoSuchElementException();
}
final byte[] key = rocksIterator.key();
final byte[] value = rocksIterator.value();
rocksIterator.next();
return Pair.of(key, value);
}
public byte[] nextKey() {
assertOpen();
try {
rocksIterator.status();
@@ -68,9 +87,9 @@ public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable {
return key;
}
public Stream<byte[]> toStream() {
public Stream<Pair<byte[], byte[]>> toStream() {
assertOpen();
final Spliterator<byte[]> spliterator =
final Spliterator<Pair<byte[], byte[]>> spliterator =
Spliterators.spliteratorUnknownSize(
this,
Spliterator.IMMUTABLE
@@ -82,6 +101,30 @@ public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable {
return StreamSupport.stream(spliterator, false).onClose(this::close);
}
public Stream<byte[]> toStreamKeys() {
assertOpen();
final Spliterator<byte[]> spliterator =
Spliterators.spliteratorUnknownSize(
new Iterator<>() {
@Override
public boolean hasNext() {
return RocksDbIterator.this.hasNext();
}
@Override
public byte[] next() {
return RocksDbIterator.this.nextKey();
}
},
Spliterator.IMMUTABLE
| Spliterator.DISTINCT
| Spliterator.NONNULL
| Spliterator.ORDERED
| Spliterator.SORTED);
return StreamSupport.stream(spliterator, false).onClose(this::close);
}
private void assertOpen() {
checkState(
!closed.get(),

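RocksDbKeyIterator is renamed to RocksDbIterator and now exposes both traversals: toStream() yields key/value pairs while toStreamKeys() preserves the old keys-only behaviour. Both streams close the underlying RocksIterator via onClose, so callers should use try-with-resources. A sketch, assuming db and segmentHandle are in scope:

final RocksIterator rocksIterator = db.newIterator(segmentHandle.get());
rocksIterator.seekToFirst();
try (Stream<Pair<byte[], byte[]>> entries = RocksDbIterator.create(rocksIterator).toStream()) {
  // Closing the stream releases the underlying RocksDB iterator.
  final long entryCount = entries.count();
}
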
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.OptimisticTransactionDB;
public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage {
@@ -54,6 +55,11 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage {
return snapTx.get(key);
}
@Override
public Stream<Pair<byte[], byte[]>> stream() {
return snapTx.stream();
}
@Override
public Stream<byte[]> streamKeys() {
return snapTx.streamKeys();
@@ -70,6 +76,14 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage {
return streamKeys().filter(returnCondition).collect(toUnmodifiableSet());
}
@Override
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) {
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getValue)
.collect(toUnmodifiableSet());
}
@Override
public KeyValueStorageTransaction startTransaction() throws StorageException {
// The use of a transaction on a transaction based key value store is dubious

@@ -23,7 +23,7 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -223,11 +224,18 @@ public class RocksDBColumnarKeyValueStorage
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions));
}
@Override
public Stream<Pair<byte[], byte[]>> stream(final RocksDbSegmentIdentifier segmentHandle) {
final RocksIterator rocksIterator = db.newIterator(segmentHandle.get());
rocksIterator.seekToFirst();
return RocksDbIterator.create(rocksIterator).toStream();
}
@Override
public Stream<byte[]> streamKeys(final RocksDbSegmentIdentifier segmentHandle) {
final RocksIterator rocksIterator = db.newIterator(segmentHandle.get());
rocksIterator.seekToFirst();
return RocksDbKeyIterator.create(rocksIterator).toStream();
return RocksDbIterator.create(rocksIterator).toStreamKeys();
}
@Override
@@ -247,7 +255,19 @@ public class RocksDBColumnarKeyValueStorage
@Override
public Set<byte[]> getAllKeysThat(
final RocksDbSegmentIdentifier segmentHandle, final Predicate<byte[]> returnCondition) {
return streamKeys(segmentHandle).filter(returnCondition).collect(toUnmodifiableSet());
return stream(segmentHandle)
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getKey)
.collect(toUnmodifiableSet());
}
@Override
public Set<byte[]> getAllValuesFromKeysThat(
final RocksDbSegmentIdentifier segmentHandle, final Predicate<byte[]> returnCondition) {
return stream(segmentHandle)
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getValue)
.collect(toUnmodifiableSet());
}
@Override

@@ -19,11 +19,12 @@ import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.ReadOptions;
@@ -108,10 +109,16 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
}
}
public Stream<Pair<byte[], byte[]>> stream() {
final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
rocksIterator.seekToFirst();
return RocksDbIterator.create(rocksIterator).toStream();
}
public Stream<byte[]> streamKeys() {
final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
rocksIterator.seekToFirst();
return RocksDbKeyIterator.create(rocksIterator).toStream();
return RocksDbIterator.create(rocksIterator).toStreamKeys();
}
@Override

@@ -23,7 +23,7 @@ import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.services.kvstore.KeyValueStorageTransactionTransitionValidatorDecorator;
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.LRUCache;
import org.rocksdb.OptimisticTransactionDB;
@@ -121,14 +122,32 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
@Override
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) {
return streamKeys().filter(returnCondition).collect(toUnmodifiableSet());
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getKey)
.collect(toUnmodifiableSet());
}
@Override
public Stream<Pair<byte[], byte[]>> stream() {
final RocksIterator rocksIterator = db.newIterator();
rocksIterator.seekToFirst();
return RocksDbIterator.create(rocksIterator).toStream();
}
@Override
public Stream<byte[]> streamKeys() {
final RocksIterator rocksIterator = db.newIterator();
rocksIterator.seekToFirst();
return RocksDbKeyIterator.create(rocksIterator).toStream();
return RocksDbIterator.create(rocksIterator).toStreamKeys();
}
@Override
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) {
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getValue)
.collect(toUnmodifiableSet());
}
@Override

@@ -34,6 +34,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -114,21 +115,21 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
tx.put(barSegment, bytesOf(6), bytesOf(6));
tx.commit();
store
.streamKeys(fooSegment)
store.stream(fooSegment)
.map(Pair::getKey)
.forEach(
key -> {
if (!Arrays.equals(key, bytesOf(3))) store.tryDelete(fooSegment, key);
});
store
.streamKeys(barSegment)
store.stream(barSegment)
.map(Pair::getKey)
.forEach(
key -> {
if (!Arrays.equals(key, bytesOf(4))) store.tryDelete(barSegment, key);
});
for (final RocksDbSegmentIdentifier segment : Set.of(fooSegment, barSegment)) {
assertThat(store.streamKeys(segment).count()).isEqualTo(1);
assertThat(store.stream(segment).count()).isEqualTo(1);
}
assertThat(store.get(fooSegment, bytesOf(1))).isEmpty();

@@ -33,6 +33,7 @@ import java.util.function.Predicate;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
public class InMemoryKeyValueStorage implements KeyValueStorage {
@@ -77,7 +78,30 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
@Override
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) {
return streamKeys().filter(returnCondition).collect(toUnmodifiableSet());
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getKey)
.collect(toUnmodifiableSet());
}
@Override
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) {
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getValue)
.collect(toUnmodifiableSet());
}
@Override
public Stream<Pair<byte[], byte[]>> stream() {
final Lock lock = rwLock.readLock();
lock.lock();
try {
return ImmutableSet.copyOf(hashValueStore.entrySet()).stream()
.map(bytesEntry -> Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue()));
} finally {
lock.unlock();
}
}
@Override
@@ -85,7 +109,8 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
final Lock lock = rwLock.readLock();
lock.lock();
try {
return ImmutableSet.copyOf(hashValueStore.keySet()).stream().map(Bytes::toArrayUnsafe);
return ImmutableSet.copyOf(hashValueStore.entrySet()).stream()
.map(bytesEntry -> bytesEntry.getKey().toArrayUnsafe());
} finally {
lock.unlock();
}

@@ -34,6 +34,7 @@ import java.util.stream.Stream;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
/**
@@ -81,7 +82,30 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage {
@Override
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) {
return streamKeys().filter(returnCondition).collect(toUnmodifiableSet());
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getKey)
.collect(toUnmodifiableSet());
}
@Override
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) {
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getValue)
.collect(toUnmodifiableSet());
}
@Override
public Stream<Pair<byte[], byte[]>> stream() {
final Lock lock = rwLock.readLock();
lock.lock();
try {
return ImmutableSet.copyOf(storage.asMap().entrySet()).stream()
.map(bytesEntry -> Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue()));
} finally {
lock.unlock();
}
}
@Override
@@ -89,7 +113,8 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage {
final Lock lock = rwLock.readLock();
lock.lock();
try {
return ImmutableSet.copyOf(storage.asMap().keySet()).stream().map(Bytes::toArrayUnsafe);
return ImmutableSet.copyOf(storage.asMap().entrySet()).stream()
.map(bytesEntry -> bytesEntry.getKey().toArrayUnsafe());
} finally {
lock.unlock();
}

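Both in-memory stores implement stream() the same way: the backing map's entries are copied into an ImmutableSet while the read lock is held, so the returned stream is a point-in-time snapshot that is safe against concurrent writes (at the cost of a copy). A small usage sketch:

final KeyValueStorage store = new InMemoryKeyValueStorage();
final KeyValueStorageTransaction tx = store.startTransaction();
tx.put(new byte[] {1}, new byte[] {10});
tx.put(new byte[] {2}, new byte[] {20});
tx.commit();

// Streams the snapshot taken under the read lock; later writes are not observed.
store.stream()
    .forEach(
        pair ->
            System.out.println(
                Arrays.toString(pair.getKey()) + " -> " + Arrays.toString(pair.getValue())));
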
@@ -23,6 +23,8 @@ import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
/**
* Service provided by Besu to facilitate persistent data storage.
*
@@ -58,8 +60,9 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
* @param segmentHandle The segment handle whose keys we want to stream.
* @return A stream of all keys in the specified segment.
*/
Stream<byte[]> streamKeys(final S segmentHandle);
Stream<Pair<byte[], byte[]>> stream(final S segmentHandle);
Stream<byte[]> streamKeys(final S segmentHandle);
/**
* Delete the value corresponding to the given key in the given segment if a write lock can be
* instantly acquired on the underlying storage. Do nothing otherwise.
@@ -74,6 +77,8 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
Set<byte[]> getAllKeysThat(S segmentHandle, Predicate<byte[]> returnCondition);
Set<byte[]> getAllValuesFromKeysThat(final S segmentHandle, Predicate<byte[]> returnCondition);
void clear(S segmentHandle);
/**

@@ -27,7 +27,10 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
public class SegmentedKeyValueStorageAdapter<S> implements SnappableKeyValueStorage {
private final S segmentHandle;
private final SegmentedKeyValueStorage<S> storage;
private final Supplier<SnappedKeyValueStorage> snapshotSupplier;
@@ -71,6 +74,16 @@ public class SegmentedKeyValueStorageAdapter<S> implements SnappableKeyValueStor
return storage.getAllKeysThat(segmentHandle, returnCondition);
}
@Override
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) {
return storage.getAllValuesFromKeysThat(segmentHandle, returnCondition);
}
@Override
public Stream<Pair<byte[], byte[]>> stream() {
return storage.stream(segmentHandle);
}
@Override
public Stream<byte[]> streamKeys() {
return storage.streamKeys(segmentHandle);

@@ -1,290 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.tasks;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlatFileTaskCollection<T> implements TaskCollection<T> {
private static final Logger LOG = LoggerFactory.getLogger(FlatFileTaskCollection.class);
private static final long DEFAULT_FILE_ROLL_SIZE_BYTES = 1024 * 1024 * 10; // 10Mb
static final String FILENAME_PREFIX = "tasks";
private final Set<FlatFileTask<T>> outstandingTasks = new HashSet<>();
private final Path storageDirectory;
private final Function<T, Bytes> serializer;
private final Function<Bytes, T> deserializer;
private final long rollWhenFileSizeExceedsBytes;
private final ByteBuffer lengthBuffer = ByteBuffer.allocate(Integer.BYTES);
private FileChannel readFileChannel;
private FileChannel writeFileChannel;
private long size = 0;
private int readFileNumber = 0;
private int writeFileNumber = 0;
public FlatFileTaskCollection(
final Path storageDirectory,
final Function<T, Bytes> serializer,
final Function<Bytes, T> deserializer) {
this(storageDirectory, serializer, deserializer, DEFAULT_FILE_ROLL_SIZE_BYTES);
}
FlatFileTaskCollection(
final Path storageDirectory,
final Function<T, Bytes> serializer,
final Function<Bytes, T> deserializer,
final long rollWhenFileSizeExceedsBytes) {
this.storageDirectory = storageDirectory;
this.serializer = serializer;
this.deserializer = deserializer;
this.rollWhenFileSizeExceedsBytes = rollWhenFileSizeExceedsBytes;
writeFileChannel = openWriteFileChannel(writeFileNumber);
readFileChannel = openReadFileChannel(readFileNumber);
}
private FileChannel openReadFileChannel(final int fileNumber) {
try {
return FileChannel.open(
pathForFileNumber(fileNumber),
StandardOpenOption.DELETE_ON_CLOSE,
StandardOpenOption.READ);
} catch (final IOException e) {
throw new StorageException(e);
}
}
private FileChannel openWriteFileChannel(final int fileNumber) {
try {
return FileChannel.open(
pathForFileNumber(fileNumber),
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
} catch (final IOException e) {
throw new StorageException(
"There was a problem opening FileChannel " + pathForFileNumber(fileNumber), e);
}
}
@Override
public synchronized void add(final T taskData) {
final Bytes data = serializer.apply(taskData);
try {
writeTaskData(data);
size++;
if (writeFileChannel.size() > rollWhenFileSizeExceedsBytes) {
LOG.debug("Writing reached end of file {}", writeFileNumber);
writeFileChannel.close();
writeFileNumber++;
writeFileChannel = openWriteFileChannel(writeFileNumber);
}
} catch (final IOException e) {
throw new StorageException(
"There was a problem adding to FileChannel " + pathForFileNumber(writeFileNumber), e);
}
}
@Override
public synchronized Task<T> remove() {
if (isEmpty()) {
return null;
}
try {
final ByteBuffer dataBuffer = readNextTaskData();
final T data = deserializer.apply(Bytes.wrapByteBuffer(dataBuffer));
final FlatFileTask<T> task = new FlatFileTask<>(this, data);
outstandingTasks.add(task);
size--;
return task;
} catch (final IOException e) {
throw new StorageException(
"There was a problem removing from FileChannel " + pathForFileNumber(readFileNumber), e);
}
}
private ByteBuffer readNextTaskData() throws IOException {
final int dataLength = readDataLength();
final ByteBuffer dataBuffer = ByteBuffer.allocate(dataLength);
readBytes(dataBuffer, dataLength);
return dataBuffer;
}
private void writeTaskData(final Bytes data) throws IOException {
final long offset = writeFileChannel.size();
writeDataLength(data.size(), offset);
writeFileChannel.write(ByteBuffer.wrap(data.toArrayUnsafe()), offset + Integer.BYTES);
}
private int readDataLength() throws IOException {
lengthBuffer.position(0);
lengthBuffer.limit(Integer.BYTES);
readBytes(lengthBuffer, Integer.BYTES);
return lengthBuffer.getInt(0);
}
private void writeDataLength(final int size, final long offset) throws IOException {
lengthBuffer.position(0);
lengthBuffer.putInt(size);
lengthBuffer.flip();
writeFileChannel.write(lengthBuffer, offset);
}
private void readBytes(final ByteBuffer buffer, final int expectedLength) throws IOException {
int readBytes = readFileChannel.read(buffer);
if (readBytes == -1 && writeFileNumber > readFileNumber) {
LOG.debug("Reading reached end of file {}", readFileNumber);
readFileChannel.close();
readFileNumber++;
readFileChannel = openReadFileChannel(readFileNumber);
readBytes = readFileChannel.read(buffer);
}
if (readBytes != expectedLength) {
throw new IllegalStateException(
"Task queue corrupted. Expected to read "
+ expectedLength
+ " bytes but only got "
+ readBytes);
}
}
@Override
public synchronized long size() {
return size;
}
@Override
public synchronized boolean isEmpty() {
return size() == 0;
}
@VisibleForTesting
int getReadFileNumber() {
return readFileNumber;
}
@VisibleForTesting
int getWriteFileNumber() {
return writeFileNumber;
}
@Override
public synchronized void clear() {
outstandingTasks.clear();
try {
readFileChannel.close();
writeFileChannel.close();
for (int i = readFileNumber; i <= writeFileNumber; i++) {
final File file = pathForFileNumber(i).toFile();
if (!file.delete() && file.exists()) {
LOG.error("Failed to delete tasks file {}", file.getAbsolutePath());
}
}
readFileNumber = 0;
writeFileNumber = 0;
writeFileChannel = openWriteFileChannel(writeFileNumber);
readFileChannel = openReadFileChannel(readFileNumber);
size = 0;
} catch (final IOException e) {
throw new StorageException(e);
}
}
@Override
public synchronized boolean allTasksCompleted() {
return isEmpty() && outstandingTasks.isEmpty();
}
@Override
public synchronized void close() {
try {
readFileChannel.close();
writeFileChannel.close();
} catch (final IOException e) {
throw new StorageException(e);
}
}
private Path pathForFileNumber(final int fileNumber) {
return storageDirectory.resolve(FILENAME_PREFIX + fileNumber);
}
private synchronized boolean markTaskCompleted(final FlatFileTask<T> task) {
return outstandingTasks.remove(task);
}
private synchronized void handleFailedTask(final FlatFileTask<T> task) {
if (markTaskCompleted(task)) {
add(task.getData());
}
}
public static class StorageException extends RuntimeException {
StorageException(final Throwable t) {
super(t);
}
StorageException(final String m, final Throwable t) {
super(m, t);
}
}
private static class FlatFileTask<T> implements Task<T> {
private final AtomicBoolean completed = new AtomicBoolean(false);
private final FlatFileTaskCollection<T> parentQueue;
private final T data;
private FlatFileTask(final FlatFileTaskCollection<T> parentQueue, final T data) {
this.parentQueue = parentQueue;
this.data = data;
}
@Override
public T getData() {
return data;
}
@Override
public void markCompleted() {
if (completed.compareAndSet(false, true)) {
parentQueue.markTaskCompleted(this);
}
}
@Override
public void markFailed() {
if (completed.compareAndSet(false, true)) {
parentQueue.handleFailedTask(this);
}
}
}
}

@@ -15,7 +15,9 @@
package org.hyperledger.besu.services.tasks;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,6 +87,10 @@ public class InMemoryTaskQueue<T> implements TaskCollection<T> {
}
}
public synchronized List<T> asList() {
return new ArrayList<>(internalQueue);
}
private synchronized void handleFailedTask(final InMemoryTask<T> task) {
if (markTaskCompleted(task)) {
add(task.getData());

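The new asList() returns a copy of the tasks still queued (tasks already handed out by remove() are tracked separately and are not included). It is what SnapWorldDownloadState passes to snapContext.updatePersistedTasks(...) whenever the account queue unblocks. A sketch with hypothetical requests:

final InMemoryTaskQueue<SnapDataRequest> queue = new InMemoryTaskQueue<>();
queue.add(firstRequest);
queue.add(secondRequest);

// Snapshot of the still-queued tasks, suitable for handing to the persisted context.
final List<SnapDataRequest> pending = queue.asList();
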
@@ -1,90 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class FlatFileTaskCollectionTest
extends AbstractTaskQueueTest<FlatFileTaskCollection<Bytes>> {
private static final int ROLL_SIZE = 10;
@Rule public final TemporaryFolder folder = new TemporaryFolder();
@Override
protected FlatFileTaskCollection<Bytes> createQueue() throws IOException {
final Path dataDir = folder.newFolder().toPath();
return createQueue(dataDir);
}
private FlatFileTaskCollection<Bytes> createQueue(final Path dataDir) {
return new FlatFileTaskCollection<>(
dataDir, Function.identity(), Function.identity(), ROLL_SIZE);
}
@Test
public void shouldRollFilesWhenSizeExceeded() throws Exception {
final Path dataDir = folder.newFolder().toPath();
try (final FlatFileTaskCollection<Bytes> queue = createQueue(dataDir)) {
final List<Bytes> tasks = new ArrayList<>();
addItem(queue, tasks, 0);
assertThat(queue.getWriteFileNumber()).isEqualTo(0);
int tasksInFirstFile = 1;
while (queue.getWriteFileNumber() == 0) {
addItem(queue, tasks, tasksInFirstFile);
tasksInFirstFile++;
}
assertThat(queue.getWriteFileNumber()).isGreaterThan(0);
assertThat(queue.getReadFileNumber()).isEqualTo(0);
// Add extra items to be sure we have at least one in a later file
addItem(queue, tasks, 123);
addItem(queue, tasks, 124);
final List<Bytes> removedTasks = new ArrayList<>();
// Read through all the items in the first file.
for (int i = 0; i < tasksInFirstFile; i++) {
removedTasks.add(queue.remove().getData());
}
// read one more to make sure we are reading from the next file
removedTasks.add(queue.remove().getData());
assertThat(queue.getReadFileNumber()).isEqualTo(1);
// Check that all tasks were read correctly.
removedTasks.add(queue.remove().getData());
assertThat(queue.isEmpty()).isTrue();
assertThat(removedTasks).isEqualTo(tasks);
}
}
private void addItem(
final FlatFileTaskCollection<Bytes> queue, final List<Bytes> tasks, final int value) {
tasks.add(Bytes.of(value));
queue.add(Bytes.of(value));
}
}

@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Ignore;
import org.junit.Test;
@@ -83,7 +84,7 @@ public abstract class AbstractKeyValueStorageTest {
.collect(toUnmodifiableList());
keys.forEach(key -> tx.put(key, bytesFromHexString("0ABC")));
tx.commit();
assertThat(store.streamKeys().collect(toUnmodifiableSet()))
assertThat(store.stream().map(Pair::getKey).collect(toUnmodifiableSet()))
.containsExactlyInAnyOrder(keys.toArray(new byte[][] {}));
}
