When downloading the world state, persist children before parents (#3202)

* Fast sync should traverse the world state depth first

1. The pending requests queue in the world state downloader is now a different data structure. We now use a list of priority queues.
2. The node requests now know about the parent request that was responsible for spawning them.
3. When a node has data available and is asked to persist before its children are persisted, the node will not do anything. Instead, it will wait for all its children to persist first and will persist together with the last child.

Storing children before parents gives us the following benefits:
* We don't need to store pending requests on disk any more.
* When restarting download from a new pivot, we do not need to walk and check the whole tree any more.

And the following drawbacks:
* We now have pending nodes in memory for which we already downloaded data, but we do not store them in the database yet.

Overall expectations on performance:
We still need to download every single state node at least once, so there is no improvement there. We will save a significant amount of time in case we change pivots. And we save lots of read/writes on filesystem because tasks are not needed to be written to disk any more.

We want to avoid having too many pending unsaved nodes in memory, not to run out of it. If we were always handling only one request to our peers at the same time, we would not need to be worried, and we would just use a simple depth first search. Because we batch our requests, we might produce too many pending unprocessed nodes in memory if we are not careful about the order of processing requests. That is where the priority on node request comes from. We want to always process nodes lower in the tree before nodes higher in the tree, and preferably we want to first process children from the same parent so that we can save the current unsaved parent as soon as possible.

At the moment, I still left in the code several artefacts that I use for debugging the behaviour. I am planning to get rid of most of these counters, feel free to point them out in the review. There is for instance a weird counter in the NodeDataRequest class that I am using to monitor the total amount of unsaved nodes. In case pending unsaved node count rises too high, there is a warning printed into the logs. At the moment of writing, I would expect the counter to stay below 10 000 generally and not rise above 20 000 nodes. If you saw the number rise to for instance 100 000 that would signify a bug.

Similarly, because of the order of processing of the nodes, we do not need to store huge number of requests on the disk any more and the whole list fits comfortably into the memory. Without batching, we would not have more than a thousand requests around waiting. Because of the batching, we can see the number of requests occasionally rises all the way up to 300 000, but usually should be under 200 000.

Note that at any time there should not be more pending unsaved blocks than pending requests. Such a situation would be a bug to be reported.

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>

* Addressing review comments

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>

* Fixed failing test

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>

* Improving test coverage

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>

* Addressing review comments

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>
pull/3269/head
Jiri Peinlich 3 years ago committed by GitHub
parent 724248b1f9
commit 97a3494085
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      ethereum/eth/src/jmh/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java
  2. 37
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java
  3. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java
  4. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadState.java
  5. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloader.java
  6. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStep.java
  7. 88
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequest.java
  8. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java
  9. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/TrieNodeDataRequest.java
  10. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java
  11. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadState.java
  12. 12
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java
  13. 14
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java
  14. 7
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadStateTest.java
  15. 47
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloaderTest.java
  16. 6
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java
  17. 83
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequestTest.java
  18. 11
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java
  19. 171
      services/tasks/src/main/java/org/hyperledger/besu/services/tasks/InMemoryTasksPriorityQueues.java
  20. 24
      services/tasks/src/main/java/org/hyperledger/besu/services/tasks/TasksPriorityProvider.java
  21. 175
      services/tasks/src/test/java/org/hyperledger/besu/services/tasks/InMemoryTasksPriorityQueuesTest.java

@ -47,8 +47,7 @@ import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBKeyValueStora
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration;
import org.hyperledger.besu.services.BesuConfigurationImpl;
import org.hyperledger.besu.services.tasks.CachingTaskCollection;
import org.hyperledger.besu.services.tasks.FlatFileTaskCollection;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.nio.file.Path;
import java.time.Clock;
@ -80,7 +79,7 @@ public class WorldStateDownloaderBenchmark {
private WorldStateStorage worldStateStorage;
private RespondingEthPeer peer;
private Responder responder;
private CachingTaskCollection<NodeDataRequest> pendingRequests;
private InMemoryTasksPriorityQueues<NodeDataRequest> pendingRequests;
private StorageProvider storageProvider;
private EthProtocolManager ethProtocolManager;
@ -108,13 +107,7 @@ public class WorldStateDownloaderBenchmark {
createKeyValueStorageProvider(tempDir, tempDir.resolve("database"));
worldStateStorage = storageProvider.createWorldStateStorage(DataStorageFormat.FOREST);
pendingRequests =
new CachingTaskCollection<>(
new FlatFileTaskCollection<>(
tempDir.resolve("fastsync"),
NodeDataRequest::serialize,
NodeDataRequest::deserialize),
0);
pendingRequests = new InMemoryTasksPriorityQueues<>();
worldStateDownloader =
new FastWorldStateDownloader(
ethContext,

@ -29,8 +29,7 @@ import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.tasks.CachingTaskCollection;
import org.hyperledger.besu.services.tasks.FlatFileTaskCollection;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.io.File;
import java.nio.file.Path;
@ -84,11 +83,9 @@ public class FastDownloaderFactory {
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
worldStateStorage.clear();
}
final CachingTaskCollection<NodeDataRequest> taskCollection =
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
createWorldStateDownloaderTaskCollection(
getStateQueueDirectory(dataDirectory),
metricsSystem,
syncConfig.getWorldStateTaskCacheSize());
metricsSystem, syncConfig.getWorldStateTaskCacheSize());
final WorldStateDownloader worldStateDownloader =
new FastWorldStateDownloader(
ethContext,
@ -119,33 +116,17 @@ public class FastDownloaderFactory {
return Optional.of(fastSyncDownloader);
}
private static Path getStateQueueDirectory(final Path dataDirectory) {
final Path queueDataDir = getFastSyncDataDirectory(dataDirectory).resolve("statequeue");
ensureDirectoryExists(queueDataDir.toFile());
return queueDataDir;
}
private static Path getFastSyncDataDirectory(final Path dataDirectory) {
final Path fastSyncDataDir = dataDirectory.resolve(FAST_SYNC_FOLDER);
ensureDirectoryExists(fastSyncDataDir.toFile());
return fastSyncDataDir;
}
private static void ensureDirectoryExists(final File dir) {
if (!dir.mkdirs() && !dir.isDirectory()) {
throw new IllegalStateException("Unable to create directory: " + dir.getAbsolutePath());
}
}
private static CachingTaskCollection<NodeDataRequest> createWorldStateDownloaderTaskCollection(
final Path dataDirectory,
final MetricsSystem metricsSystem,
final int worldStateTaskCacheSize) {
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(
new FlatFileTaskCollection<>(
dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize),
worldStateTaskCacheSize);
private static InMemoryTasksPriorityQueues<NodeDataRequest>
createWorldStateDownloaderTaskCollection(
final MetricsSystem metricsSystem, final int worldStateTaskCacheSize) {
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
metricsSystem.createLongGauge(
BesuMetricCategory.SYNCHRONIZER,
@ -157,7 +138,7 @@ public class FastDownloaderFactory {
BesuMetricCategory.SYNCHRONIZER,
"world_state_pending_requests_cache_size",
"Pending request cache size for fast sync world state download",
taskCollection::cacheSize);
() -> worldStateTaskCacheSize);
return taskCollection;
}

@ -62,7 +62,6 @@ public class CompleteTaskStep {
final WorldDownloadState<NodeDataRequest> downloadState,
final Task<NodeDataRequest> task) {
if (task.getData().getData() != null) {
enqueueChildren(task, header, downloadState);
completedRequestsCounter.inc();
task.markCompleted();
downloadState.checkCompletion(worldStateStorage, header);
@ -89,19 +88,4 @@ public class CompleteTaskStep {
long getPendingRequests() {
return worldStatePendingRequestsCurrentSupplier.getAsLong();
}
private void enqueueChildren(
final Task<NodeDataRequest> task,
final BlockHeader blockHeader,
final WorldDownloadState<NodeDataRequest> downloadState) {
final NodeDataRequest request = task.getData();
// Only queue rootnode children if we started from scratch
if (!downloadState.downloadWasResumed() || !isRootState(blockHeader, request)) {
downloadState.enqueueRequests(request.getChildRequests(worldStateStorage));
}
}
private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}
}

@ -18,7 +18,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import org.hyperledger.besu.services.tasks.CachingTaskCollection;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.time.Clock;
import java.util.Optional;
@ -31,7 +31,7 @@ public class FastWorldDownloadState extends WorldDownloadState<NodeDataRequest>
private static final Logger LOG = LogManager.getLogger();
public FastWorldDownloadState(
final CachingTaskCollection<NodeDataRequest> pendingRequests,
final InMemoryTasksPriorityQueues<NodeDataRequest> pendingRequests,
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock) {

@ -23,7 +23,7 @@ import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.tasks.CachingTaskCollection;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.time.Clock;
import java.util.Optional;
@ -44,7 +44,7 @@ public class FastWorldStateDownloader implements WorldStateDownloader {
private final MetricsSystem metricsSystem;
private final EthContext ethContext;
private final CachingTaskCollection<NodeDataRequest> taskCollection;
private final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final int maxNodeRequestsWithoutProgress;
@ -57,7 +57,7 @@ public class FastWorldStateDownloader implements WorldStateDownloader {
public FastWorldStateDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final CachingTaskCollection<NodeDataRequest> taskCollection,
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection,
final int hashCountPerRequest,
final int maxOutstandingRequests,
final int maxNodeRequestsWithoutProgress,

@ -49,6 +49,10 @@ public class LoadLocalDataStep {
existingNodeCounter.inc();
request.setData(existingData.get());
request.setRequiresPersisting(false);
final WorldStateStorage.Updater updater = worldStateStorage.updater();
request.persist(updater);
updater.commit();
completedTasks.put(task);
return Stream.empty();
}

@ -15,24 +15,35 @@
package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.logging.log4j.LogManager.getLogger;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloaderException;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
public abstract class NodeDataRequest {
public abstract class NodeDataRequest implements TasksPriorityProvider {
private static final Logger LOG = getLogger();
public static final int MAX_CHILDREN = 16;
private final RequestType requestType;
private final Hash hash;
private Bytes data;
private boolean requiresPersisting = true;
private final Optional<Bytes> location;
private Optional<NodeDataRequest> possibleParent = Optional.empty();
private final AtomicInteger pendingChildren = new AtomicInteger(0);
private int depth = 0;
private long priority;
protected NodeDataRequest(
final RequestType requestType, final Hash hash, final Optional<Bytes> location) {
@ -66,45 +77,6 @@ public abstract class NodeDataRequest {
return RLP.encode(request::writeTo);
}
public static NodeDataRequest deserialize(final Bytes encoded) {
final RLPInput in = RLP.input(encoded);
in.enterList();
final RequestType requestType = RequestType.fromValue(in.readByte());
final Hash hash = Hash.wrap(in.readBytes32());
final Optional<Hash> accountHash;
final Optional<Bytes> location;
try {
final NodeDataRequest deserialized;
switch (requestType) {
case ACCOUNT_TRIE_NODE:
location = Optional.of((!in.isEndOfCurrentList()) ? in.readBytes() : Bytes.EMPTY);
deserialized = createAccountDataRequest(hash, location);
break;
case STORAGE_TRIE_NODE:
accountHash =
Optional.ofNullable((!in.isEndOfCurrentList()) ? Hash.wrap(in.readBytes32()) : null);
location = Optional.ofNullable((!in.isEndOfCurrentList()) ? in.readBytes() : Bytes.EMPTY);
deserialized = createStorageDataRequest(hash, accountHash, location);
break;
case CODE:
accountHash =
Optional.ofNullable((!in.isEndOfCurrentList()) ? Hash.wrap(in.readBytes32()) : null);
deserialized = createCodeRequest(hash, accountHash);
break;
default:
throw new IllegalArgumentException(
"Unable to deserialize provided data into a valid "
+ NodeDataRequest.class.getSimpleName());
}
return deserialized;
} finally {
in.leaveList();
}
}
public RequestType getRequestType() {
return requestType;
}
@ -132,10 +104,25 @@ public abstract class NodeDataRequest {
}
public final void persist(final WorldStateStorage.Updater updater) {
if (pendingChildren.get() > 0) {
return; // we do nothing. Our last child will eventually persist us.
}
if (requiresPersisting) {
checkNotNull(getData(), "Must set data before node can be persisted.");
doPersist(updater);
}
possibleParent.ifPresentOrElse(
parent -> parent.saveParent(updater), () -> LOG.warn("Missing a parent for {}", this.hash));
}
private void saveParent(final WorldStateStorage.Updater updater) {
if (pendingChildren.decrementAndGet() == 0) {
persist(updater);
}
}
private int incrementChildren() {
return pendingChildren.incrementAndGet();
}
protected abstract void writeTo(final RLPOutput out);
@ -145,4 +132,23 @@ public abstract class NodeDataRequest {
public abstract Stream<NodeDataRequest> getChildRequests(WorldStateStorage worldStateStorage);
public abstract Optional<Bytes> getExistingData(final WorldStateStorage worldStateStorage);
protected void registerParent(final NodeDataRequest parent) {
if (this.possibleParent.isPresent()) {
throw new WorldStateDownloaderException("Cannot set parent twice");
}
this.possibleParent = Optional.of(parent);
this.depth = parent.depth + 1;
this.priority = parent.priority * MAX_CHILDREN + parent.incrementChildren();
}
@Override
public long getPriority() {
return priority;
}
@Override
public int getDepth() {
return depth;
}
}

@ -35,6 +35,11 @@ public class PersistDataStep {
final WorldDownloadState<NodeDataRequest> downloadState) {
final Updater updater = worldStateStorage.updater();
tasks.stream()
.map(
task -> {
enqueueChildren(task, downloadState);
return task;
})
.map(Task::getData)
.filter(request -> request.getData() != null)
.forEach(
@ -52,4 +57,10 @@ public class PersistDataStep {
private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}
private void enqueueChildren(
final Task<NodeDataRequest> task, final WorldDownloadState<NodeDataRequest> downloadState) {
final NodeDataRequest request = task.getData();
downloadState.enqueueRequests(request.getChildRequests(worldStateStorage));
}
}

@ -56,7 +56,8 @@ abstract class TrieNodeDataRequest extends NodeDataRequest {
worldStateStorage, node.getLocation(), node.getPath(), value))
.orElseGet(Stream::empty);
}
});
})
.peek(request -> request.registerParent(this));
}
private boolean nodeIsHashReferencedDescendant(final Node<Bytes> node) {

@ -15,10 +15,12 @@
package org.hyperledger.besu.ethereum.eth.sync.worldstate;
import org.hyperledger.besu.services.tasks.Task;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
import java.util.Iterator;
public class TaskQueueIterator<REQUEST> implements Iterator<Task<REQUEST>> {
public class TaskQueueIterator<REQUEST extends TasksPriorityProvider>
implements Iterator<Task<REQUEST>> {
private final WorldDownloadState<REQUEST> downloadState;

@ -18,8 +18,9 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import org.hyperledger.besu.services.tasks.Task;
import org.hyperledger.besu.services.tasks.TaskCollection;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Clock;
@ -34,11 +35,11 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
public abstract class WorldDownloadState<REQUEST> {
public abstract class WorldDownloadState<REQUEST extends TasksPriorityProvider> {
private static final Logger LOG = LogManager.getLogger();
private boolean downloadWasResumed;
protected final TaskCollection<REQUEST> pendingRequests;
protected final InMemoryTasksPriorityQueues<REQUEST> pendingRequests;
protected final int maxRequestsWithoutProgress;
private final Clock clock;
@ -54,7 +55,7 @@ public abstract class WorldDownloadState<REQUEST> {
protected WorldStateDownloadProcess worldStateDownloadProcess;
public WorldDownloadState(
final TaskCollection<REQUEST> pendingRequests,
final InMemoryTasksPriorityQueues<REQUEST> pendingRequests,
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock) {

@ -32,9 +32,7 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.io.File;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.spi.FileSystemProvider;
import java.time.Clock;
import java.util.Optional;
@ -119,23 +117,15 @@ public class FastDownloaderFactoryTest {
verify(mutableBlockchain).getChainHeadBlockNumber();
}
private void initDataDirectory(final boolean isPivotBlockHeaderFileExist)
throws NoSuchFieldException {
private void initDataDirectory(final boolean isPivotBlockHeaderFileExist) {
final File pivotBlockHeaderFile = mock(File.class);
when(pivotBlockHeaderFile.isFile()).thenReturn(isPivotBlockHeaderFileExist);
when(pivotBlockHeaderFile.isDirectory()).thenReturn(true);
final File fastSyncDirFile = mock(File.class);
when(fastSyncDirFile.isDirectory()).thenReturn(true);
final Path storagePath = mock(Path.class);
final FileSystem fileSystem = mock(FileSystem.class);
when(storagePath.getFileSystem()).thenReturn(fileSystem);
when(fileSystem.provider()).thenReturn(mock(FileSystemProvider.class));
final Path pivotBlockHeaderPath = mock(Path.class);
when(pivotBlockHeaderPath.toFile()).thenReturn(pivotBlockHeaderFile);
when(pivotBlockHeaderPath.resolve(anyString())).thenReturn(storagePath);
final Path fastSyncDir = mock(Path.class);
when(fastSyncDir.resolve(any(String.class))).thenReturn(pivotBlockHeaderPath);

@ -27,11 +27,9 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class CompleteTaskStepTest {
@ -44,10 +42,6 @@ public class CompleteTaskStepTest {
private final CompleteTaskStep completeTaskStep =
new CompleteTaskStep(worldStateStorage, new NoOpMetricsSystem(), () -> 3);
@SuppressWarnings("unchecked")
private final ArgumentCaptor<Stream<NodeDataRequest>> streamCaptor =
ArgumentCaptor.forClass(Stream.class);
@Test
public void shouldMarkTaskAsFailedIfItDoesNotHaveData() {
final StubTask task =
@ -62,7 +56,7 @@ public class CompleteTaskStepTest {
}
@Test
public void shouldEnqueueChildrenAndMarkCompleteWhenTaskHasData() {
public void shouldMarkCompleteWhenTaskHasData() {
// Use an arbitrary but actually valid trie node to get children from.
final StubTask task = validTask();
completeTaskStep.markAsCompleteOrFailed(blockHeader, downloadState, task);
@ -70,12 +64,6 @@ public class CompleteTaskStepTest {
assertThat(task.isCompleted()).isTrue();
assertThat(task.isFailed()).isFalse();
verify(downloadState).enqueueRequests(streamCaptor.capture());
assertThat(streamCaptor.getValue())
.usingRecursiveFieldByFieldElementComparator()
.containsExactlyInAnyOrderElementsOf(
() -> task.getData().getChildRequests(worldStateStorage).iterator());
verify(downloadState).checkCompletion(worldStateStorage, blockHeader);
}

@ -31,8 +31,7 @@ import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.services.tasks.CachingTaskCollection;
import org.hyperledger.besu.services.tasks.InMemoryTaskQueue;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import org.hyperledger.besu.testutil.TestClock;
import java.util.Arrays;
@ -60,8 +59,8 @@ public class FastWorldDownloadStateTest {
private final BlockHeader header =
new BlockHeaderTestFixture().stateRoot(ROOT_NODE_HASH).buildHeader();
private final CachingTaskCollection<NodeDataRequest> pendingRequests =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
private final InMemoryTasksPriorityQueues<NodeDataRequest> pendingRequests =
new InMemoryTasksPriorityQueues<>();
private final WorldStateDownloadProcess worldStateDownloadProcess =
mock(WorldStateDownloadProcess.class);

@ -66,8 +66,7 @@ import org.hyperledger.besu.evm.account.AccountStorageEntry;
import org.hyperledger.besu.evm.worldstate.WorldState;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.services.tasks.CachingTaskCollection;
import org.hyperledger.besu.services.tasks.InMemoryTaskQueue;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import org.hyperledger.besu.testutil.MockExecutorService;
import org.hyperledger.besu.testutil.TestClock;
@ -172,8 +171,8 @@ public class FastWorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final WorldStateDownloader downloader =
@ -208,8 +207,8 @@ public class FastWorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateDownloader downloader =
createDownloader(
ethProtocolManager.ethContext(),
@ -255,8 +254,8 @@ public class FastWorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final WorldStateDownloader downloader =
@ -307,8 +306,8 @@ public class FastWorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
@ -389,8 +388,8 @@ public class FastWorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final CachingTaskCollection<NodeDataRequest> taskCollection =
spy(new CachingTaskCollection<>(new InMemoryTaskQueue<>()));
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
@ -464,8 +463,8 @@ public class FastWorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
@ -547,8 +546,8 @@ public class FastWorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
@ -644,8 +643,8 @@ public class FastWorldStateDownloaderTest {
final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final SynchronizerConfiguration syncConfig =
@ -705,8 +704,8 @@ public class FastWorldStateDownloaderTest {
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
// Add some nodes to the taskCollection
final CachingTaskCollection<NodeDataRequest> taskCollection =
spy(new CachingTaskCollection<>(new InMemoryTaskQueue<>()));
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
spy(new InMemoryTasksPriorityQueues<>());
List<Bytes32> queuedHashes = getFirstSetOfChildNodeRequests(remoteStorage, stateRoot);
assertThat(queuedHashes.size()).isGreaterThan(0); // Sanity check
for (Bytes32 bytes32 : queuedHashes) {
@ -851,8 +850,8 @@ public class FastWorldStateDownloaderTest {
.getHeader();
assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
final WorldStateStorage localStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final WorldStateArchive localWorldStateArchive =
@ -985,7 +984,7 @@ public class FastWorldStateDownloaderTest {
private WorldStateDownloader createDownloader(
final EthContext context,
final WorldStateStorage storage,
final CachingTaskCollection<NodeDataRequest> taskCollection) {
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection) {
return createDownloader(
SynchronizerConfiguration.builder().build(), context, storage, taskCollection);
}
@ -994,7 +993,7 @@ public class FastWorldStateDownloaderTest {
final SynchronizerConfiguration config,
final EthContext context,
final WorldStateStorage storage,
final CachingTaskCollection<NodeDataRequest> taskCollection) {
final InMemoryTasksPriorityQueues<NodeDataRequest> taskCollection) {
return new FastWorldStateDownloader(
context,
storage,

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@ -32,6 +33,7 @@ import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Test;
import org.mockito.Mockito;
public class LoadLocalDataStepTest {
@ -61,6 +63,7 @@ public class LoadLocalDataStepTest {
@Test
public void shouldReturnEmptyStreamAndSendTaskToCompletedPipeWhenDataIsPresent() {
when(worldStateStorage.getCode(HASH, Hash.EMPTY)).thenReturn(Optional.of(DATA));
when(worldStateStorage.updater()).thenReturn(updater);
final Stream<Task<NodeDataRequest>> output =
loadLocalDataStep.loadLocalData(task, completedTasks);
@ -69,6 +72,9 @@ public class LoadLocalDataStepTest {
assertThat(request.getData()).isEqualTo(DATA);
assertThat(output).isEmpty();
verify(updater).commit();
Mockito.reset(updater);
// Should not require persisting.
request.persist(updater);
verifyNoInteractions(updater);

@ -14,87 +14,24 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloaderException;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Test;
public class NodeDataRequestTest {
@Test
public void serializesAccountTrieNodeRequests() {
BlockDataGenerator gen = new BlockDataGenerator(0);
AccountTrieNodeDataRequest request =
NodeDataRequest.createAccountDataRequest(gen.hash(), Optional.of(Bytes.EMPTY));
NodeDataRequest sedeRequest = serializeThenDeserialize(request);
assertRequestsEquals(sedeRequest, request);
assertThat(sedeRequest).isInstanceOf(AccountTrieNodeDataRequest.class);
}
@Test
public void serializesAccountTrieNodeRequestsWithLocation() {
BlockDataGenerator gen = new BlockDataGenerator(0);
AccountTrieNodeDataRequest request =
NodeDataRequest.createAccountDataRequest(gen.hash(), Optional.of(Bytes.of(3)));
NodeDataRequest sedeRequest = serializeThenDeserialize(request);
assertRequestsEquals(sedeRequest, request);
assertThat(sedeRequest).isInstanceOf(AccountTrieNodeDataRequest.class);
}
@Test
public void serializesStorageTrieNodeRequests() {
BlockDataGenerator gen = new BlockDataGenerator(0);
StorageTrieNodeDataRequest request =
NodeDataRequest.createStorageDataRequest(
gen.hash(), Optional.of(Hash.EMPTY), Optional.of(Bytes.EMPTY));
NodeDataRequest sedeRequest = serializeThenDeserialize(request);
assertRequestsEquals(sedeRequest, request);
assertThat(sedeRequest).isInstanceOf(StorageTrieNodeDataRequest.class);
}
@Test
public void serializesStorageTrieNodeRequestsWithAccountHashAndLocation() {
BlockDataGenerator gen = new BlockDataGenerator(0);
StorageTrieNodeDataRequest request =
NodeDataRequest.createStorageDataRequest(
gen.hash(), Optional.of(Hash.ZERO), Optional.of(Bytes.of(3)));
NodeDataRequest sedeRequest = serializeThenDeserialize(request);
assertRequestsEquals(sedeRequest, request);
assertThat(sedeRequest).isInstanceOf(StorageTrieNodeDataRequest.class);
}
@Test
public void serializesCodeRequests() {
BlockDataGenerator gen = new BlockDataGenerator(0);
CodeNodeDataRequest request = NodeDataRequest.createCodeRequest(gen.hash(), Optional.empty());
NodeDataRequest sedeRequest = serializeThenDeserialize(request);
assertRequestsEquals(sedeRequest, request);
assertThat(sedeRequest).isInstanceOf(CodeNodeDataRequest.class);
}
@Test
public void serializesCodeRequestsWithAccountHash() {
BlockDataGenerator gen = new BlockDataGenerator(0);
CodeNodeDataRequest request =
NodeDataRequest.createCodeRequest(gen.hash(), Optional.of(Hash.ZERO));
NodeDataRequest sedeRequest = serializeThenDeserialize(request);
assertRequestsEquals(sedeRequest, request);
assertThat(sedeRequest).isInstanceOf(CodeNodeDataRequest.class);
}
private NodeDataRequest serializeThenDeserialize(final NodeDataRequest request) {
return NodeDataRequest.deserialize(NodeDataRequest.serialize(request));
}
private void assertRequestsEquals(final NodeDataRequest actual, final NodeDataRequest expected) {
assertThat(actual.getRequestType()).isEqualTo(expected.getRequestType());
assertThat(actual.getHash()).isEqualTo(expected.getHash());
assertThat(actual.getData()).isEqualTo(expected.getData());
assertThat(actual.getLocation()).isEqualTo(expected.getLocation());
public void cannotAssignParentTwice() {
NodeDataRequest parent =
new StorageTrieNodeDataRequest(null, Optional.empty(), Optional.empty());
NodeDataRequest child =
new StorageTrieNodeDataRequest(null, Optional.empty(), Optional.empty());
child.registerParent(parent);
assertThatThrownBy(() -> child.registerParent(parent))
.isInstanceOf(WorldStateDownloaderException.class);
}
}

@ -25,10 +25,13 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.SimpleMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.Task;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
@ -48,7 +51,7 @@ public class PersistDataStepTest {
private final PersistDataStep persistDataStep = new PersistDataStep(worldStateStorage);
@Test
public void shouldPersistDataWhenPresent() {
public void shouldPersistDataWhenPresentWithoutChildren() {
final List<Task<NodeDataRequest>> tasks =
asList(
createTaskWithData(1, 2, 3),
@ -95,7 +98,11 @@ public class PersistDataStepTest {
private Task<NodeDataRequest> createTaskWithData(final Bytes data) {
final StubTask task = createTaskWithoutData(data);
task.getData().setData(data);
MerklePatriciaTrie<Bytes, String> trie =
new SimpleMerklePatriciaTrie<>(
value -> (value != null) ? Bytes.wrap(value.getBytes(StandardCharsets.UTF_8)) : null);
trie.put(data, "01");
task.getData().setData(trie.getValueWithProof(data).getProofRelatedNodes().get(0));
return task;
}

@ -0,0 +1,171 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.tasks;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.NotNull;
public class InMemoryTasksPriorityQueues<T extends TasksPriorityProvider>
implements TaskCollection<T> {
private final List<PriorityQueue<T>> internalQueues = new ArrayList<>(16);
private final Set<InMemoryTask<T>> unfinishedOutstandingTasks = new HashSet<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
public InMemoryTasksPriorityQueues() {
clearInternalQueues();
}
private void clearInternalQueues() {
internalQueues.clear();
for (int i = 0; i < 16; i++) {
internalQueues.add(newEmptyQueue());
}
}
@NotNull
private PriorityQueue<T> newEmptyQueue() {
return new PriorityQueue<>(Comparator.comparingLong(TasksPriorityProvider::getPriority));
}
@Override
public synchronized void add(final T taskData) {
assertNotClosed();
var dequeue = findQueue(taskData.getDepth());
dequeue.add(taskData);
}
private PriorityQueue<T> findQueue(final int priority) {
while (priority + 1 > internalQueues.size()) {
internalQueues.add(newEmptyQueue());
}
return internalQueues.get(priority);
}
@Override
public synchronized Task<T> remove() {
assertNotClosed();
final Queue<T> lastNonEmptyQueue = findLastNonEmptyQueue();
if (lastNonEmptyQueue.isEmpty()) {
return null;
}
T data = lastNonEmptyQueue.remove();
InMemoryTask<T> task = new InMemoryTask<>(this, data);
unfinishedOutstandingTasks.add(task);
return task;
}
private Queue<T> findLastNonEmptyQueue() {
for (int i = internalQueues.size() - 1; i > 0; i--) {
final Queue<T> queue = internalQueues.get(i);
if (!queue.isEmpty()) {
return queue;
}
}
return internalQueues.get(0);
}
@Override
public synchronized long size() {
return internalQueues.stream().mapToInt(Queue::size).sum();
}
@Override
public synchronized boolean isEmpty() {
return findLastNonEmptyQueue().isEmpty();
}
@Override
public synchronized void clear() {
assertNotClosed();
unfinishedOutstandingTasks.clear();
clearInternalQueues();
}
@Override
public synchronized boolean allTasksCompleted() {
return isEmpty() && unfinishedOutstandingTasks.isEmpty();
}
@Override
public synchronized void close() {
if (closed.compareAndSet(false, true)) {
internalQueues.clear();
unfinishedOutstandingTasks.clear();
}
}
private void assertNotClosed() {
if (closed.get()) {
throw new IllegalStateException("Attempt to access closed " + getClass().getSimpleName());
}
}
private synchronized void handleFailedTask(final InMemoryTask<T> task) {
if (markTaskCompleted(task)) {
add(task.getData());
}
}
private synchronized boolean markTaskCompleted(final InMemoryTask<T> task) {
return unfinishedOutstandingTasks.remove(task);
}
public synchronized boolean contains(final T request) {
final PriorityQueue<T> queue = findQueue(request.getDepth());
return queue.contains(request)
|| unfinishedOutstandingTasks.stream()
.map(InMemoryTask::getData)
.anyMatch(data -> data.equals(request));
}
private static class InMemoryTask<T extends TasksPriorityProvider> implements Task<T> {
private final T data;
private final InMemoryTasksPriorityQueues<T> queue;
private final AtomicBoolean completed = new AtomicBoolean(false);
public InMemoryTask(final InMemoryTasksPriorityQueues<T> queue, final T data) {
this.queue = queue;
this.data = data;
}
@Override
public T getData() {
return data;
}
@Override
public void markCompleted() {
if (completed.compareAndSet(false, true)) {
queue.markTaskCompleted(this);
}
}
@Override
public void markFailed() {
if (completed.compareAndSet(false, true)) {
queue.handleFailedTask(this);
}
}
}
}

@ -0,0 +1,24 @@
/*
*
* * Copyright Hyperledger Besu Contributors.
* *
* * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations under the License.
* *
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.services.tasks;
public interface TasksPriorityProvider {
long getPriority();
int getDepth();
}

@ -0,0 +1,175 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.junit.Test;
public class InMemoryTasksPriorityQueuesTest {
@Test
public void shouldBePossibleToAddElementsAndRetrieveThemInPriorityOrder() {
InMemoryTasksPriorityQueues<Item> queue = new InMemoryTasksPriorityQueues<>();
queue.add(item(1, 1));
queue.add(item(2, 30));
queue.add(item(2, 10));
queue.add(item(5, 1));
queue.add(item(99, Integer.MAX_VALUE));
queue.add(item(1, 20));
List<Item> items = new ArrayList<>();
while (!queue.isEmpty()) {
items.add(queue.remove().getData());
}
assertThat(items)
.containsExactly(
item(99, Integer.MAX_VALUE),
item(5, 1),
item(2, 10),
item(2, 30),
item(1, 1),
item(1, 20));
}
@Test
public void shouldNotInsertItemsToClosedQueue() {
InMemoryTasksPriorityQueues<Item> queue = new InMemoryTasksPriorityQueues<>();
queue.add(item(1, 1));
queue.close();
final Item item = item(2, 2);
assertThatThrownBy(() -> queue.add(item)).isInstanceOf(IllegalStateException.class);
}
@Test
public void shouldContainTaskUntilFinished() {
InMemoryTasksPriorityQueues<Item> queue = new InMemoryTasksPriorityQueues<>();
queue.add(item(1, 1));
final Item item = item(2, 3);
queue.add(item);
assertThat(queue.contains(item)).isTrue();
final Task<Item> removed = queue.remove();
assertThat(removed.getData()).isEqualTo(item);
assertThat(queue.contains(item)).isTrue();
removed.markCompleted();
assertThat(queue.contains(item)).isFalse();
}
@Test
public void shouldPutFailedItemBackIntoQueue() {
InMemoryTasksPriorityQueues<Item> queue = new InMemoryTasksPriorityQueues<>();
queue.add(item(1, 1));
final Item item = item(2, 3);
queue.add(item);
assertThat(queue.contains(item)).isTrue();
Task<Item> removed = queue.remove();
assertThat(removed.getData()).isEqualTo(item);
assertThat(queue.contains(item)).isTrue();
removed.markFailed();
assertThat(queue.contains(item)).isTrue();
removed = queue.remove();
assertThat(removed.getData()).isEqualTo(item);
}
@Test
public void shouldNotPutFailedItemBackIntoIfItWasCompletedAlreadyQueue() {
InMemoryTasksPriorityQueues<Item> queue = new InMemoryTasksPriorityQueues<>();
queue.add(item(1, 1));
final Item item = item(2, 3);
queue.add(item);
assertThat(queue.contains(item)).isTrue();
Task<Item> removed = queue.remove();
assertThat(removed.getData()).isEqualTo(item);
assertThat(queue.contains(item)).isTrue();
removed.markCompleted();
assertThat(queue.contains(item)).isFalse();
removed.markFailed();
assertThat(queue.contains(item)).isFalse();
removed = queue.remove();
assertThat(removed.getData()).isNotEqualTo(item);
}
private Item item(final int depth, final int priority) {
return new Item(depth, priority);
}
static class Item implements TasksPriorityProvider {
private final int depth;
private final long priority;
public Item(final int depth, final long priority) {
this.depth = depth;
this.priority = priority;
}
@Override
public long getPriority() {
return priority;
}
@Override
public int getDepth() {
return depth;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Item item = (Item) o;
return depth == item.depth && priority == item.priority;
}
@Override
public int hashCode() {
return Objects.hash(depth, priority);
}
@Override
public String toString() {
return "Item{" + "depth=" + depth + ", priority=" + priority + '}';
}
}
}
Loading…
Cancel
Save