[PAN-2422] Add in-memory cache over world state download queue (#1087)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent 610440396a
commit fafd7e69ed
  1. 2
      ethereum/eth/build.gradle
  2. 18
      ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java
  3. 52
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java
  4. 14
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java
  5. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  6. 36
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java
  7. 81
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java
  8. 2
      services/tasks/build.gradle
  9. 7
      services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java
  10. 146
      services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollection.java
  11. 10
      services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueue.java
  12. 10
      services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java
  13. 23
      services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/Task.java
  14. 30
      services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/TaskCollection.java
  15. 0
      services/tasks/src/main/resources/log4j2.xml
  16. 61
      services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java
  17. 210
      services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollectionTest.java
  18. 2
      services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueueTest.java
  19. 10
      services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java
  20. 1
      settings.gradle

@ -33,7 +33,7 @@ dependencies {
implementation project(':ethereum:permissioning')
implementation project(':metrics')
implementation project(':services:kvstore')
implementation project(':services:queue')
implementation project(':services:tasks')
implementation 'io.vertx:vertx-core'
implementation 'com.google.guava:guava'

@ -32,8 +32,8 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.nio.file.Path;
@ -61,7 +61,7 @@ public class WorldStateDownloaderBenchmark {
private WorldStateStorage worldStateStorage;
private RespondingEthPeer peer;
private Responder responder;
private TaskQueue<NodeDataRequest> pendingRequests;
private CachingTaskCollection<NodeDataRequest> pendingRequests;
private StorageProvider storageProvider;
private EthProtocolManager ethProtocolManager;
private InMemoryKeyValueStorage remoteKeyValueStorage;
@ -89,11 +89,13 @@ public class WorldStateDownloaderBenchmark {
worldStateStorage = storageProvider.createWorldStateStorage();
pendingRequests =
RocksDbTaskQueue.create(
tempDir.resolve("fastsync"),
NodeDataRequest::serialize,
NodeDataRequest::deserialize,
metricsSystem);
new CachingTaskCollection<>(
RocksDbTaskQueue.create(
tempDir.resolve("fastsync"),
NodeDataRequest::serialize,
NodeDataRequest::deserialize,
metricsSystem),
0);
worldStateDownloader =
new WorldStateDownloader(
ethContext,

@ -25,9 +25,10 @@ import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue;
import java.io.File;
import java.io.IOException;
@ -45,19 +46,19 @@ class FastSynchronizer<C> {
private final FastSyncDownloader<C> fastSyncDownloader;
private final Path fastSyncDataDirectory;
private final TaskQueue<NodeDataRequest> stateQueue;
private final CachingTaskCollection<NodeDataRequest> taskCollection;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncState initialSyncState;
private FastSynchronizer(
final FastSyncDownloader<C> fastSyncDownloader,
final Path fastSyncDataDirectory,
final TaskQueue<NodeDataRequest> stateQueue,
final CachingTaskCollection<NodeDataRequest> taskCollection,
final WorldStateDownloader worldStateDownloader,
final FastSyncState initialSyncState) {
this.fastSyncDownloader = fastSyncDownloader;
this.fastSyncDataDirectory = fastSyncDataDirectory;
this.stateQueue = stateQueue;
this.taskCollection = taskCollection;
this.worldStateDownloader = worldStateDownloader;
this.initialSyncState = initialSyncState;
}
@ -88,13 +89,14 @@ class FastSynchronizer<C> {
return Optional.empty();
}
final TaskQueue<NodeDataRequest> stateQueue =
createWorldStateDownloaderQueue(getStateQueueDirectory(dataDirectory), metricsSystem);
final CachingTaskCollection<NodeDataRequest> taskCollection =
createWorldStateDownloaderTaskCollection(
getStateQueueDirectory(dataDirectory), metricsSystem);
final WorldStateDownloader worldStateDownloader =
new WorldStateDownloader(
ethContext,
worldStateStorage,
stateQueue,
taskCollection,
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
@ -114,7 +116,7 @@ class FastSynchronizer<C> {
new FastSynchronizer<>(
fastSyncDownloader,
fastSyncDataDirectory,
stateQueue,
taskCollection,
worldStateDownloader,
fastSyncState));
}
@ -128,7 +130,7 @@ class FastSynchronizer<C> {
// Make sure downloader is stopped before we start cleaning up its dependencies
worldStateDownloader.cancel();
try {
stateQueue.close();
taskCollection.close();
if (fastSyncDataDirectory.toFile().exists()) {
// Clean up this data for now (until fast sync resume functionality is in place)
MoreFiles.deleteRecursively(fastSyncDataDirectory, RecursiveDeleteOption.ALLOW_INSECURE);
@ -156,9 +158,33 @@ class FastSynchronizer<C> {
}
}
private static TaskQueue<NodeDataRequest> createWorldStateDownloaderQueue(
private static CachingTaskCollection<NodeDataRequest> createWorldStateDownloaderTaskCollection(
final Path dataDirectory, final MetricsSystem metricsSystem) {
return RocksDbTaskQueue.create(
dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize, metricsSystem);
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(
RocksDbTaskQueue.create(
dataDirectory,
NodeDataRequest::serialize,
NodeDataRequest::deserialize,
metricsSystem));
metricsSystem.createLongGauge(
MetricCategory.SYNCHRONIZER,
"world_state_pending_requests_current",
"Number of pending requests for fast sync world state download",
taskCollection::size);
metricsSystem.createIntegerGauge(
MetricCategory.SYNCHRONIZER,
"world_state_pending_requests_cache_size",
"Pending request cache size for fast sync world state download",
taskCollection::cacheSize);
// We're using the CachingTaskCollection which isn't designed to reliably persist all
// added tasks. We therefore can't resume from previously added tasks.
// So for now, clear tasks when we start up.
taskCollection.clear();
return taskCollection;
}
}

@ -16,8 +16,8 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -38,7 +38,7 @@ class WorldDownloadState {
private static final Logger LOG = LogManager.getLogger();
private final boolean downloadWasResumed;
private final TaskQueue<NodeDataRequest> pendingRequests;
private final CachingTaskCollection<NodeDataRequest> pendingRequests;
private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist;
private final int maxOutstandingRequests;
private final int maxRequestsWithoutProgress;
@ -54,7 +54,7 @@ class WorldDownloadState {
private EthTask<?> persistenceTask;
public WorldDownloadState(
final TaskQueue<NodeDataRequest> pendingRequests,
final CachingTaskCollection<NodeDataRequest> pendingRequests,
final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist,
final int maxOutstandingRequests,
final int maxRequestsWithoutProgress) {
@ -151,13 +151,13 @@ class WorldDownloadState {
public synchronized void enqueueRequest(final NodeDataRequest request) {
if (!internalFuture.isDone()) {
pendingRequests.enqueue(request);
pendingRequests.add(request);
}
}
public synchronized void enqueueRequests(final Collection<NodeDataRequest> requests) {
if (!internalFuture.isDone()) {
requests.forEach(pendingRequests::enqueue);
requests.forEach(pendingRequests::add);
}
}
@ -165,7 +165,7 @@ class WorldDownloadState {
if (internalFuture.isDone()) {
return null;
}
return pendingRequests.dequeue();
return pendingRequests.remove();
}
public synchronized void setRootNodeData(final BytesValue rootNodeData) {

@ -27,8 +27,8 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -61,7 +61,7 @@ public class WorldStateDownloader {
private final MetricsSystem metricsSystem;
private final EthContext ethContext;
private final TaskQueue<NodeDataRequest> taskQueue;
private final CachingTaskCollection<NodeDataRequest> taskCollection;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final int maxNodeRequestsWithoutProgress;
@ -72,25 +72,19 @@ public class WorldStateDownloader {
public WorldStateDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final TaskQueue<NodeDataRequest> taskQueue,
final CachingTaskCollection<NodeDataRequest> taskCollection,
final int hashCountPerRequest,
final int maxOutstandingRequests,
final int maxNodeRequestsWithoutProgress,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.worldStateStorage = worldStateStorage;
this.taskQueue = taskQueue;
this.taskCollection = taskCollection;
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress;
this.metricsSystem = metricsSystem;
metricsSystem.createLongGauge(
MetricCategory.SYNCHRONIZER,
"world_state_pending_requests_current",
"Number of pending requests for fast sync world state download",
taskQueue::size);
completedRequestsCounter =
metricsSystem.createCounter(
MetricCategory.SYNCHRONIZER,
@ -159,7 +153,7 @@ public class WorldStateDownloader {
final int persistenceQueueCapacity = hashCountPerRequest * maxOutstandingRequests * 2;
final WorldDownloadState newDownloadState =
new WorldDownloadState(
taskQueue,
taskCollection,
new ArrayBlockingQueue<>(persistenceQueueCapacity),
maxOutstandingRequests,
maxNodeRequestsWithoutProgress);

@ -28,8 +28,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Arrays;
@ -51,7 +52,8 @@ public class WorldDownloadStateTest {
private final BlockHeader header =
new BlockHeaderTestFixture().stateRoot(ROOT_NODE_HASH).buildHeader();
private final InMemoryTaskQueue<NodeDataRequest> pendingRequests = new InMemoryTaskQueue<>();
private final CachingTaskCollection<NodeDataRequest> pendingRequests =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist =
new ArrayBlockingQueue<>(100);
@ -94,7 +96,7 @@ public class WorldDownloadStateTest {
@Test
public void shouldNotCompleteWhenThereArePendingTasks() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
downloadState.checkCompletion(worldStateStorage, header);
@ -115,8 +117,8 @@ public class WorldDownloadStateTest {
downloadState.addOutstandingTask(outstandingTask1);
downloadState.addOutstandingTask(outstandingTask2);
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY));
requestsToPersist.add(toPersist1);
requestsToPersist.add(toPersist2);
@ -133,7 +135,7 @@ public class WorldDownloadStateTest {
@Test
public void shouldNotSendAdditionalRequestsWhenWaitingForANewPeer() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
downloadState.setWaitingForNewPeer(true);
downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled());
@ -141,7 +143,7 @@ public class WorldDownloadStateTest {
@Test
public void shouldResumeSendingAdditionalRequestsWhenNoLongerWaitingForPeer() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest =
mockWithAction(() -> downloadState.addOutstandingTask(mock(EthTask.class)));
@ -155,10 +157,10 @@ public class WorldDownloadStateTest {
@Test
public void shouldStopSendingAdditionalRequestsWhenPendingRequestsIsEmpty() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest = mockWithAction(pendingRequests::dequeue);
final Runnable sendRequest = mockWithAction(pendingRequests::remove);
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(2)).run();
@ -166,7 +168,7 @@ public class WorldDownloadStateTest {
@Test
public void shouldStopSendingAdditionalRequestsWhenMaximumOutstandingRequestCountReached() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest =
mockWithAction(() -> downloadState.addOutstandingTask(mock(EthTask.class)));
@ -176,7 +178,7 @@ public class WorldDownloadStateTest {
@Test
public void shouldStopSendingAdditionalRequestsWhenFutureIsCancelled() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest = mockWithAction(() -> future.cancel(true));
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
@ -185,7 +187,7 @@ public class WorldDownloadStateTest {
@Test
public void shouldStopSendingAdditionalRequestsWhenDownloadIsMarkedAsStalled() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest = mockWithAction(() -> downloadState.requestComplete(false));
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
@ -210,7 +212,7 @@ public class WorldDownloadStateTest {
@Test
public void shouldNotAllowMultipleCallsToSendAdditionalRequestsAtOnce() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest =
mockWithAction(
() -> {
@ -223,7 +225,7 @@ public class WorldDownloadStateTest {
}
@Test
public void shouldNotEnqueueRequestsAfterDownloadIsStalled() {
public void shouldNotAddRequestsAfterDownloadIsStalled() {
downloadState.checkCompletion(worldStateStorage, header);
downloadState.enqueueRequests(Arrays.asList(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
@ -235,7 +237,7 @@ public class WorldDownloadStateTest {
@Test // Sanity check for the test structure
public void shouldFailWhenMustNotBeCalledIsCalled() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
assertThatThrownBy(() -> downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled()))
.hasMessage("Unexpected invocation");
}

@ -58,8 +58,8 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -152,11 +152,12 @@ public class WorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateDownloader downloader =
createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection);
final CompletableFuture<Void> future = downloader.run(header);
assertThat(future).isDone();
@ -192,9 +193,10 @@ public class WorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateDownloader downloader =
createDownloader(ethProtocolManager.ethContext(), storage, queue);
createDownloader(ethProtocolManager.ethContext(), storage, taskCollection);
final CompletableFuture<Void> future = downloader.run(header);
assertThat(future).isDone();
@ -234,11 +236,12 @@ public class WorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateDownloader downloader =
createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection);
final CompletableFuture<Void> result = downloader.run(header);
@ -292,7 +295,8 @@ public class WorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
@ -304,7 +308,7 @@ public class WorldStateDownloaderTest {
localStorageUpdater.commit();
final WorldStateDownloader downloader =
createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection);
final CompletableFuture<Void> result = downloader.run(header);
@ -377,12 +381,13 @@ public class WorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final TaskQueue<NodeDataRequest> queue = spy(new InMemoryTaskQueue<>());
final CachingTaskCollection<NodeDataRequest> taskCollection =
spy(new CachingTaskCollection<>(new InMemoryTaskQueue<>()));
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateDownloader downloader =
createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection);
final CompletableFuture<Void> result = downloader.run(header);
@ -398,8 +403,8 @@ public class WorldStateDownloaderTest {
}
assertThat(result.isDone()).isFalse(); // Sanity check
// Reset queue so we can track interactions after the cancellation
reset(queue);
// Reset taskCollection so we can track interactions after the cancellation
reset(taskCollection);
if (shouldCancelFuture) {
result.cancel(true);
} else {
@ -418,9 +423,9 @@ public class WorldStateDownloaderTest {
// Now allow the persistence service to run which should exit immediately
serviceExecutor.runPendingFutures();
verify(queue, times(1)).clear();
verify(queue, never()).dequeue();
verify(queue, never()).enqueue(any());
verify(taskCollection, times(1)).clear();
verify(taskCollection, never()).remove();
verify(taskCollection, never()).add(any());
// Target world state should not be available
assertThat(localStorage.isWorldStateAvailable(header.getStateRoot())).isFalse();
}
@ -450,7 +455,8 @@ public class WorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
@ -475,7 +481,7 @@ public class WorldStateDownloaderTest {
localStorageUpdater.commit();
final WorldStateDownloader downloader =
createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection);
final CompletableFuture<Void> result = downloader.run(header);
@ -536,7 +542,8 @@ public class WorldStateDownloaderTest {
.limit(5)
.collect(Collectors.toList());
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
@ -576,7 +583,7 @@ public class WorldStateDownloaderTest {
localStorageUpdater.commit();
final WorldStateDownloader downloader =
createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection);
final CompletableFuture<Void> result = downloader.run(header);
@ -632,13 +639,14 @@ public class WorldStateDownloaderTest {
final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().worldStateMaxRequestsWithoutProgress(10).build();
final WorldStateDownloader downloader =
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, taskCollection);
// Create a peer that can respond
final RespondingEthPeer peer =
@ -687,17 +695,18 @@ public class WorldStateDownloaderTest {
final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
// Add some nodes to the queue
final TaskQueue<NodeDataRequest> queue = spy(new InMemoryTaskQueue<>());
// Add some nodes to the taskCollection
final CachingTaskCollection<NodeDataRequest> taskCollection =
spy(new CachingTaskCollection<>(new InMemoryTaskQueue<>()));
List<Bytes32> queuedHashes = getFirstSetOfChildNodeRequests(remoteStorage, stateRoot);
assertThat(queuedHashes.size()).isGreaterThan(0); // Sanity check
for (Bytes32 bytes32 : queuedHashes) {
queue.enqueue(new AccountTrieNodeDataRequest(Hash.wrap(bytes32)));
taskCollection.add(new AccountTrieNodeDataRequest(Hash.wrap(bytes32)));
}
// Sanity check
for (Bytes32 bytes32 : queuedHashes) {
final Hash hash = Hash.wrap(bytes32);
verify(queue, times(1)).enqueue(argThat((r) -> r.getHash().equals(hash)));
verify(taskCollection, times(1)).add(argThat((r) -> r.getHash().equals(hash)));
}
final WorldStateStorage localStorage =
@ -705,7 +714,7 @@ public class WorldStateDownloaderTest {
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().worldStateMaxRequestsWithoutProgress(10).build();
final WorldStateDownloader downloader =
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, taskCollection);
// Create a peer that can respond
final RespondingEthPeer peer =
@ -738,7 +747,7 @@ public class WorldStateDownloaderTest {
// Check that already enqueued requests were not enqueued more than once
for (Bytes32 bytes32 : queuedHashes) {
final Hash hash = Hash.wrap(bytes32);
verify(queue, times(1)).enqueue(argThat((r) -> r.getHash().equals(hash)));
verify(taskCollection, times(1)).add(argThat((r) -> r.getHash().equals(hash)));
}
// Check that all expected account data was downloaded
@ -837,7 +846,8 @@ public class WorldStateDownloaderTest {
.getHeader();
assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check
final TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>(new InMemoryTaskQueue<>());
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
@ -847,7 +857,7 @@ public class WorldStateDownloaderTest {
.worldStateRequestParallelism(maxOutstandingRequests)
.build();
final WorldStateDownloader downloader =
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue);
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, taskCollection);
// Create some peers that can respond
final List<RespondingEthPeer> usefulPeers =
@ -977,19 +987,20 @@ public class WorldStateDownloaderTest {
private WorldStateDownloader createDownloader(
final EthContext context,
final WorldStateStorage storage,
final TaskQueue<NodeDataRequest> queue) {
return createDownloader(SynchronizerConfiguration.builder().build(), context, storage, queue);
final CachingTaskCollection<NodeDataRequest> taskCollection) {
return createDownloader(
SynchronizerConfiguration.builder().build(), context, storage, taskCollection);
}
private WorldStateDownloader createDownloader(
final SynchronizerConfiguration config,
final EthContext context,
final WorldStateStorage storage,
final TaskQueue<NodeDataRequest> queue) {
final CachingTaskCollection<NodeDataRequest> taskCollection) {
return new WorldStateDownloader(
context,
storage,
queue,
taskCollection,
config.getWorldStateHashCountPerRequest(),
config.getWorldStateRequestParallelism(),
config.getWorldStateMaxRequestsWithoutProgress(),

@ -14,7 +14,7 @@
apply plugin: 'java-library'
jar {
baseName 'pantheon-queue'
baseName 'pantheon-tasks'
manifest {
attributes(
'Specification-Title': baseName,

@ -10,10 +10,9 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
package tech.pegasys.pantheon.services.tasks;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -44,7 +43,7 @@ public class RocksDbTaskQueueBenchmark {
RocksDbTaskQueue.create(
tempDir.toPath(), Function.identity(), Function.identity(), new NoOpMetricsSystem());
for (int i = 0; i < 1_000_000; i++) {
queue.enqueue(UInt256.of(i).getBytes());
queue.add(UInt256.of(i).getBytes());
}
}
@ -56,6 +55,6 @@ public class RocksDbTaskQueueBenchmark {
@Benchmark
public Task<BytesValue> dequeue() {
return queue.dequeue();
return queue.remove();
}
}

@ -0,0 +1,146 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
public class CachingTaskCollection<T> implements TaskCollection<T> {
private static final int DEFAULT_CACHE_SIZE = 1_000_000;
private final int maxCacheSize;
// The underlying collection
private final TaskCollection<T> wrappedCollection;
/**
* A cache of tasks to operate on before going to {@link CachingTaskCollection#wrappedCollection}
*/
private final Queue<Task<T>> cache = new ArrayDeque<>();
// Tasks that have been removed, but not marked completed yet
private final Set<Task<T>> outstandingTasks = new HashSet<>();
private boolean closed = false;
public CachingTaskCollection(final TaskCollection<T> collection, final int maxCacheSize) {
this.wrappedCollection = collection;
this.maxCacheSize = maxCacheSize;
}
public CachingTaskCollection(final TaskCollection<T> collection) {
this(collection, DEFAULT_CACHE_SIZE);
}
@Override
public synchronized void add(final T taskData) {
assertNotClosed();
if (cacheSize() >= maxCacheSize) {
// Too many tasks in the cache, push this to the underlying collection
wrappedCollection.add(taskData);
return;
}
Task<T> newTask = new CachedTask<>(this, taskData);
cache.add(newTask);
}
@Override
public synchronized Task<T> remove() {
assertNotClosed();
if (cache.size() == 0) {
return wrappedCollection.remove();
}
final Task<T> pendingTask = cache.remove();
outstandingTasks.add(pendingTask);
return pendingTask;
}
@Override
public synchronized void clear() {
assertNotClosed();
wrappedCollection.clear();
outstandingTasks.clear();
cache.clear();
}
@Override
public synchronized long size() {
return wrappedCollection.size() + cache.size();
}
public synchronized int cacheSize() {
return outstandingTasks.size() + cache.size();
}
@Override
public synchronized boolean isEmpty() {
return size() == 0;
}
/** @return True if all tasks have been removed and processed. */
@Override
public synchronized boolean allTasksCompleted() {
return cacheSize() == 0 && wrappedCollection.allTasksCompleted();
}
private synchronized boolean completePendingTask(final CachedTask<T> cachedTask) {
return outstandingTasks.remove(cachedTask);
}
private synchronized void failPendingTask(final CachedTask<T> cachedTask) {
if (completePendingTask(cachedTask)) {
cache.add(cachedTask);
}
}
@Override
public synchronized void close() throws IOException {
outstandingTasks.clear();
cache.clear();
wrappedCollection.close();
closed = true;
}
private void assertNotClosed() {
if (closed) {
throw new IllegalStateException("Attempt to access closed " + getClass().getSimpleName());
}
}
private static class CachedTask<T> implements Task<T> {
private final CachingTaskCollection<T> cachingTaskCollection;
private final T data;
private CachedTask(final CachingTaskCollection<T> cachingTaskCollection, final T data) {
this.cachingTaskCollection = cachingTaskCollection;
this.data = data;
}
@Override
public T getData() {
return data;
}
@Override
public void markCompleted() {
cachingTaskCollection.completePendingTask(this);
}
@Override
public void markFailed() {
cachingTaskCollection.failPendingTask(this);
}
}
}

@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
package tech.pegasys.pantheon.services.tasks;
import java.util.ArrayDeque;
import java.util.HashSet;
@ -18,19 +18,19 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
public class InMemoryTaskQueue<T> implements TaskQueue<T> {
public class InMemoryTaskQueue<T> implements TaskCollection<T> {
private final Queue<T> internalQueue = new ArrayDeque<>();
private final Set<InMemoryTask<T>> unfinishedOutstandingTasks = new HashSet<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public synchronized void enqueue(final T taskData) {
public synchronized void add(final T taskData) {
assertNotClosed();
internalQueue.add(taskData);
}
@Override
public synchronized Task<T> dequeue() {
public synchronized Task<T> remove() {
assertNotClosed();
T data = internalQueue.poll();
if (data == null) {
@ -81,7 +81,7 @@ public class InMemoryTaskQueue<T> implements TaskQueue<T> {
private synchronized void handleFailedTask(final InMemoryTask<T> task) {
if (markTaskCompleted(task)) {
enqueue(task.getData());
add(task.getData());
}
}

@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
package tech.pegasys.pantheon.services.tasks;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
@ -31,7 +31,7 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
public class RocksDbTaskQueue<T> implements TaskQueue<T> {
public class RocksDbTaskQueue<T> implements TaskCollection<T> {
private final Options options;
private final RocksDB db;
@ -106,7 +106,7 @@ public class RocksDbTaskQueue<T> implements TaskQueue<T> {
}
@Override
public synchronized void enqueue(final T taskData) {
public synchronized void add(final T taskData) {
assertNotClosed();
try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) {
final long key = ++lastEnqueuedKey;
@ -117,7 +117,7 @@ public class RocksDbTaskQueue<T> implements TaskQueue<T> {
}
@Override
public synchronized Task<T> dequeue() {
public synchronized Task<T> remove() {
assertNotClosed();
if (isEmpty()) {
return null;
@ -237,7 +237,7 @@ public class RocksDbTaskQueue<T> implements TaskQueue<T> {
private synchronized void handleFailedTask(final RocksDbTask<T> task) {
if (markTaskCompleted(task)) {
enqueue(task.getData());
add(task.getData());
}
}

@ -0,0 +1,23 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
public interface Task<T> {
T getData();
/** Mark this task as completed. */
void markCompleted();
/** Mark this task as failed and requeue. */
void markFailed();
}

@ -1,5 +1,5 @@
/*
* Copyright 2018 ConsenSys AG.
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@ -10,31 +10,25 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
package tech.pegasys.pantheon.services.tasks;
import java.io.Closeable;
/**
* Represents a very large thread-safe task queue that may exceed memory limits.
*
* @param <T> the type of data held in the queue
*/
public interface TaskQueue<T> extends Closeable {
public interface TaskCollection<T> extends Closeable {
/**
* Enqueue some data for processing.
* Add some data that needs to be processed.
*
* @param taskData The data to be processed.
*/
void enqueue(T taskData);
void add(T taskData);
/**
* Dequeue a task for processing. This task will be tracked as a pending task until either {@code
* Get a task for processing. This task will be tracked as a pending task until either {@code
* Task.markCompleted} or {@code Task.requeue} is called.
*
* @return The task to be processed.
*/
Task<T> dequeue();
Task<T> remove();
/** @return The number of tasks in the queue. */
long size();
@ -47,14 +41,4 @@ public interface TaskQueue<T> extends Closeable {
/** @return True if all tasks have been dequeued and processed. */
boolean allTasksCompleted();
interface Task<T> {
T getData();
/** Mark this task as completed. */
void markCompleted();
/** Mark this task as failed and requeue. */
void markFailed();
}
}

@ -10,11 +10,10 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
package tech.pegasys.pantheon.services.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
@ -25,7 +24,7 @@ import java.util.function.Function;
import org.junit.Test;
abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
protected abstract T createQueue() throws Exception;
@ -36,20 +35,20 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
BytesValue two = BytesValue.of(2);
BytesValue three = BytesValue.of(3);
assertThat(queue.dequeue()).isNull();
assertThat(queue.remove()).isNull();
queue.enqueue(one);
queue.enqueue(two);
assertThat(queue.dequeue().getData()).isEqualTo(one);
queue.add(one);
queue.add(two);
assertThat(queue.remove().getData()).isEqualTo(one);
queue.enqueue(three);
assertThat(queue.dequeue().getData()).isEqualTo(two);
assertThat(queue.dequeue().getData()).isEqualTo(three);
assertThat(queue.dequeue()).isNull();
assertThat(queue.dequeue()).isNull();
queue.add(three);
assertThat(queue.remove().getData()).isEqualTo(two);
assertThat(queue.remove().getData()).isEqualTo(three);
assertThat(queue.remove()).isNull();
assertThat(queue.remove()).isNull();
queue.enqueue(three);
assertThat(queue.dequeue().getData()).isEqualTo(three);
queue.add(three);
assertThat(queue.remove().getData()).isEqualTo(three);
}
}
@ -61,12 +60,12 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isTrue();
queue.enqueue(value);
queue.add(value);
assertThat(queue.isEmpty()).isFalse();
assertThat(queue.allTasksCompleted()).isFalse();
Task<BytesValue> task = queue.dequeue();
Task<BytesValue> task = queue.remove();
assertThat(task).isNotNull();
assertThat(task.getData()).isEqualTo(value);
assertThat(queue.isEmpty()).isTrue();
@ -91,12 +90,12 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isTrue();
queue.enqueue(value);
queue.add(value);
assertThat(queue.isEmpty()).isFalse();
assertThat(queue.allTasksCompleted()).isFalse();
Task<BytesValue> task = queue.dequeue();
Task<BytesValue> task = queue.remove();
assertThat(task).isNotNull();
assertThat(task.getData()).isEqualTo(value);
assertThat(queue.isEmpty()).isTrue();
@ -122,8 +121,8 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
BytesValue four = BytesValue.of(4);
// Fill queue
queue.enqueue(one);
queue.enqueue(two);
queue.add(one);
queue.add(two);
assertThat(queue.size()).isEqualTo(2);
assertThat(queue.isEmpty()).isFalse();
assertThat(queue.allTasksCompleted()).isFalse();
@ -133,14 +132,14 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
assertThat(queue.size()).isEqualTo(0);
assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isTrue();
assertThat(queue.dequeue()).isNull();
assertThat(queue.remove()).isNull();
// Subsequent operations should work as expected
queue.enqueue(three);
queue.add(three);
assertThat(queue.size()).isEqualTo(1);
queue.enqueue(four);
queue.add(four);
assertThat(queue.size()).isEqualTo(2);
assertThat(queue.dequeue().getData()).isEqualTo(three);
assertThat(queue.remove().getData()).isEqualTo(three);
}
}
@ -149,9 +148,9 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
try (T queue = createQueue()) {
BytesValue one = BytesValue.of(1);
// Add and then dequeue task
queue.enqueue(one);
Task<BytesValue> task = queue.dequeue();
// Add and then remove task
queue.add(one);
Task<BytesValue> task = queue.remove();
assertThat(task.getData()).isEqualTo(one);
assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isFalse();
@ -161,14 +160,14 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
assertThat(queue.size()).isEqualTo(0);
assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isTrue();
assertThat(queue.dequeue()).isNull();
assertThat(queue.remove()).isNull();
// Marking old task as failed should not requeue task
task.markFailed();
assertThat(queue.size()).isEqualTo(0);
assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isTrue();
assertThat(queue.dequeue()).isNull();
assertThat(queue.remove()).isNull();
}
}
@ -188,7 +187,7 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
() -> {
while (queuingFinished.getCount() > 0 || !queue.isEmpty()) {
if (!queue.isEmpty()) {
Task<BytesValue> value = queue.dequeue();
Task<BytesValue> value = queue.remove();
value.markCompleted();
dequeued.add(value);
}
@ -203,7 +202,7 @@ abstract class AbstractTaskQueueTest<T extends TaskQueue<BytesValue>> {
() -> {
try {
for (int i = 0; i < itemsPerThread; i++) {
queue.enqueue(value);
queue.add(value);
}
} finally {
queuingFinished.countDown();

@ -0,0 +1,210 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
public class CachingTaskCollectionTest {
private TaskCollection<BytesValue> wrappedTaskCollection;
@Before
public void setup() {
wrappedTaskCollection = new InMemoryTaskQueue<>();
}
@Test
public void failTasksFromCache() {
testFailTasks(10, 5);
}
@Test
public void failTasksOverflowingCache() {
testFailTasks(10, 20);
}
@Test
public void failTasksWithNoCache() {
testFailTasks(0, 5);
}
private void testFailTasks(final int cacheSize, final int taskCount) {
final CachingTaskCollection<BytesValue> taskCollection = createCachingCollection(cacheSize);
final List<BytesValue> taskData = generateTasks(taskCollection, taskCount);
assertThat(taskCollection.size()).isEqualTo(taskCount);
assertThat(taskCollection.allTasksCompleted()).isFalse();
List<Task<BytesValue>> tasks = getAllTasks(taskCollection);
assertThat(taskCollection.size()).isEqualTo(0);
assertThat(taskCollection.allTasksCompleted()).isFalse();
// Check tasks match what we added
assertThat(getTaskData(tasks)).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0]));
// Fail all tasks
tasks.forEach(Task::markFailed);
assertThat(taskCollection.size()).isEqualTo(taskCount);
assertThat(taskCollection.allTasksCompleted()).isFalse();
// Collect tasks again - they should have all been re-added
tasks = getAllTasks(taskCollection);
// Check tasks match what we added
assertThat(getTaskData(tasks)).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0]));
// Clear tasks and then fail all outstanding tasks
taskCollection.clear();
assertThat(taskCollection.isEmpty()).isTrue();
assertThat(taskCollection.allTasksCompleted()).isTrue();
// Old failed tasks should not be re-added
tasks.forEach(Task::markFailed);
assertThat(taskCollection.isEmpty()).isTrue();
assertThat(taskCollection.allTasksCompleted()).isTrue();
assertThat(taskCollection.size()).isEqualTo(0);
}
@Test
public void completeTasksFromCache() {
testCompleteTasks(10, 9);
}
@Test
public void completeTasksThatOverflowCache() {
testCompleteTasks(10, 20);
}
@Test
public void completeTasksWithNoCache() {
testCompleteTasks(0, 20);
}
private void testCompleteTasks(final int cacheSize, final int taskCount) {
final CachingTaskCollection<BytesValue> taskCollection = createCachingCollection(cacheSize);
final List<BytesValue> taskData = generateTasks(taskCollection, taskCount);
assertThat(taskCollection.size()).isEqualTo(taskCount);
assertThat(taskCollection.allTasksCompleted()).isFalse();
final List<Task<BytesValue>> tasks = getAllTasks(taskCollection);
assertThat(taskCollection.size()).isEqualTo(0);
assertThat(taskCollection.allTasksCompleted()).isFalse();
// Complete all but last task
tasks.subList(0, tasks.size() - 1).forEach(Task::markCompleted);
assertThat(taskCollection.allTasksCompleted()).isFalse();
// Process last task
tasks.get(tasks.size() - 1).markCompleted();
assertThat(taskCollection.size()).isEqualTo(0);
assertThat(taskCollection.allTasksCompleted()).isTrue();
assertThat(getTaskData(tasks)).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0]));
}
@Test
public void processTasksWithMixedSuccess_cachedTasks() {
testProcessTasksWithMixedSuccess(10, 5);
}
@Test
public void processTasksWithMixedSuccess_tasksOverflowCache() {
testProcessTasksWithMixedSuccess(10, 20);
}
@Test
public void processTasksWithMixedSuccess_noCache() {
testProcessTasksWithMixedSuccess(10, 20);
}
private CachingTaskCollection<BytesValue> createCachingCollection(final int cacheSize) {
return new CachingTaskCollection<>(wrappedTaskCollection, cacheSize);
}
private void testProcessTasksWithMixedSuccess(final int cacheSize, final int taskCount) {
final CachingTaskCollection<BytesValue> taskCollection = createCachingCollection(cacheSize);
final List<BytesValue> taskData = generateTasks(taskCollection, taskCount);
assertThat(taskCollection.size()).isEqualTo(taskCount);
assertThat(taskCollection.allTasksCompleted()).isFalse();
final List<Task<BytesValue>> tasks = getAllTasks(taskCollection);
final List<Task<BytesValue>> failedTasks = new ArrayList<>();
boolean shouldFail = false;
for (Task<BytesValue> task : tasks) {
if (shouldFail) {
task.markFailed();
failedTasks.add(task);
} else {
task.markCompleted();
}
shouldFail = !shouldFail;
}
assertThat(taskCollection.allTasksCompleted()).isFalse();
assertThat(taskCollection.size()).isEqualTo(failedTasks.size());
final List<BytesValue> actualTaskData =
tasks.stream().map(Task::getData).collect(Collectors.toList());
assertThat(actualTaskData).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0]));
final List<Task<BytesValue>> remainingTasks = getAllTasks(taskCollection);
assertThat(remainingTasks.size()).isEqualTo(failedTasks.size());
assertThat(getTaskData(remainingTasks))
.containsExactlyInAnyOrder(getTaskData(failedTasks).toArray(new BytesValue[0]));
}
@Test
public void close() throws IOException {
final CachingTaskCollection<BytesValue> taskCollection = createCachingCollection(10);
taskCollection.close();
assertThatThrownBy(() -> taskCollection.add(BytesValue.of(1)))
.isInstanceOf(IllegalStateException.class);
}
private List<BytesValue> generateTasks(
final TaskCollection<BytesValue> taskCollection, final int taskCount) {
final List<BytesValue> taskData = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
final BytesValue value = BytesValue.of(i & 0xff);
taskData.add(value);
taskCollection.add(value);
}
return taskData;
}
private List<BytesValue> getTaskData(final List<Task<BytesValue>> tasks) {
return tasks.stream().map(Task::getData).collect(Collectors.toList());
}
private List<Task<BytesValue>> getAllTasks(final TaskCollection<BytesValue> taskCollection) {
final List<Task<BytesValue>> tasks = new ArrayList<>();
while (taskCollection.size() > 0) {
tasks.add(taskCollection.remove());
}
return tasks;
}
private interface TaskCollectionSupplier {
TaskCollection<BytesValue> get() throws Exception;
}
}

@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
package tech.pegasys.pantheon.services.tasks;
import tech.pegasys.pantheon.util.bytes.BytesValue;

@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
package tech.pegasys.pantheon.services.tasks;
import static org.assertj.core.api.Assertions.assertThat;
@ -59,21 +59,21 @@ public class RocksDbTaskQueueTest extends AbstractTaskQueueTest<RocksDbTaskQueue
final Path dataDir = folder.newFolder().toPath();
try (final RocksDbTaskQueue<BytesValue> queue = createQueue(dataDir)) {
for (int i = 0; i < elementCount; i++) {
queue.enqueue(BytesValue.of(i));
queue.add(BytesValue.of(i));
}
}
try (final RocksDbTaskQueue<BytesValue> resumedQueue = createQueue(dataDir)) {
assertThat(resumedQueue.size()).isEqualTo(elementCount);
// Queue an additional element
resumedQueue.enqueue(BytesValue.of(99));
resumedQueue.add(BytesValue.of(99));
assertThat(resumedQueue.size()).isEqualTo(elementCount + 1);
// Check that everything dequeues in order as expected
for (int i = 0; i < elementCount; i++) {
assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(i));
assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(i));
}
assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(99));
assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(99));
assertThat(resumedQueue.size()).isEqualTo(0);
}

@ -36,6 +36,7 @@ include 'metrics'
include 'pantheon'
include 'services:kvstore'
include 'services:pipeline'
include 'services:tasks'
include 'services:queue'
include 'services:util'
include 'testutil'

Loading…
Cancel
Save