From da9b10767a2423921dedcac11350af5190748a68 Mon Sep 17 00:00:00 2001 From: matkt Date: Tue, 1 Nov 2022 14:18:15 +0100 Subject: [PATCH] Snapsync persist state (#4381) This PR avoids restarting the download of the world state from scratch when restarting Besu Signed-off-by: Karim TAAM --- CHANGELOG.md | 1 + .../controller/BesuControllerBuilder.java | 1 + ethereum/core/build.gradle | 1 + .../keyvalue/KeyValueSegmentIdentifier.java | 4 +- .../keyvalue/WorldStateKeyValueStorage.java | 4 +- .../eth/sync/DefaultSynchronizer.java | 5 + .../GenericKeyValueStorageFacade.java | 17 + .../CheckpointDownloaderFactory.java | 4 +- .../sync/snapsync/SnapDownloaderFactory.java | 3 +- .../sync/snapsync/SnapPersistedContext.java | 138 +++++++++ .../sync/snapsync/SnapWorldDownloadState.java | 50 ++- .../snapsync/SnapWorldStateDownloader.java | 39 ++- .../request/AccountRangeDataRequest.java | 23 ++ .../snapsync/request/SnapDataRequest.java | 2 - .../snapsync/SnapWorldDownloadStateTest.java | 2 + plugin-api/build.gradle | 3 +- .../services/storage/KeyValueStorage.java | 12 + ...bKeyIterator.java => RocksDbIterator.java} | 59 +++- .../RocksDBColumnarKeyValueSnapshot.java | 14 + .../RocksDBColumnarKeyValueStorage.java | 26 +- .../segmented/RocksDBSnapshotTransaction.java | 11 +- .../unsegmented/RocksDBKeyValueStorage.java | 25 +- .../RocksDBColumnarKeyValueStorageTest.java | 11 +- .../kvstore/InMemoryKeyValueStorage.java | 29 +- .../LimitedInMemoryKeyValueStorage.java | 29 +- .../kvstore/SegmentedKeyValueStorage.java | 7 +- .../SegmentedKeyValueStorageAdapter.java | 13 + .../tasks/FlatFileTaskCollection.java | 290 ------------------ .../services/tasks/InMemoryTaskQueue.java | 6 + .../tasks/FlatFileTaskCollectionTest.java | 90 ------ .../kvstore/AbstractKeyValueStorageTest.java | 3 +- 31 files changed, 489 insertions(+), 433 deletions(-) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapPersistedContext.java rename plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/{RocksDbKeyIterator.java => RocksDbIterator.java} (60%) delete mode 100644 services/tasks/src/main/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollection.java delete mode 100644 services/tasks/src/test/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollectionTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index fa9ac08650..24e1f4a7dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Additions and Improvements - Updated jackson-databind library to version 2.13.4.2 addressing [CVE-2022-42003](https://nvd.nist.gov/vuln/detail/CVE-2022-42003) - Gradle task allows custom docker image configs e.g. 
`./gradlew distDocker -PdockerImageName=my/besu -PdockerVariants=openjdk-17,openjdk-19` +- Update snapsync feature to avoid restarting the download of the world state from scratch when restarting Besu [#4381](https://github.com/hyperledger/besu/pull/4381) ### Bug Fixes - Fixed default fromBlock value and improved parameter interpretation in eth_getLogs RPC handler [#4513](https://github.com/hyperledger/besu/pull/4513) diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 0325f8cae8..7befb787f2 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -487,6 +487,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides ethContext, syncState, dataDirectory, + storageProvider, clock, metricsSystem, getFullSyncTerminationCondition(protocolContext.getBlockchain()), diff --git a/ethereum/core/build.gradle b/ethereum/core/build.gradle index 4bc0a83bc7..cc3521c10f 100644 --- a/ethereum/core/build.gradle +++ b/ethereum/core/build.gradle @@ -48,6 +48,7 @@ dependencies { implementation 'io.opentelemetry:opentelemetry-api' implementation 'io.vertx:vertx-core' implementation 'net.java.dev.jna:jna' + implementation 'org.apache.commons:commons-lang3' implementation 'org.apache.tuweni:tuweni-bytes' implementation 'org.apache.tuweni:tuweni-concurrent' implementation 'org.apache.tuweni:tuweni-units' diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java index 5c624acece..316316fcc7 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java @@ -33,7 +33,9 @@ public enum KeyValueSegmentIdentifier implements SegmentIdentifier { GOQUORUM_PRIVATE_STORAGE(new byte[] {12}), BACKWARD_SYNC_HEADERS(new byte[] {13}), BACKWARD_SYNC_BLOCKS(new byte[] {14}), - BACKWARD_SYNC_CHAIN(new byte[] {15}); + BACKWARD_SYNC_CHAIN(new byte[] {15}), + SNAPSYNC_MISSING_ACCOUNT_RANGE(new byte[] {16}), + SNAPSYNC_ACCOUNT_TO_FIX(new byte[] {17}); private final byte[] id; private final int[] versionList; diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java index c67db7f62e..362bd51413 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java @@ -110,8 +110,8 @@ public class WorldStateKeyValueStorage implements WorldStateStorage { @Override public long prune(final Predicate inUseCheck) { final AtomicInteger prunedKeys = new AtomicInteger(0); - try (final Stream keys = keyValueStorage.streamKeys()) { - keys.forEach( + try (final Stream entry = keyValueStorage.streamKeys()) { + entry.forEach( key -> { lock.lock(); try { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java 
b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 5d334f785e..340e19a2ee 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -30,10 +30,12 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloader import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader; import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapPersistedContext; import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStatePeerTrieNodeFinder; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder; import org.hyperledger.besu.ethereum.worldstate.Pruner; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -78,6 +80,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi final EthContext ethContext, final SyncState syncState, final Path dataDirectory, + final StorageProvider storageProvider, final Clock clock, final MetricsSystem metricsSystem, final SyncTerminationCondition terminationCondition, @@ -141,6 +144,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi } else if (SyncMode.X_CHECKPOINT.equals(syncConfig.getSyncMode())) { this.fastSyncDownloader = CheckpointDownloaderFactory.createCheckpointDownloader( + new SnapPersistedContext(storageProvider), pivotBlockSelector, syncConfig, dataDirectory, @@ -154,6 +158,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi } else { this.fastSyncDownloader = SnapDownloaderFactory.createSnapDownloader( + new SnapPersistedContext(storageProvider), pivotBlockSelector, syncConfig, dataDirectory, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java index eae0b4e3bf..c6c929d0d2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync; +import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; @@ -22,6 +23,9 @@ import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Stream; public class GenericKeyValueStorageFacade implements Closeable { protected final KeyValueStorage storage; @@ -57,6 +61,13 @@ public class GenericKeyValueStorageFacade implements Closeable { keyValueStorageTransaction.commit(); } + public void putAll( + final Consumer 
keyValueStorageTransactionConsumer) { + final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction(); + keyValueStorageTransactionConsumer.accept(keyValueStorageTransaction); + keyValueStorageTransaction.commit(); + } + public void drop(final K key) { storage.tryDelete(keyConvertor.toBytes(key)); } @@ -78,4 +89,10 @@ public class GenericKeyValueStorageFacade implements Closeable { } keyValueStorageTransaction.commit(); } + + public Stream streamValuesFromKeysThat(final Predicate returnCondition) + throws StorageException { + return storage.getAllValuesFromKeysThat(returnCondition).stream() + .map(valueConvertor::fromBytes); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java index 9ab28a524f..781162a36f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java @@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncStateStorage; import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapPersistedContext; import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncDownloader; import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState; import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldStateDownloader; @@ -49,6 +50,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory { private static final Logger LOG = LoggerFactory.getLogger(CheckpointDownloaderFactory.class); public static Optional> createCheckpointDownloader( + final SnapPersistedContext snapContext, final PivotBlockSelector pivotBlockSelector, final SynchronizerConfiguration syncConfig, final Path dataDirectory, @@ -119,13 +121,13 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory { new SnapSyncState( fastSyncStateStorage.loadState( ScheduleBasedBlockHeaderFunctions.create(protocolSchedule))); - worldStateStorage.clear(); final InMemoryTasksPriorityQueues snapTaskCollection = createSnapWorldStateDownloaderTaskCollection(); final WorldStateDownloader snapWorldStateDownloader = new SnapWorldStateDownloader( ethContext, + snapContext, protocolContext, worldStateStorage, snapTaskCollection, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index 3d3b04ba5d..e977eb736c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -46,6 +46,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory { private static final Logger LOG = LoggerFactory.getLogger(SnapDownloaderFactory.class); public static Optional> createSnapDownloader( + final SnapPersistedContext snapContext, final PivotBlockSelector pivotBlockSelector, final SynchronizerConfiguration syncConfig, final Path dataDirectory, 
@@ -86,13 +87,13 @@ public class SnapDownloaderFactory extends FastDownloaderFactory { new SnapSyncState( fastSyncStateStorage.loadState( ScheduleBasedBlockHeaderFunctions.create(protocolSchedule))); - worldStateStorage.clear(); final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection = createSnapWorldStateDownloaderTaskCollection(); final WorldStateDownloader snapWorldStateDownloader = new SnapWorldStateDownloader( ethContext, + snapContext, protocolContext, worldStateStorage, snapTaskCollection, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapPersistedContext.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapPersistedContext.java new file mode 100644 index 0000000000..905ce6d54a --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapPersistedContext.java @@ -0,0 +1,138 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.eth.sync.backwardsync.GenericKeyValueStorageFacade; +import org.hyperledger.besu.ethereum.eth.sync.backwardsync.ValueConvertor; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput; +import org.hyperledger.besu.ethereum.storage.StorageProvider; +import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.tuweni.bytes.Bytes; + +public class SnapPersistedContext { + + private final byte[] SNAP_INCONSISTENT_ACCOUNT_INDEX = + "snapInconsistentAccountsStorageIndex".getBytes(StandardCharsets.UTF_8); + + private final GenericKeyValueStorageFacade<BigInteger, AccountRangeDataRequest> + accountRangeToDownload; + private final GenericKeyValueStorageFacade<BigInteger, Bytes> inconsistentAccounts; + + public SnapPersistedContext(final StorageProvider storageProvider) { + this.accountRangeToDownload = + new GenericKeyValueStorageFacade<>( + BigInteger::toByteArray, + new ValueConvertor<>() { + @Override + public AccountRangeDataRequest fromBytes(final byte[] bytes) { + return AccountRangeDataRequest.deserialize( + new BytesValueRLPInput(Bytes.of(bytes), false)); + } + + @Override + public byte[] toBytes(final AccountRangeDataRequest value) { + return value.serialize().toArrayUnsafe(); + } + }, + storageProvider.getStorageBySegmentIdentifier( + KeyValueSegmentIdentifier.SNAPSYNC_MISSING_ACCOUNT_RANGE)); + this.inconsistentAccounts = + new GenericKeyValueStorageFacade<>( + BigInteger::toByteArray, + new ValueConvertor<>() { + @Override + public Bytes fromBytes(final byte[] bytes) { + return Bytes.of(bytes); + } + + @Override + public byte[] toBytes(final Bytes value) { + return value.toArrayUnsafe(); + } + }, + storageProvider.getStorageBySegmentIdentifier( + KeyValueSegmentIdentifier.SNAPSYNC_ACCOUNT_TO_FIX)); + } + + public void updatePersistedTasks(final List<SnapDataRequest> accountRangeDataRequests) { + accountRangeToDownload.clear(); + accountRangeToDownload.putAll( + keyValueStorageTransaction -> + IntStream.range(0, accountRangeDataRequests.size()) + .forEach( + index -> + keyValueStorageTransaction.put( + BigInteger.valueOf(index).toByteArray(), + ((AccountRangeDataRequest) accountRangeDataRequests.get(index)) + .serialize() + .toArrayUnsafe()))); + } + + public void addInconsistentAccount(final Bytes inconsistentAccount) { + final BigInteger index = + inconsistentAccounts + .get(SNAP_INCONSISTENT_ACCOUNT_INDEX) + .map(bytes -> new BigInteger(bytes.toArrayUnsafe()).add(BigInteger.ONE)) + .orElse(BigInteger.ZERO); + inconsistentAccounts.putAll( + keyValueStorageTransaction -> { + keyValueStorageTransaction.put(SNAP_INCONSISTENT_ACCOUNT_INDEX, index.toByteArray()); + keyValueStorageTransaction.put(index.toByteArray(), inconsistentAccount.toArrayUnsafe()); + }); + } + + public List<AccountRangeDataRequest> getPersistedTasks() { + return accountRangeToDownload + .streamValuesFromKeysThat(bytes -> true) + .collect(Collectors.toList()); + } + + public HashSet<Bytes> getInconsistentAccounts() { + return inconsistentAccounts + .streamValuesFromKeysThat(notEqualsTo(SNAP_INCONSISTENT_ACCOUNT_INDEX)) + .collect(Collectors.toCollection(HashSet::new)); + } + + public void clearAccountRangeTasks() { + accountRangeToDownload.clear(); + } + + public void clear() { + accountRangeToDownload.clear(); + inconsistentAccounts.clear(); + } + + public void close() throws IOException { + accountRangeToDownload.close(); + inconsistentAccounts.close(); + } + + private Predicate<byte[]> notEqualsTo(final byte[] name) { + return key -> !Arrays.equals(key, name); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java index b9b8a78f46..9a5867275d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.OptionalLong; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Stream; @@ -57,9 +58,11 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest> new InMemoryTaskQueue<>(); protected final InMemoryTasksPriorityQueues<SnapDataRequest> pendingTrieNodeRequests = new InMemoryTasksPriorityQueues<>(); - public final HashSet<Bytes> inconsistentAccounts = new HashSet<>(); + public HashSet<Bytes> inconsistentAccounts = new HashSet<>(); private DynamicPivotBlockManager dynamicPivotBlockManager; + + private final SnapPersistedContext snapContext; private final SnapSyncState snapSyncState; // blockchain @@ -71,6 +74,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest> public SnapWorldDownloadState( final WorldStateStorage worldStateStorage, + final SnapPersistedContext snapContext, final Blockchain blockchain, final SnapSyncState snapSyncState, final InMemoryTasksPriorityQueues<SnapDataRequest> pendingRequests, @@ -84,6 +88,7 @@
public class SnapWorldDownloadState extends WorldDownloadState maxRequestsWithoutProgress, minMillisBeforeStalling, clock); + this.snapContext = snapContext; this.blockchain = blockchain; this.snapSyncState = snapSyncState; this.metricsManager = metricsManager; @@ -156,6 +161,7 @@ public class SnapWorldDownloadState extends WorldDownloadState updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData); updater.commit(); metricsManager.notifySnapSyncCompleted(); + snapContext.clear(); internalFuture.complete(null); return true; } @@ -175,10 +181,12 @@ public class SnapWorldDownloadState extends WorldDownloadState } public synchronized void startHeal() { + snapContext.clearAccountRangeTasks(); snapSyncState.setHealStatus(true); // try to find new pivot block before healing dynamicPivotBlockManager.switchToNewPivotBlock( (blockHeader, newPivotBlockFound) -> { + snapContext.clearAccountRangeTasks(); LOG.info( "Running world state heal process from peers with pivot block {}", blockHeader.getNumber()); @@ -216,8 +224,15 @@ public class SnapWorldDownloadState extends WorldDownloadState } } - public void addInconsistentAccount(final Bytes account) { - inconsistentAccounts.add(account); + public synchronized void setInconsistentAccounts(final HashSet inconsistentAccounts) { + this.inconsistentAccounts = inconsistentAccounts; + } + + public synchronized void addInconsistentAccount(final Bytes account) { + if (!inconsistentAccounts.contains(account)) { + snapContext.addInconsistentAccount(account); + inconsistentAccounts.add(account); + } } @Override @@ -229,23 +244,28 @@ public class SnapWorldDownloadState extends WorldDownloadState public synchronized Task dequeueRequestBlocking( final List> queueDependencies, - final List> queues) { + final TaskCollection queue, + final Consumer unBlocked) { + boolean isWaiting = false; while (!internalFuture.isDone()) { while (queueDependencies.stream() .map(TaskCollection::allTasksCompleted) .anyMatch(Predicate.isEqual(false))) { try { + isWaiting = true; wait(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); return null; } } - for (TaskCollection queue : queues) { - Task task = queue.remove(); - if (task != null) { - return task; - } + if (isWaiting) { + unBlocked.accept(null); + } + isWaiting = false; + Task task = queue.remove(); + if (task != null) { + return task; } try { @@ -261,25 +281,27 @@ public class SnapWorldDownloadState extends WorldDownloadState public synchronized Task dequeueAccountRequestBlocking() { return dequeueRequestBlocking( List.of(pendingStorageRequests, pendingBigStorageRequests, pendingCodeRequests), - List.of(pendingAccountRequests)); + pendingAccountRequests, + unused -> snapContext.updatePersistedTasks(pendingAccountRequests.asList())); } public synchronized Task dequeueBigStorageRequestBlocking() { - return dequeueRequestBlocking(Collections.emptyList(), List.of(pendingBigStorageRequests)); + return dequeueRequestBlocking(Collections.emptyList(), pendingBigStorageRequests, __ -> {}); } public synchronized Task dequeueStorageRequestBlocking() { - return dequeueRequestBlocking(Collections.emptyList(), List.of(pendingStorageRequests)); + return dequeueRequestBlocking(Collections.emptyList(), pendingStorageRequests, __ -> {}); } public synchronized Task dequeueCodeRequestBlocking() { - return dequeueRequestBlocking(List.of(pendingStorageRequests), List.of(pendingCodeRequests)); + return dequeueRequestBlocking(List.of(pendingStorageRequests), pendingCodeRequests, __ -> {}); } public 
synchronized Task dequeueTrieNodeRequestBlocking() { return dequeueRequestBlocking( List.of(pendingAccountRequests, pendingStorageRequests, pendingBigStorageRequests), - List.of(pendingTrieNodeRequests)); + pendingTrieNodeRequests, + __ -> {}); } public SnapsyncMetricsManager getMetricsManager() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java index 4d4539f4b9..25c999185f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -30,6 +31,8 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues; import java.time.Clock; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.IntSupplier; +import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +53,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader { private final MetricsSystem metricsSystem; private final EthContext ethContext; + private final SnapPersistedContext snapContext; private final InMemoryTasksPriorityQueues snapTaskCollection; private final SnapSyncConfiguration snapSyncConfiguration; private final int maxOutstandingRequests; @@ -60,6 +65,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader { public SnapWorldStateDownloader( final EthContext ethContext, + final SnapPersistedContext snapContext, final ProtocolContext protocolContext, final WorldStateStorage worldStateStorage, final InMemoryTasksPriorityQueues snapTaskCollection, @@ -72,6 +78,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader { this.ethContext = ethContext; this.protocolContext = protocolContext; this.worldStateStorage = worldStateStorage; + this.snapContext = snapContext; this.snapTaskCollection = snapTaskCollection; this.snapSyncConfiguration = snapSyncConfiguration; this.maxOutstandingRequests = maxOutstandingRequests; @@ -128,6 +135,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader { final SnapWorldDownloadState newDownloadState = new SnapWorldDownloadState( worldStateStorage, + snapContext, protocolContext.getBlockchain(), snapSyncState, snapTaskCollection, @@ -138,10 +146,33 @@ public class SnapWorldStateDownloader implements WorldStateDownloader { final Map ranges = RangeManager.generateAllRanges(16); snapsyncMetricsManager.initRange(ranges); - 
ranges.forEach( - (key, value) -> - newDownloadState.enqueueRequest( - createAccountRangeDataRequest(stateRoot, key, value))); + + final List persistedTasks = snapContext.getPersistedTasks(); + final HashSet inconsistentAccounts = snapContext.getInconsistentAccounts(); + + if (!persistedTasks.isEmpty()) { // continue to download worldstate ranges + newDownloadState.setInconsistentAccounts(inconsistentAccounts); + snapContext + .getPersistedTasks() + .forEach( + snapDataRequest -> { + snapsyncMetricsManager.notifyStateDownloaded( + snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash()); + newDownloadState.enqueueRequest(snapDataRequest); + }); + } else if (!inconsistentAccounts.isEmpty()) { // restart only the heal step + snapSyncState.setHealStatus(true); + newDownloadState.setInconsistentAccounts(inconsistentAccounts); + newDownloadState.enqueueRequest( + SnapDataRequest.createAccountTrieNodeDataRequest( + stateRoot, Bytes.EMPTY, snapContext.getInconsistentAccounts())); + } else { // start from scratch + worldStateStorage.clear(); + ranges.forEach( + (key, value) -> + newDownloadState.enqueueRequest( + createAccountRangeDataRequest(stateRoot, key, value))); + } Optional maybeCompleteTask = Optional.of(new CompleteTaskStep(snapSyncState, metricsSystem)); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java index 519135a10a..433b624139 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java @@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState; import org.hyperledger.besu.ethereum.eth.sync.snapsync.StackTrie; import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider; import org.hyperledger.besu.ethereum.rlp.RLP; +import org.hyperledger.besu.ethereum.rlp.RLPInput; import org.hyperledger.besu.ethereum.trie.NodeUpdater; import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -197,4 +198,26 @@ public class AccountRangeDataRequest extends SnapDataRequest { public TreeMap getAccounts() { return stackTrie.getElement(startKeyHash).keys(); } + + public Bytes serialize() { + return RLP.encode( + out -> { + out.startList(); + out.writeByte(getRequestType().getValue()); + out.writeBytes(getRootHash()); + out.writeBytes(getStartKeyHash()); + out.writeBytes(getEndKeyHash()); + out.endList(); + }); + } + + public static AccountRangeDataRequest deserialize(final RLPInput in) { + in.enterList(); + in.skipNext(); // skip request type + final Hash rootHash = Hash.wrap(in.readBytes32()); + final Bytes32 startKeyHash = in.readBytes32(); + final Bytes32 endKeyHash = in.readBytes32(); + in.leaveList(); + return createAccountRangeDataRequest(rootHash, startKeyHash, endKeyHash); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java index a230d54905..68b6f57ea7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java +++ 
b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java @@ -34,8 +34,6 @@ import org.apache.tuweni.bytes.Bytes32; public abstract class SnapDataRequest implements TasksPriorityProvider { - public static final int MAX_CHILD = 16; - protected Optional<SnapDataRequest> possibleParent = Optional.empty(); protected int depth; protected long priority; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java index 195082863b..be7e8dafe0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java @@ -78,6 +78,7 @@ public class SnapWorldDownloadStateTest { private final WorldStateDownloadProcess worldStateDownloadProcess = mock(WorldStateDownloadProcess.class); private final SnapSyncState snapSyncState = mock(SnapSyncState.class); + private final SnapPersistedContext snapContext = mock(SnapPersistedContext.class); private final SnapsyncMetricsManager metricsManager = mock(SnapsyncMetricsManager.class); private final Blockchain blockchain = mock(Blockchain.class); private final DynamicPivotBlockManager dynamicPivotBlockManager = @@ -113,6 +114,7 @@ public class SnapWorldDownloadStateTest { downloadState = new SnapWorldDownloadState( worldStateStorage, + snapContext, blockchain, snapSyncState, pendingRequests, diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index 6d44679323..6376c11912 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -28,6 +28,7 @@ jar { } dependencies { + api 'org.apache.commons:commons-lang3' api 'org.apache.tuweni:tuweni-bytes' api 'org.apache.tuweni:tuweni-units' } @@ -65,7 +66,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = 't1ECxSKuhCHrMq9uKYC9datxfFqqTpCPc6GFmUfC8Pg=' + knownHash = 'QC/7QGfjlWA5tfyfQdf/esATYzLfZbeJ9AnLKkaCy3Q=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java index bf6cfa2b5c..dd84434969 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java @@ -23,6 +23,8 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; + /** * Responsible for storing values against keys. * @@ -61,6 +63,14 @@ public interface KeyValueStorage extends Closeable { */ Optional<byte[]> get(byte[] key) throws StorageException; + /** + * Returns a stream of all keys and values. + * + * @return A stream of all keys and values in storage. + * @throws StorageException problem encountered during the retrieval attempt. + */ + Stream<Pair<byte[], byte[]>> stream() throws StorageException; + /** * Returns a stream of all keys.
* @@ -89,6 +99,8 @@ */ Set<byte[]> getAllKeysThat(Predicate<byte[]> returnCondition); + Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition); + /** * Begins a fresh transaction, for sequencing operations for later atomic execution. * diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbKeyIterator.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbIterator.java similarity index 60% rename from plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbKeyIterator.java rename to plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbIterator.java index 23e1cbcd0f..8be7f157ad 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbKeyIterator.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbIterator.java @@ -25,23 +25,24 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.apache.commons.lang3.tuple.Pair; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(RocksDbKeyIterator.class); +public class RocksDbIterator implements Iterator<Pair<byte[], byte[]>>, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDbIterator.class); private final RocksIterator rocksIterator; private final AtomicBoolean closed = new AtomicBoolean(false); - private RocksDbKeyIterator(final RocksIterator rocksIterator) { + private RocksDbIterator(final RocksIterator rocksIterator) { this.rocksIterator = rocksIterator; } - public static RocksDbKeyIterator create(final RocksIterator rocksIterator) { - return new RocksDbKeyIterator(rocksIterator); + public static RocksDbIterator create(final RocksIterator rocksIterator) { + return new RocksDbIterator(rocksIterator); } @Override @@ -51,7 +52,25 @@ public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable { } @Override - public byte[] next() { + public Pair<byte[], byte[]> next() { + assertOpen(); + try { + rocksIterator.status(); + } catch (final RocksDBException e) { + LOG.error( + String.format("%s encountered a problem while iterating.", getClass().getSimpleName()), + e); + } + if (!hasNext()) { + throw new NoSuchElementException(); + } + final byte[] key = rocksIterator.key(); + final byte[] value = rocksIterator.value(); + rocksIterator.next(); + return Pair.of(key, value); + } + + public byte[] nextKey() { assertOpen(); try { rocksIterator.status(); @@ -68,9 +87,9 @@ public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable { return key; } - public Stream<byte[]> toStream() { + public Stream<Pair<byte[], byte[]>> toStream() { assertOpen(); - final Spliterator<byte[]> spliterator = + final Spliterator<Pair<byte[], byte[]>> spliterator = Spliterators.spliteratorUnknownSize( this, Spliterator.IMMUTABLE @@ -82,6 +101,30 @@ public class RocksDbKeyIterator implements Iterator<byte[]>, AutoCloseable { return StreamSupport.stream(spliterator, false).onClose(this::close); } + public Stream<byte[]> toStreamKeys() { + assertOpen(); + final Spliterator<byte[]> spliterator = + Spliterators.spliteratorUnknownSize( + new Iterator<>() { + @Override + public boolean hasNext() { + return RocksDbIterator.this.hasNext(); + } + + @Override + public byte[] next() { + return
RocksDbIterator.this.nextKey(); + } + }, + Spliterator.IMMUTABLE + | Spliterator.DISTINCT + | Spliterator.NONNULL + | Spliterator.ORDERED + | Spliterator.SORTED); + + return StreamSupport.stream(spliterator, false).onClose(this::close); + } + private void assertOpen() { checkState( !closed.get(), diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java index 834b14f941..b4554ffa18 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; import org.rocksdb.OptimisticTransactionDB; public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage { @@ -54,6 +55,11 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage { return snapTx.get(key); } + @Override + public Stream> stream() { + return snapTx.stream(); + } + @Override public Stream streamKeys() { return snapTx.streamKeys(); @@ -70,6 +76,14 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage { return streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); } + @Override + public Set getAllValuesFromKeysThat(final Predicate returnCondition) { + return stream() + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getValue) + .collect(toUnmodifiableSet()); + } + @Override public KeyValueStorageTransaction startTransaction() throws StorageException { // The use of a transaction on a transaction based key value store is dubious diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java index f86a14dfed..2333e53c1d 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java @@ -23,7 +23,7 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; -import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator; +import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; @@ -42,6 +42,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.tuple.Pair; import org.apache.tuweni.bytes.Bytes; import org.rocksdb.BlockBasedTableConfig; 
import org.rocksdb.ColumnFamilyDescriptor; @@ -223,11 +224,18 @@ public class RocksDBColumnarKeyValueStorage new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions)); } + @Override + public Stream> stream(final RocksDbSegmentIdentifier segmentHandle) { + final RocksIterator rocksIterator = db.newIterator(segmentHandle.get()); + rocksIterator.seekToFirst(); + return RocksDbIterator.create(rocksIterator).toStream(); + } + @Override public Stream streamKeys(final RocksDbSegmentIdentifier segmentHandle) { final RocksIterator rocksIterator = db.newIterator(segmentHandle.get()); rocksIterator.seekToFirst(); - return RocksDbKeyIterator.create(rocksIterator).toStream(); + return RocksDbIterator.create(rocksIterator).toStreamKeys(); } @Override @@ -247,7 +255,19 @@ public class RocksDBColumnarKeyValueStorage @Override public Set getAllKeysThat( final RocksDbSegmentIdentifier segmentHandle, final Predicate returnCondition) { - return streamKeys(segmentHandle).filter(returnCondition).collect(toUnmodifiableSet()); + return stream(segmentHandle) + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getKey) + .collect(toUnmodifiableSet()); + } + + @Override + public Set getAllValuesFromKeysThat( + final RocksDbSegmentIdentifier segmentHandle, final Predicate returnCondition) { + return stream(segmentHandle) + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getValue) + .collect(toUnmodifiableSet()); } @Override diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java index 5f6d55ee30..659d4c5ce1 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java @@ -19,11 +19,12 @@ import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; -import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator; +import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator; import java.util.Optional; import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.ReadOptions; @@ -108,10 +109,16 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A } } + public Stream> stream() { + final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions); + rocksIterator.seekToFirst(); + return RocksDbIterator.create(rocksIterator).toStream(); + } + public Stream streamKeys() { final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions); rocksIterator.seekToFirst(); - return RocksDbKeyIterator.create(rocksIterator).toStream(); + return RocksDbIterator.create(rocksIterator).toStreamKeys(); } @Override diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java 
b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java index a1fe47dccf..7b8d8aa3a1 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java @@ -23,7 +23,7 @@ import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; -import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator; +import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; import org.hyperledger.besu.services.kvstore.KeyValueStorageTransactionTransitionValidatorDecorator; @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.LRUCache; import org.rocksdb.OptimisticTransactionDB; @@ -121,14 +122,32 @@ public class RocksDBKeyValueStorage implements KeyValueStorage { @Override public Set getAllKeysThat(final Predicate returnCondition) { - return streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); + return stream() + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getKey) + .collect(toUnmodifiableSet()); + } + + @Override + public Stream> stream() { + final RocksIterator rocksIterator = db.newIterator(); + rocksIterator.seekToFirst(); + return RocksDbIterator.create(rocksIterator).toStream(); } @Override public Stream streamKeys() { final RocksIterator rocksIterator = db.newIterator(); rocksIterator.seekToFirst(); - return RocksDbKeyIterator.create(rocksIterator).toStream(); + return RocksDbIterator.create(rocksIterator).toStreamKeys(); + } + + @Override + public Set getAllValuesFromKeysThat(final Predicate returnCondition) { + return stream() + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getValue) + .collect(toUnmodifiableSet()); } @Override diff --git a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java index d1c0d7fe7e..39ec648104 100644 --- a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java +++ b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Consumer; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -114,21 +115,21 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT tx.put(barSegment, bytesOf(6), bytesOf(6)); tx.commit(); - store - .streamKeys(fooSegment) + 
store.stream(fooSegment) + .map(Pair::getKey) .forEach( key -> { if (!Arrays.equals(key, bytesOf(3))) store.tryDelete(fooSegment, key); }); - store - .streamKeys(barSegment) + store.stream(barSegment) + .map(Pair::getKey) .forEach( key -> { if (!Arrays.equals(key, bytesOf(4))) store.tryDelete(barSegment, key); }); for (final RocksDbSegmentIdentifier segment : Set.of(fooSegment, barSegment)) { - assertThat(store.streamKeys(segment).count()).isEqualTo(1); + assertThat(store.stream(segment).count()).isEqualTo(1); } assertThat(store.get(fooSegment, bytesOf(1))).isEmpty(); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java index 8610b930b3..f496813c39 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java @@ -33,6 +33,7 @@ import java.util.function.Predicate; import java.util.stream.Stream; import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.tuple.Pair; import org.apache.tuweni.bytes.Bytes; public class InMemoryKeyValueStorage implements KeyValueStorage { @@ -77,7 +78,30 @@ public class InMemoryKeyValueStorage implements KeyValueStorage { @Override public Set getAllKeysThat(final Predicate returnCondition) { - return streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); + return stream() + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getKey) + .collect(toUnmodifiableSet()); + } + + @Override + public Set getAllValuesFromKeysThat(final Predicate returnCondition) { + return stream() + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getValue) + .collect(toUnmodifiableSet()); + } + + @Override + public Stream> stream() { + final Lock lock = rwLock.readLock(); + lock.lock(); + try { + return ImmutableSet.copyOf(hashValueStore.entrySet()).stream() + .map(bytesEntry -> Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue())); + } finally { + lock.unlock(); + } } @Override @@ -85,7 +109,8 @@ public class InMemoryKeyValueStorage implements KeyValueStorage { final Lock lock = rwLock.readLock(); lock.lock(); try { - return ImmutableSet.copyOf(hashValueStore.keySet()).stream().map(Bytes::toArrayUnsafe); + return ImmutableSet.copyOf(hashValueStore.entrySet()).stream() + .map(bytesEntry -> bytesEntry.getKey().toArrayUnsafe()); } finally { lock.unlock(); } diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java index 63dce40a48..4b431e6f81 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java @@ -34,6 +34,7 @@ import java.util.stream.Stream; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.tuple.Pair; import org.apache.tuweni.bytes.Bytes; /** @@ -81,7 +82,30 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage { @Override public Set getAllKeysThat(final Predicate returnCondition) { - return 
streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); + return stream() + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getKey) + .collect(toUnmodifiableSet()); + } + + @Override + public Set getAllValuesFromKeysThat(final Predicate returnCondition) { + return stream() + .filter(pair -> returnCondition.test(pair.getKey())) + .map(Pair::getValue) + .collect(toUnmodifiableSet()); + } + + @Override + public Stream> stream() { + final Lock lock = rwLock.readLock(); + lock.lock(); + try { + return ImmutableSet.copyOf(storage.asMap().entrySet()).stream() + .map(bytesEntry -> Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue())); + } finally { + lock.unlock(); + } } @Override @@ -89,7 +113,8 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage { final Lock lock = rwLock.readLock(); lock.lock(); try { - return ImmutableSet.copyOf(storage.asMap().keySet()).stream().map(Bytes::toArrayUnsafe); + return ImmutableSet.copyOf(storage.asMap().entrySet()).stream() + .map(bytesEntry -> bytesEntry.getKey().toArrayUnsafe()); } finally { lock.unlock(); } diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java index 38af4b6bf0..a00d950ac1 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java @@ -23,6 +23,8 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; + /** * Service provided by Besu to facilitate persistent data storage. * @@ -58,8 +60,9 @@ public interface SegmentedKeyValueStorage extends Closeable { * @param segmentHandle The segment handle whose keys we want to stream. * @return A stream of all keys in the specified segment. */ - Stream streamKeys(final S segmentHandle); + Stream> stream(final S segmentHandle); + Stream streamKeys(final S segmentHandle); /** * Delete the value corresponding to the given key in the given segment if a write lock can be * instantly acquired on the underlying storage. Do nothing otherwise. 
@@ -74,6 +77,8 @@ public interface SegmentedKeyValueStorage extends Closeable { Set getAllKeysThat(S segmentHandle, Predicate returnCondition); + Set getAllValuesFromKeysThat(final S segmentHandle, Predicate returnCondition); + void clear(S segmentHandle); /** diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java index 23157f584d..737e715370 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java @@ -27,7 +27,10 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; + public class SegmentedKeyValueStorageAdapter implements SnappableKeyValueStorage { + private final S segmentHandle; private final SegmentedKeyValueStorage storage; private final Supplier snapshotSupplier; @@ -71,6 +74,16 @@ public class SegmentedKeyValueStorageAdapter implements SnappableKeyValueStor return storage.getAllKeysThat(segmentHandle, returnCondition); } + @Override + public Set getAllValuesFromKeysThat(final Predicate returnCondition) { + return storage.getAllValuesFromKeysThat(segmentHandle, returnCondition); + } + + @Override + public Stream> stream() { + return storage.stream(segmentHandle); + } + @Override public Stream streamKeys() { return storage.streamKeys(segmentHandle); diff --git a/services/tasks/src/main/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollection.java b/services/tasks/src/main/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollection.java deleted file mode 100644 index 579d418550..0000000000 --- a/services/tasks/src/main/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollection.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.services.tasks; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.tuweni.bytes.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FlatFileTaskCollection implements TaskCollection { - private static final Logger LOG = LoggerFactory.getLogger(FlatFileTaskCollection.class); - private static final long DEFAULT_FILE_ROLL_SIZE_BYTES = 1024 * 1024 * 10; // 10Mb - static final String FILENAME_PREFIX = "tasks"; - private final Set> outstandingTasks = new HashSet<>(); - - private final Path storageDirectory; - private final Function serializer; - private final Function deserializer; - private final long rollWhenFileSizeExceedsBytes; - - private final ByteBuffer lengthBuffer = ByteBuffer.allocate(Integer.BYTES); - - private FileChannel readFileChannel; - private FileChannel writeFileChannel; - - private long size = 0; - private int readFileNumber = 0; - private int writeFileNumber = 0; - - public FlatFileTaskCollection( - final Path storageDirectory, - final Function serializer, - final Function deserializer) { - this(storageDirectory, serializer, deserializer, DEFAULT_FILE_ROLL_SIZE_BYTES); - } - - FlatFileTaskCollection( - final Path storageDirectory, - final Function serializer, - final Function deserializer, - final long rollWhenFileSizeExceedsBytes) { - this.storageDirectory = storageDirectory; - this.serializer = serializer; - this.deserializer = deserializer; - this.rollWhenFileSizeExceedsBytes = rollWhenFileSizeExceedsBytes; - writeFileChannel = openWriteFileChannel(writeFileNumber); - readFileChannel = openReadFileChannel(readFileNumber); - } - - private FileChannel openReadFileChannel(final int fileNumber) { - try { - return FileChannel.open( - pathForFileNumber(fileNumber), - StandardOpenOption.DELETE_ON_CLOSE, - StandardOpenOption.READ); - } catch (final IOException e) { - throw new StorageException(e); - } - } - - private FileChannel openWriteFileChannel(final int fileNumber) { - try { - return FileChannel.open( - pathForFileNumber(fileNumber), - StandardOpenOption.TRUNCATE_EXISTING, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE); - } catch (final IOException e) { - throw new StorageException( - "There was a problem opening FileChannel " + pathForFileNumber(fileNumber), e); - } - } - - @Override - public synchronized void add(final T taskData) { - final Bytes data = serializer.apply(taskData); - try { - writeTaskData(data); - size++; - if (writeFileChannel.size() > rollWhenFileSizeExceedsBytes) { - LOG.debug("Writing reached end of file {}", writeFileNumber); - writeFileChannel.close(); - writeFileNumber++; - writeFileChannel = openWriteFileChannel(writeFileNumber); - } - } catch (final IOException e) { - throw new StorageException( - "There was a problem adding to FileChannel " + pathForFileNumber(writeFileNumber), e); - } - } - - @Override - public synchronized Task remove() { - if (isEmpty()) { - return null; - } - try { - final ByteBuffer dataBuffer = readNextTaskData(); - final T data = deserializer.apply(Bytes.wrapByteBuffer(dataBuffer)); - final FlatFileTask task = new FlatFileTask<>(this, data); - 
-      outstandingTasks.add(task);
-      size--;
-      return task;
-    } catch (final IOException e) {
-      throw new StorageException(
-          "There was a problem removing from FileChannel " + pathForFileNumber(readFileNumber), e);
-    }
-  }
-
-  private ByteBuffer readNextTaskData() throws IOException {
-    final int dataLength = readDataLength();
-    final ByteBuffer dataBuffer = ByteBuffer.allocate(dataLength);
-    readBytes(dataBuffer, dataLength);
-    return dataBuffer;
-  }
-
-  private void writeTaskData(final Bytes data) throws IOException {
-    final long offset = writeFileChannel.size();
-    writeDataLength(data.size(), offset);
-    writeFileChannel.write(ByteBuffer.wrap(data.toArrayUnsafe()), offset + Integer.BYTES);
-  }
-
-  private int readDataLength() throws IOException {
-    lengthBuffer.position(0);
-    lengthBuffer.limit(Integer.BYTES);
-    readBytes(lengthBuffer, Integer.BYTES);
-    return lengthBuffer.getInt(0);
-  }
-
-  private void writeDataLength(final int size, final long offset) throws IOException {
-    lengthBuffer.position(0);
-    lengthBuffer.putInt(size);
-    lengthBuffer.flip();
-    writeFileChannel.write(lengthBuffer, offset);
-  }
-
-  private void readBytes(final ByteBuffer buffer, final int expectedLength) throws IOException {
-    int readBytes = readFileChannel.read(buffer);
-
-    if (readBytes == -1 && writeFileNumber > readFileNumber) {
-      LOG.debug("Reading reached end of file {}", readFileNumber);
-      readFileChannel.close();
-      readFileNumber++;
-      readFileChannel = openReadFileChannel(readFileNumber);
-
-      readBytes = readFileChannel.read(buffer);
-    }
-    if (readBytes != expectedLength) {
-      throw new IllegalStateException(
-          "Task queue corrupted. Expected to read "
-              + expectedLength
-              + " bytes but only got "
-              + readBytes);
-    }
-  }
-
-  @Override
-  public synchronized long size() {
-    return size;
-  }
-
-  @Override
-  public synchronized boolean isEmpty() {
-    return size() == 0;
-  }
-
-  @VisibleForTesting
-  int getReadFileNumber() {
-    return readFileNumber;
-  }
-
-  @VisibleForTesting
-  int getWriteFileNumber() {
-    return writeFileNumber;
-  }
-
-  @Override
-  public synchronized void clear() {
-    outstandingTasks.clear();
-    try {
-      readFileChannel.close();
-      writeFileChannel.close();
-      for (int i = readFileNumber; i <= writeFileNumber; i++) {
-        final File file = pathForFileNumber(i).toFile();
-        if (!file.delete() && file.exists()) {
-          LOG.error("Failed to delete tasks file {}", file.getAbsolutePath());
-        }
-      }
-      readFileNumber = 0;
-      writeFileNumber = 0;
-      writeFileChannel = openWriteFileChannel(writeFileNumber);
-      readFileChannel = openReadFileChannel(readFileNumber);
-      size = 0;
-    } catch (final IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
-  @Override
-  public synchronized boolean allTasksCompleted() {
-    return isEmpty() && outstandingTasks.isEmpty();
-  }
-
-  @Override
-  public synchronized void close() {
-    try {
-      readFileChannel.close();
-      writeFileChannel.close();
-    } catch (final IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
-  private Path pathForFileNumber(final int fileNumber) {
-    return storageDirectory.resolve(FILENAME_PREFIX + fileNumber);
-  }
-
-  private synchronized boolean markTaskCompleted(final FlatFileTask<T> task) {
-    return outstandingTasks.remove(task);
-  }
-
-  private synchronized void handleFailedTask(final FlatFileTask<T> task) {
-    if (markTaskCompleted(task)) {
-      add(task.getData());
-    }
-  }
-
-  public static class StorageException extends RuntimeException {
-    StorageException(final Throwable t) {
-      super(t);
-    }
-
-    StorageException(final String m, final Throwable t) {
-      super(m, t);
-    }
-  }
-
-  private static class FlatFileTask<T> implements Task<T> {
-    private final AtomicBoolean completed = new AtomicBoolean(false);
-    private final FlatFileTaskCollection<T> parentQueue;
-    private final T data;
-
-    private FlatFileTask(final FlatFileTaskCollection<T> parentQueue, final T data) {
-      this.parentQueue = parentQueue;
-      this.data = data;
-    }
-
-    @Override
-    public T getData() {
-      return data;
-    }
-
-    @Override
-    public void markCompleted() {
-      if (completed.compareAndSet(false, true)) {
-        parentQueue.markTaskCompleted(this);
-      }
-    }
-
-    @Override
-    public void markFailed() {
-      if (completed.compareAndSet(false, true)) {
-        parentQueue.handleFailedTask(this);
-      }
-    }
-  }
-}
diff --git a/services/tasks/src/main/java/org/hyperledger/besu/services/tasks/InMemoryTaskQueue.java b/services/tasks/src/main/java/org/hyperledger/besu/services/tasks/InMemoryTaskQueue.java
index 227ce0f36c..28418931b6 100644
--- a/services/tasks/src/main/java/org/hyperledger/besu/services/tasks/InMemoryTaskQueue.java
+++ b/services/tasks/src/main/java/org/hyperledger/besu/services/tasks/InMemoryTaskQueue.java
@@ -15,7 +15,9 @@
 package org.hyperledger.besu.services.tasks;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,6 +87,10 @@ public class InMemoryTaskQueue<T> implements TaskCollection<T> {
     }
   }
 
+  public synchronized List<T> asList() {
+    return new ArrayList<>(internalQueue);
+  }
+
   private synchronized void handleFailedTask(final InMemoryTask<T> task) {
     if (markTaskCompleted(task)) {
       add(task.getData());
diff --git a/services/tasks/src/test/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollectionTest.java b/services/tasks/src/test/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollectionTest.java
deleted file mode 100644
index 0f9479065c..0000000000
--- a/services/tasks/src/test/java/org/hyperledger/besu/services/tasks/FlatFileTaskCollectionTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- */
-package org.hyperledger.besu.services.tasks;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
-
-import org.apache.tuweni.bytes.Bytes;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class FlatFileTaskCollectionTest
-    extends AbstractTaskQueueTest<FlatFileTaskCollection<Bytes>> {
-
-  private static final int ROLL_SIZE = 10;
-  @Rule public final TemporaryFolder folder = new TemporaryFolder();
-
-  @Override
-  protected FlatFileTaskCollection<Bytes> createQueue() throws IOException {
-    final Path dataDir = folder.newFolder().toPath();
-    return createQueue(dataDir);
-  }
-
-  private FlatFileTaskCollection<Bytes> createQueue(final Path dataDir) {
-    return new FlatFileTaskCollection<>(
-        dataDir, Function.identity(), Function.identity(), ROLL_SIZE);
-  }
-
-  @Test
-  public void shouldRollFilesWhenSizeExceeded() throws Exception {
-    final Path dataDir = folder.newFolder().toPath();
-    try (final FlatFileTaskCollection<Bytes> queue = createQueue(dataDir)) {
-      final List<Bytes> tasks = new ArrayList<>();
-
-      addItem(queue, tasks, 0);
-      assertThat(queue.getWriteFileNumber()).isEqualTo(0);
-      int tasksInFirstFile = 1;
-      while (queue.getWriteFileNumber() == 0) {
-        addItem(queue, tasks, tasksInFirstFile);
-        tasksInFirstFile++;
-      }
-
-      assertThat(queue.getWriteFileNumber()).isGreaterThan(0);
-      assertThat(queue.getReadFileNumber()).isEqualTo(0);
-
-      // Add extra items to be sure we have at least one in a later file
-      addItem(queue, tasks, 123);
-      addItem(queue, tasks, 124);
-
-      final List<Bytes> removedTasks = new ArrayList<>();
-      // Read through all the items in the first file.
-      for (int i = 0; i < tasksInFirstFile; i++) {
-        removedTasks.add(queue.remove().getData());
-      }
-
-      // read one more to make sure we are reading from the next file
-      removedTasks.add(queue.remove().getData());
-      assertThat(queue.getReadFileNumber()).isEqualTo(1);
-
-      // Check that all tasks were read correctly.
-      removedTasks.add(queue.remove().getData());
-      assertThat(queue.isEmpty()).isTrue();
-      assertThat(removedTasks).isEqualTo(tasks);
-    }
-  }
-
-  private void addItem(
-      final FlatFileTaskCollection<Bytes> queue, final List<Bytes> tasks, final int value) {
-    tasks.add(Bytes.of(value));
-    queue.add(Bytes.of(value));
-  }
-}
diff --git a/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java b/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java
index c2668156e7..6801b43955 100644
--- a/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java
+++ b/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.tuweni.bytes.Bytes;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -83,7 +84,7 @@ public abstract class AbstractKeyValueStorageTest {
             .collect(toUnmodifiableList());
     keys.forEach(key -> tx.put(key, bytesFromHexString("0ABC")));
     tx.commit();
-    assertThat(store.streamKeys().collect(toUnmodifiableSet()))
+    assertThat(store.stream().map(Pair::getKey).collect(toUnmodifiableSet()))
        .containsExactlyInAnyOrder(keys.toArray(new byte[][] {}));
   }
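
Reviewer note (illustrative, not part of the patch): the pair-based stream() and getAllValuesFromKeysThat() accessors added above appear to be what lets the snap-sync persisted state be re-read on startup. A minimal sketch of how they might be exercised against the in-memory backend follows; it assumes InMemoryKeyValueStorage keeps its public no-arg constructor and reuses only the transaction calls already visible in AbstractKeyValueStorageTest, with package names as in the existing kvstore and plugin-api modules.

import java.nio.charset.StandardCharsets;

import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

public class StreamingStorageSketch {
  public static void main(final String[] args) {
    // Assumed: the in-memory backend still exposes a no-arg constructor.
    final InMemoryKeyValueStorage store = new InMemoryKeyValueStorage();

    final KeyValueStorageTransaction tx = store.startTransaction();
    tx.put("snapsync-range".getBytes(StandardCharsets.UTF_8), new byte[] {0x01, 0x02});
    tx.commit();

    // New stream() accessor: iterate key/value pairs rather than keys only.
    store
        .stream()
        .forEach(
            pair ->
                System.out.printf(
                    "key=%s valueLength=%d%n",
                    new String(pair.getKey(), StandardCharsets.UTF_8), pair.getValue().length));

    // New getAllValuesFromKeysThat(): collect the values whose keys match a predicate.
    store
        .getAllValuesFromKeysThat(key -> key.length > 0)
        .forEach(value -> System.out.println("matched value of " + value.length + " bytes"));
  }
}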
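Likewise, a hedged sketch of the new InMemoryTaskQueue#asList() helper, which copies the pending tasks without draining the queue (useful when outstanding snap-sync requests are persisted before shutdown). It assumes InMemoryTaskQueue keeps its default constructor; the Task calls shown follow the TaskCollection API visible elsewhere in this patch, and the printed persistence step is purely illustrative.

import java.util.List;

import org.hyperledger.besu.services.tasks.InMemoryTaskQueue;
import org.hyperledger.besu.services.tasks.Task;

public class TaskQueueSnapshotSketch {
  public static void main(final String[] args) {
    final InMemoryTaskQueue<String> queue = new InMemoryTaskQueue<>();
    queue.add("account-range-1");
    queue.add("account-range-2");

    // asList() returns a copy of the pending tasks; the queue itself is left untouched.
    final List<String> pending = queue.asList();
    pending.forEach(data -> System.out.println("would persist: " + data));

    // Normal task processing is unaffected by taking the snapshot above.
    final Task<String> next = queue.remove();
    System.out.println("processing: " + next.getData());
    next.markCompleted();
  }
}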