diff --git a/CHANGELOG.md b/CHANGELOG.md index d713519194..3cbb30538f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ permissions on the directory allow other users and groups to r/w. Ideally this s ### Bug Fixes + - MerkleTrieException when pruning should no longer occur. Using `--pruning-enabled=true` will no longer accidentaly delete keys in certain edge cases. [\#760](https://github.com/hyperledger/besu/pull/760). - Full help not displayed unless explicitly requested. [\#437](https://github.com/hyperledger/besu/pull/437) - Compatibility with undocumented Geth `eth_subscribe` fields [\#654](https://github.com/hyperledger/besu/pull/654) - Current block number included as part of `eth_getWork` response [\#849](https://github.com/hyperledger/besu/pull/849) diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java index 9aaa124fb1..4f43853f19 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/WorldStateKeyValueStorage.java @@ -21,10 +21,14 @@ import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; import org.hyperledger.besu.util.Subscribers; -import java.util.ArrayList; -import java.util.List; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; +import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; @@ -33,6 +37,7 @@ public class WorldStateKeyValueStorage implements WorldStateStorage { private final Subscribers nodeAddedListeners = Subscribers.create(); private final KeyValueStorage keyValueStorage; + private final ReentrantLock lock = new ReentrantLock(); public WorldStateKeyValueStorage(final KeyValueStorage keyValueStorage) { this.keyValueStorage = keyValueStorage; @@ -83,12 +88,27 @@ public class WorldStateKeyValueStorage implements WorldStateStorage { @Override public Updater updater() { - return new Updater(keyValueStorage.startTransaction(), nodeAddedListeners); + return new Updater(lock, keyValueStorage.startTransaction(), nodeAddedListeners); } @Override public long prune(final Predicate inUseCheck) { - return keyValueStorage.removeAllKeysUnless(inUseCheck); + final AtomicInteger prunedKeys = new AtomicInteger(0); + try (final Stream keys = keyValueStorage.streamKeys()) { + keys.forEach( + key -> { + lock.lock(); + try { + if (!inUseCheck.test(key) && keyValueStorage.tryDelete(key)) { + prunedKeys.incrementAndGet(); + } + } finally { + lock.unlock(); + } + }); + } + + return prunedKeys.get(); } @Override @@ -105,11 +125,14 @@ public class WorldStateKeyValueStorage implements WorldStateStorage { private final KeyValueStorageTransaction transaction; private final Subscribers nodeAddedListeners; - private final List addedNodes = new ArrayList<>(); + private final Set addedNodes = new HashSet<>(); + private final Lock lock; public Updater( + final Lock lock, final KeyValueStorageTransaction transaction, final Subscribers nodeAddedListeners) { + this.lock = lock; this.transaction = transaction; this.nodeAddedListeners = nodeAddedListeners; } @@ -156,12 +179,18 @@ public class WorldStateKeyValueStorage implements WorldStateStorage { @Override public void commit() { - nodeAddedListeners.forEach(listener -> listener.onNodesAdded(addedNodes)); - transaction.commit(); + lock.lock(); + try { + nodeAddedListeners.forEach(listener -> listener.onNodesAdded(addedNodes)); + transaction.commit(); + } finally { + lock.unlock(); + } } @Override public void rollback() { + addedNodes.clear(); transaction.rollback(); } } diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java index 9d540af5a3..4c317080ac 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java @@ -56,8 +56,9 @@ public class MarkSweepPrunerTest { private final BlockDataGenerator gen = new BlockDataGenerator(); private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem(); private final Map hashValueStore = spy(new HashMap<>()); - private final InMemoryKeyValueStorage stateStorage = spy(new TestInMemoryStorage(hashValueStore)); - private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage); + private final InMemoryKeyValueStorage stateStorage = new TestInMemoryStorage(hashValueStore); + private final WorldStateStorage worldStateStorage = + spy(new WorldStateKeyValueStorage(stateStorage)); private final WorldStateArchive worldStateArchive = new WorldStateArchive( worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage())); @@ -137,11 +138,11 @@ public class MarkSweepPrunerTest { pruner.sweepBefore(markBlock.getNumber()); // Check stateRoots are marked first - InOrder inOrder = inOrder(hashValueStore, stateStorage); + InOrder inOrder = inOrder(hashValueStore, worldStateStorage); for (Bytes32 stateRoot : stateRoots) { inOrder.verify(hashValueStore).remove(stateRoot); } - inOrder.verify(stateStorage).removeAllKeysUnless(any()); + inOrder.verify(worldStateStorage).prune(any()); } @Test @@ -172,11 +173,11 @@ public class MarkSweepPrunerTest { pruner.sweepBefore(markBlock.getNumber()); // Check stateRoots are marked first - InOrder inOrder = inOrder(hashValueStore, stateStorage); + InOrder inOrder = inOrder(hashValueStore, worldStateStorage); for (Bytes32 stateRoot : stateRoots) { inOrder.verify(hashValueStore).remove(stateRoot); } - inOrder.verify(stateStorage).removeAllKeysUnless(any()); + inOrder.verify(worldStateStorage).prune(any()); assertThat(stateStorage.containsKey(markedRoot.toArray())).isTrue(); } diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index 6df9977a88..808d9ebd6b 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -64,7 +64,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = 'wjORmE9I35CVqZIk78t8zgZUo9oIRy2fvu0x+CFnzaU=' + knownHash = 'x6hLH75JBKLM8mNFz7n0ALRw4TWiDONJuum53da4jTY=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java index 5d4fd4a344..6ad03399b0 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorage.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Stream; /** * Responsible for storing values against keys. @@ -60,6 +61,14 @@ public interface KeyValueStorage extends Closeable { */ Optional get(byte[] key) throws StorageException; + /** + * Returns a stream of all keys. + * + * @return A stream of all keys in storage. + * @throws StorageException problem encountered during the retrieval attempt. + */ + Stream streamKeys() throws StorageException; + /** * Performs an evaluation against each key in the store, keeping the entries that pass, removing * those that fail. @@ -67,10 +76,21 @@ public interface KeyValueStorage extends Closeable { * @param retainCondition predicate to evaluate each key against, unless the result is {@code * null}, both the key and associated value must be removed. * @return the number of keys removed. - * @throws StorageException problem encountered when removing data. + * @throws StorageException problem encountered during the retrieval attempt. */ long removeAllKeysUnless(Predicate retainCondition) throws StorageException; + /** + * Delete the value corresponding to the given key if a write lock can be instantly acquired on + * the underlying storage. Do nothing otherwise. + * + * @param key The key to delete. + * @throws StorageException any problem encountered during the deletion attempt. + * @return false if the lock on the underlying storage could not be instantly acquired, true + * otherwise + */ + boolean tryDelete(byte[] key) throws StorageException; + /** * Performs an evaluation against each key in the store, returning the set of entries that pass. * diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbKeyIterator.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbKeyIterator.java new file mode 100644 index 0000000000..98064a6819 --- /dev/null +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbKeyIterator.java @@ -0,0 +1,97 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.plugin.services.storage.rocksdb; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; + +public class RocksDbKeyIterator implements Iterator, AutoCloseable { + private static final Logger LOG = LogManager.getLogger(); + + private final RocksIterator rocksIterator; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private RocksDbKeyIterator(final RocksIterator rocksIterator) { + this.rocksIterator = rocksIterator; + } + + public static RocksDbKeyIterator create(final RocksIterator rocksIterator) { + return new RocksDbKeyIterator(rocksIterator); + } + + @Override + public boolean hasNext() { + assertOpen(); + return rocksIterator.isValid(); + } + + @Override + public byte[] next() { + assertOpen(); + try { + rocksIterator.status(); + } catch (final RocksDBException e) { + LOG.error( + String.format("%s encountered a problem while iterating.", getClass().getSimpleName()), + e); + } + if (!hasNext()) { + throw new NoSuchElementException(); + } + final byte[] key = rocksIterator.key(); + rocksIterator.next(); + return key; + } + + public Stream toStream() { + assertOpen(); + final Spliterator spliterator = + Spliterators.spliteratorUnknownSize( + this, + Spliterator.IMMUTABLE + | Spliterator.DISTINCT + | Spliterator.NONNULL + | Spliterator.ORDERED + | Spliterator.SORTED); + + return StreamSupport.stream(spliterator, false).onClose(this::close); + } + + private void assertOpen() { + checkState( + !closed.get(), + String.format("Attempt to read from a closed %s", getClass().getSimpleName())); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + rocksIterator.close(); + } + } +} diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java index 300bb8c4bd..4bb7b7fc08 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented; import static java.util.Objects.requireNonNullElse; +import static java.util.stream.Collectors.toUnmodifiableSet; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.exception.StorageException; @@ -22,12 +23,12 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; +import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage; import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransactionTransitionValidatorDecorator; -import java.io.Closeable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -37,9 +38,9 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; @@ -53,12 +54,13 @@ import org.rocksdb.LRUCache; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; +import org.rocksdb.Status; import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDBOptions; import org.rocksdb.WriteOptions; public class RocksDBColumnarKeyValueStorage - implements SegmentedKeyValueStorage, Closeable { + implements SegmentedKeyValueStorage { static { RocksDbUtil.loadNativeLibrary(); @@ -73,6 +75,7 @@ public class RocksDBColumnarKeyValueStorage private final AtomicBoolean closed = new AtomicBoolean(false); private final Map columnHandlesByName; private final RocksDBMetrics metrics; + private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true); public RocksDBColumnarKeyValueStorage( final RocksDBConfiguration configuration, @@ -128,7 +131,6 @@ public class RocksDBColumnarKeyValueStorage builder.put(segmentName, columnHandle); } columnHandlesByName = builder.build(); - } catch (final RocksDBException e) { throw new StorageException(e); } @@ -164,6 +166,13 @@ public class RocksDBColumnarKeyValueStorage new RocksDbTransaction(db.beginTransaction(options), options)); } + @Override + public Stream streamKeys(final ColumnFamilyHandle segmentHandle) { + final RocksIterator rocksIterator = db.newIterator(segmentHandle); + rocksIterator.seekToFirst(); + return RocksDbKeyIterator.create(rocksIterator).toStream(); + } + @Override public long removeAllKeysUnless( final ColumnFamilyHandle segmentHandle, final Predicate inUseCheck) { @@ -183,18 +192,23 @@ public class RocksDBColumnarKeyValueStorage } @Override - public Set getAllKeysThat( - final ColumnFamilyHandle segmentHandle, final Predicate returnCondition) { - final Set returnedKeys = Sets.newIdentityHashSet(); - try (final RocksIterator rocksIterator = db.newIterator(segmentHandle)) { - for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) { - final byte[] key = rocksIterator.key(); - if (returnCondition.test(key)) { - returnedKeys.add(key); - } + public boolean tryDelete(final ColumnFamilyHandle segmentHandle, final byte[] key) { + try { + db.delete(segmentHandle, tryDeleteOptions, key); + return true; + } catch (RocksDBException e) { + if (e.getStatus().getCode() == Status.Code.Incomplete) { + return false; + } else { + throw new StorageException(e); } } - return returnedKeys; + } + + @Override + public Set getAllKeysThat( + final ColumnFamilyHandle segmentHandle, final Predicate returnCondition) { + return streamKeys(segmentHandle).filter(returnCondition).collect(toUnmodifiableSet()); } @Override @@ -284,6 +298,7 @@ public class RocksDBColumnarKeyValueStorage } private void close() { + tryDeleteOptions.close(); innerTx.close(); options.close(); } diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java index fede00bffd..f220bf63f0 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBKeyValueStorage.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.plugin.services.storage.rocksdb.unsegmented; +import static java.util.stream.Collectors.toUnmodifiableSet; + import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; @@ -21,6 +23,7 @@ import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; +import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; import org.hyperledger.besu.services.kvstore.KeyValueStorageTransactionTransitionValidatorDecorator; @@ -29,8 +32,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; +import java.util.stream.Stream; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.rocksdb.BlockBasedTableConfig; @@ -39,6 +42,7 @@ import org.rocksdb.Options; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; +import org.rocksdb.Status; import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDBOptions; import org.rocksdb.WriteOptions; @@ -56,6 +60,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage { private final TransactionDB db; private final AtomicBoolean closed = new AtomicBoolean(false); private final RocksDBMetrics rocksDBMetrics; + private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true); public RocksDBKeyValueStorage( final RocksDBConfiguration configuration, @@ -116,6 +121,18 @@ public class RocksDBKeyValueStorage implements KeyValueStorage { } } + @Override + public Set getAllKeysThat(final Predicate returnCondition) { + return streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); + } + + @Override + public Stream streamKeys() { + final RocksIterator rocksIterator = db.newIterator(); + rocksIterator.seekToFirst(); + return RocksDbKeyIterator.create(rocksIterator).toStream(); + } + @Override public long removeAllKeysUnless(final Predicate retainCondition) throws StorageException { long removedNodeCounter = 0; @@ -134,17 +151,17 @@ public class RocksDBKeyValueStorage implements KeyValueStorage { } @Override - public Set getAllKeysThat(final Predicate returnCondition) { - final Set returnedKeys = Sets.newIdentityHashSet(); - try (final RocksIterator rocksIterator = db.newIterator()) { - for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) { - final byte[] key = rocksIterator.key(); - if (returnCondition.test(key)) { - returnedKeys.add(key); - } + public boolean tryDelete(final byte[] key) { + try { + db.delete(tryDeleteOptions, key); + return true; + } catch (RocksDBException e) { + if (e.getStatus().getCode() == Status.Code.Incomplete) { + return false; + } else { + throw new StorageException(e); } } - return returnedKeys; } @Override @@ -158,6 +175,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage { @Override public void close() { if (closed.compareAndSet(false, true)) { + tryDeleteOptions.close(); txOptions.close(); options.close(); db.close(); diff --git a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java index e4c10c3ae3..00a632fde2 100644 --- a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java +++ b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java @@ -73,13 +73,22 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT tx.put(barSegment, bytesOf(6), bytesOf(6)); tx.commit(); - final long removedFromFoo = - store.removeAllKeysUnless(fooSegment, x -> Arrays.equals(x, bytesOf(3))); - final long removedFromBar = - store.removeAllKeysUnless(barSegment, x -> Arrays.equals(x, bytesOf(4))); - - assertThat(removedFromFoo).isEqualTo(2); - assertThat(removedFromBar).isEqualTo(2); + store + .streamKeys(fooSegment) + .forEach( + key -> { + if (!Arrays.equals(key, bytesOf(3))) store.tryDelete(fooSegment, key); + }); + store + .streamKeys(barSegment) + .forEach( + key -> { + if (!Arrays.equals(key, bytesOf(4))) store.tryDelete(barSegment, key); + }); + + for (final ColumnFamilyHandle segment : Set.of(fooSegment, barSegment)) { + assertThat(store.streamKeys(segment).count()).isEqualTo(1); + } assertThat(store.get(fooSegment, bytesOf(1))).isEmpty(); assertThat(store.get(fooSegment, bytesOf(2))).isEmpty(); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java index 2fcf52e1c1..e50c2574dd 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java @@ -29,7 +29,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; +import java.util.stream.Stream; +import com.google.common.collect.ImmutableSet; import org.apache.tuweni.bytes.Bytes; public class InMemoryKeyValueStorage implements KeyValueStorage { @@ -72,6 +74,22 @@ public class InMemoryKeyValueStorage implements KeyValueStorage { } } + @Override + public Set getAllKeysThat(final Predicate returnCondition) { + return streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); + } + + @Override + public Stream streamKeys() { + final Lock lock = rwLock.readLock(); + lock.lock(); + try { + return ImmutableSet.copyOf(hashValueStore.keySet()).stream().map(Bytes::toArrayUnsafe); + } finally { + lock.unlock(); + } + } + @Override public long removeAllKeysUnless(final Predicate retainCondition) throws StorageException { final Lock lock = rwLock.writeLock(); @@ -86,17 +104,17 @@ public class InMemoryKeyValueStorage implements KeyValueStorage { } @Override - public Set getAllKeysThat(final Predicate returnCondition) { - final Lock lock = rwLock.readLock(); - lock.lock(); - try { - return hashValueStore.keySet().stream() - .map(Bytes::toArrayUnsafe) - .filter(returnCondition) - .collect(toUnmodifiableSet()); - } finally { - lock.unlock(); + public boolean tryDelete(final byte[] key) { + final Lock lock = rwLock.writeLock(); + if (lock.tryLock()) { + try { + hashValueStore.remove(Bytes.wrap(key)); + } finally { + lock.unlock(); + } + return true; } + return false; } @Override diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java index a5ba48852c..8bfc78399c 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LimitedInMemoryKeyValueStorage.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.services.kvstore; +import static java.util.stream.Collectors.toUnmodifiableSet; + import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; @@ -27,10 +29,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; -import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableSet; import org.apache.tuweni.bytes.Bytes; /** @@ -76,19 +79,47 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage { } } + @Override + public Set getAllKeysThat(final Predicate returnCondition) { + return streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); + } + + @Override + public Stream streamKeys() { + final Lock lock = rwLock.readLock(); + lock.lock(); + try { + return ImmutableSet.copyOf(storage.asMap().keySet()).stream().map(Bytes::toArrayUnsafe); + } finally { + lock.unlock(); + } + } + @Override public long removeAllKeysUnless(final Predicate retainCondition) throws StorageException { - final long initialSize = storage.size(); - storage.asMap().keySet().removeIf(key -> !retainCondition.test(key.toArrayUnsafe())); - return initialSize - storage.size(); + final Lock lock = rwLock.writeLock(); + lock.lock(); + try { + final long initialSize = storage.size(); + storage.asMap().keySet().removeIf(key -> !retainCondition.test(key.toArrayUnsafe())); + return initialSize - storage.size(); + } finally { + lock.unlock(); + } } @Override - public Set getAllKeysThat(final Predicate returnCondition) { - return storage.asMap().keySet().stream() - .map(Bytes::toArrayUnsafe) - .filter(returnCondition) - .collect(Collectors.toSet()); + public boolean tryDelete(final byte[] key) { + final Lock lock = rwLock.writeLock(); + if (lock.tryLock()) { + try { + storage.invalidate(Bytes.wrap(key)); + } finally { + lock.unlock(); + } + return true; + } + return false; } @Override diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java index d3389c295d..67e256d40c 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Stream; /** * Service provided by besu to facilitate persistent data storage. @@ -49,7 +50,37 @@ public interface SegmentedKeyValueStorage extends Closeable { */ Transaction startTransaction() throws StorageException; - long removeAllKeysUnless(S segmentHandle, Predicate inUseCheck); + /** + * Returns a stream of all keys for the segment. + * + * @param segmentHandle The segment handle whose keys we want to stream. + * @return A stream of all keys in the specified segment. + */ + Stream streamKeys(final S segmentHandle); + + /** + * Performs an evaluation against each key in the store, keeping the entries that pass, removing + * those that fail. + * + * @param segmentHandle The segment handle whose keys we want to stream. + * @param retainCondition predicate to evaluate each key against, unless the result is {@code + * null}, both the key and associated value must be removed. + * @return the number of keys removed. + */ + long removeAllKeysUnless(final S segmentHandle, Predicate retainCondition) + throws StorageException; + + /** + * Delete the value corresponding to the given key in the given segment if a write lock can be + * instantly acquired on the underlying storage. Do nothing otherwise. + * + * @param segmentHandle The segment handle whose keys we want to stream. + * @param key The key to delete. + * @throws StorageException any problem encountered during the deletion attempt. + * @return false if the lock on the underlying storage could not be instantly acquired, true + * otherwise + */ + boolean tryDelete(S segmentHandle, byte[] key) throws StorageException; Set getAllKeysThat(S segmentHandle, Predicate returnCondition); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java index 5e7d94f1de..95facc9139 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java @@ -23,9 +23,9 @@ import java.io.IOException; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Stream; public class SegmentedKeyValueStorageAdapter implements KeyValueStorage { - private final S segmentHandle; private final SegmentedKeyValueStorage storage; @@ -50,14 +50,24 @@ public class SegmentedKeyValueStorageAdapter implements KeyValueStorage { return storage.get(segmentHandle, key); } + @Override + public Set getAllKeysThat(final Predicate returnCondition) { + return storage.getAllKeysThat(segmentHandle, returnCondition); + } + + @Override + public Stream streamKeys() { + return storage.streamKeys(segmentHandle); + } + @Override public long removeAllKeysUnless(final Predicate retainCondition) throws StorageException { return storage.removeAllKeysUnless(segmentHandle, retainCondition); } @Override - public Set getAllKeysThat(final Predicate returnCondition) { - return storage.getAllKeysThat(segmentHandle, returnCondition); + public boolean tryDelete(final byte[] key) { + return storage.tryDelete(segmentHandle, key); } @Override diff --git a/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java b/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java index 4b106fa55e..c2668156e7 100644 --- a/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java +++ b/testutil/src/main/java/org/hyperledger/besu/kvstore/AbstractKeyValueStorageTest.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.kvstore; +import static java.util.stream.Collectors.toUnmodifiableList; +import static java.util.stream.Collectors.toUnmodifiableSet; import static org.assertj.core.api.Assertions.assertThat; import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; @@ -26,6 +28,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.function.Function; +import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import org.junit.Ignore; @@ -71,19 +74,17 @@ public abstract class AbstractKeyValueStorageTest { } @Test - public void removeUnless() throws Exception { + public void streamKeys() throws Exception { final KeyValueStorage store = createStore(); final KeyValueStorageTransaction tx = store.startTransaction(); - tx.put(bytesFromHexString("0F"), bytesFromHexString("0ABC")); - tx.put(bytesFromHexString("10"), bytesFromHexString("0ABC")); - tx.put(bytesFromHexString("11"), bytesFromHexString("0ABC")); - tx.put(bytesFromHexString("12"), bytesFromHexString("0ABC")); + final List keys = + Stream.of("0F", "10", "11", "12") + .map(this::bytesFromHexString) + .collect(toUnmodifiableList()); + keys.forEach(key -> tx.put(key, bytesFromHexString("0ABC"))); tx.commit(); - store.removeAllKeysUnless(bv -> Bytes.wrap(bv).toString().contains("1")); - assertThat(store.containsKey(bytesFromHexString("0F"))).isFalse(); - assertThat(store.containsKey(bytesFromHexString("10"))).isTrue(); - assertThat(store.containsKey(bytesFromHexString("11"))).isTrue(); - assertThat(store.containsKey(bytesFromHexString("12"))).isTrue(); + assertThat(store.streamKeys().collect(toUnmodifiableSet())) + .containsExactlyInAnyOrder(keys.toArray(new byte[][] {})); } @Test