Add flat file based task collection (#1377)

Replaces the RocksDB based queue for pending world state download tasks with one that uses a simple file.  Added tasks are appended to the file while the reader starts from the beginning of the file and reads forwards.  

Periodically a new file is started to limit the disk space used. The reader deletes files it has completed reading.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 8937754c14
commit 135e869c20
  1. 7
      ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java
  2. 9
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastDownloaderFactory.java
  3. 2
      services/tasks/build.gradle
  4. 60
      services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java
  5. 271
      services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java
  6. 242
      services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java
  7. 42
      services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java
  8. 98
      services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java
  9. 39
      services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java

@ -34,7 +34,7 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration; import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue; import tech.pegasys.pantheon.services.tasks.FlatFileTaskCollection;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.nio.file.Path; import java.nio.file.Path;
@ -97,11 +97,10 @@ public class WorldStateDownloaderBenchmark {
pendingRequests = pendingRequests =
new CachingTaskCollection<>( new CachingTaskCollection<>(
RocksDbTaskQueue.create( new FlatFileTaskCollection<>(
tempDir.resolve("fastsync"), tempDir.resolve("fastsync"),
NodeDataRequest::serialize, NodeDataRequest::serialize,
NodeDataRequest::deserialize, NodeDataRequest::deserialize),
metricsSystem),
0); 0);
worldStateDownloader = worldStateDownloader =
new WorldStateDownloader( new WorldStateDownloader(

@ -26,7 +26,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue; import tech.pegasys.pantheon.services.tasks.FlatFileTaskCollection;
import java.io.File; import java.io.File;
import java.nio.file.Path; import java.nio.file.Path;
@ -119,11 +119,8 @@ public class FastDownloaderFactory {
final Path dataDirectory, final MetricsSystem metricsSystem) { final Path dataDirectory, final MetricsSystem metricsSystem) {
final CachingTaskCollection<NodeDataRequest> taskCollection = final CachingTaskCollection<NodeDataRequest> taskCollection =
new CachingTaskCollection<>( new CachingTaskCollection<>(
RocksDbTaskQueue.create( new FlatFileTaskCollection<>(
dataDirectory, dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize));
NodeDataRequest::serialize,
NodeDataRequest::deserialize,
metricsSystem));
metricsSystem.createLongGauge( metricsSystem.createLongGauge(
MetricCategory.SYNCHRONIZER, MetricCategory.SYNCHRONIZER,

@ -33,9 +33,9 @@ dependencies {
implementation project(':metrics:core') implementation project(':metrics:core')
implementation project(':services:util') implementation project(':services:util')
implementation 'io.vertx:vertx-core'
implementation 'org.apache.logging.log4j:log4j-api' implementation 'org.apache.logging.log4j:log4j-api'
implementation 'com.google.guava:guava' implementation 'com.google.guava:guava'
implementation 'org.rocksdb:rocksdbjni'
runtime 'org.apache.logging.log4j:log4j-core' runtime 'org.apache.logging.log4j:log4j-core'

@ -1,60 +0,0 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.io.File;
import java.io.IOException;
import java.util.function.Function;
import com.google.common.io.Files;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
@State(Scope.Benchmark)
public class RocksDbTaskQueueBenchmark {
private File tempDir;
private RocksDbTaskQueue<BytesValue> queue;
@Setup(Level.Trial)
public void prepare() {
tempDir = Files.createTempDir();
queue =
RocksDbTaskQueue.create(
tempDir.toPath(), Function.identity(), Function.identity(), new NoOpMetricsSystem());
for (int i = 0; i < 1_000_000; i++) {
queue.add(UInt256.of(i).getBytes());
}
}
@TearDown
public void tearDown() throws IOException {
queue.close();
MoreFiles.deleteRecursively(tempDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
}
@Benchmark
public Task<BytesValue> dequeue() {
return queue.remove();
}
}

@ -0,0 +1,271 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class FlatFileTaskCollection<T> implements TaskCollection<T> {
private static final Logger LOG = LogManager.getLogger();
private static final long DEFAULT_FILE_ROLL_SIZE_BYTES = 1024 * 1024 * 10; // 10Mb
static final String FILENAME_PREFIX = "tasks";
private final Set<FlatFileTask<T>> outstandingTasks = new HashSet<>();
private final Path storageDirectory;
private final Function<T, BytesValue> serializer;
private final Function<BytesValue, T> deserializer;
private final long rollWhenFileSizeExceedsBytes;
private final ByteBuffer lengthBuffer = ByteBuffer.allocate(Integer.SIZE);
private FileChannel readFileChannel;
private FileChannel writeFileChannel;
private long size = 0;
private int readFileNumber = 0;
private int writeFileNumber = 0;
public FlatFileTaskCollection(
final Path storageDirectory,
final Function<T, BytesValue> serializer,
final Function<BytesValue, T> deserializer) {
this(storageDirectory, serializer, deserializer, DEFAULT_FILE_ROLL_SIZE_BYTES);
}
FlatFileTaskCollection(
final Path storageDirectory,
final Function<T, BytesValue> serializer,
final Function<BytesValue, T> deserializer,
final long rollWhenFileSizeExceedsBytes) {
this.storageDirectory = storageDirectory;
this.serializer = serializer;
this.deserializer = deserializer;
this.rollWhenFileSizeExceedsBytes = rollWhenFileSizeExceedsBytes;
writeFileChannel = openWriteFileChannel(writeFileNumber);
readFileChannel = openReadFileChannel(readFileNumber);
}
private FileChannel openReadFileChannel(final int fileNumber) {
try {
return FileChannel.open(
pathForFileNumber(fileNumber),
StandardOpenOption.DELETE_ON_CLOSE,
StandardOpenOption.READ);
} catch (final IOException e) {
throw new StorageException(e);
}
}
private FileChannel openWriteFileChannel(final int fileNumber) {
try {
return FileChannel.open(
pathForFileNumber(fileNumber),
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
} catch (final IOException e) {
throw new StorageException(e);
}
}
@Override
public synchronized void add(final T taskData) {
final BytesValue data = serializer.apply(taskData);
try {
writeTaskData(data);
size++;
if (writeFileChannel.size() > rollWhenFileSizeExceedsBytes) {
LOG.debug("Writing reached end of file {}", writeFileNumber);
writeFileChannel.close();
writeFileNumber++;
writeFileChannel = openWriteFileChannel(writeFileNumber);
}
} catch (final IOException e) {
throw new StorageException(e);
}
}
@Override
public synchronized Task<T> remove() {
if (isEmpty()) {
return null;
}
try {
final ByteBuffer dataBuffer = readNextTaskData();
final T data = deserializer.apply(BytesValue.wrapBuffer(dataBuffer));
final FlatFileTask<T> task = new FlatFileTask<>(this, data);
outstandingTasks.add(task);
size--;
return task;
} catch (final IOException e) {
throw new StorageException(e);
}
}
private ByteBuffer readNextTaskData() throws IOException {
final int dataLength = readDataLength();
final ByteBuffer dataBuffer = ByteBuffer.allocate(dataLength);
readBytes(dataBuffer, dataLength);
return dataBuffer;
}
private void writeTaskData(final BytesValue data) throws IOException {
final long offset = writeFileChannel.size();
writeDataLength(data.size(), offset);
writeFileChannel.write(ByteBuffer.wrap(data.getArrayUnsafe()), offset + Integer.SIZE);
}
private int readDataLength() throws IOException {
lengthBuffer.position(0);
lengthBuffer.limit(Integer.SIZE);
readBytes(lengthBuffer, Integer.SIZE);
return lengthBuffer.getInt(0);
}
private void writeDataLength(final int size, final long offset) throws IOException {
lengthBuffer.position(0);
lengthBuffer.putInt(size);
lengthBuffer.flip();
writeFileChannel.write(lengthBuffer, offset);
}
private void readBytes(final ByteBuffer buffer, final int expectedLength) throws IOException {
int readBytes = readFileChannel.read(buffer);
if (readBytes == -1 && writeFileNumber > readFileNumber) {
LOG.debug("Reading reached end of file {}", readFileNumber);
readFileChannel.close();
readFileNumber++;
readFileChannel = openReadFileChannel(readFileNumber);
readBytes = readFileChannel.read(buffer);
}
if (readBytes != expectedLength) {
throw new IllegalStateException(
"Task queue corrupted. Expected to read "
+ expectedLength
+ " bytes but only got "
+ readBytes);
}
}
@Override
public synchronized long size() {
return size;
}
@Override
public synchronized boolean isEmpty() {
return size() == 0;
}
@Override
public synchronized void clear() {
outstandingTasks.clear();
try {
readFileChannel.close();
writeFileChannel.close();
for (int i = readFileNumber; i <= writeFileNumber; i++) {
final File file = pathForFileNumber(i).toFile();
if (!file.delete() && file.exists()) {
LOG.error("Failed to delete tasks file {}", file.getAbsolutePath());
}
}
readFileNumber = 0;
writeFileNumber = 0;
writeFileChannel = openWriteFileChannel(writeFileNumber);
readFileChannel = openReadFileChannel(readFileNumber);
size = 0;
} catch (final IOException e) {
throw new StorageException(e);
}
}
@Override
public synchronized boolean allTasksCompleted() {
return isEmpty() && outstandingTasks.isEmpty();
}
@Override
public synchronized void close() {
try {
readFileChannel.close();
writeFileChannel.close();
} catch (final IOException e) {
throw new StorageException(e);
}
}
private Path pathForFileNumber(final int fileNumber) {
return storageDirectory.resolve(FILENAME_PREFIX + fileNumber);
}
private synchronized boolean markTaskCompleted(final FlatFileTask<T> task) {
return outstandingTasks.remove(task);
}
private synchronized void handleFailedTask(final FlatFileTask<T> task) {
if (markTaskCompleted(task)) {
add(task.getData());
}
}
public static class StorageException extends RuntimeException {
StorageException(final Throwable t) {
super(t);
}
}
private static class FlatFileTask<T> implements Task<T> {
private final AtomicBoolean completed = new AtomicBoolean(false);
private final FlatFileTaskCollection<T> parentQueue;
private final T data;
private FlatFileTask(final FlatFileTaskCollection<T> parentQueue, final T data) {
this.parentQueue = parentQueue;
this.data = data;
}
@Override
public T getData() {
return data;
}
@Override
public void markCompleted() {
if (completed.compareAndSet(false, true)) {
parentQueue.markTaskCompleted(this);
}
}
@Override
public void markFailed() {
if (completed.compareAndSet(false, true)) {
parentQueue.handleFailedTask(this);
}
}
}
}

@ -1,242 +0,0 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.util.RocksDbUtil;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import com.google.common.primitives.Longs;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
public class RocksDbTaskQueue<T> implements TaskCollection<T> {
private final Options options;
private final RocksDB db;
private long lastEnqueuedKey = 0;
private long lastDequeuedKey = 0;
private RocksIterator dequeueIterator;
private long lastValidKeyFromIterator;
private final Set<RocksDbTask<T>> outstandingTasks = new HashSet<>();
private boolean closed = false;
private final Function<T, BytesValue> serializer;
private final Function<BytesValue, T> deserializer;
private final OperationTimer enqueueLatency;
private final OperationTimer dequeueLatency;
private RocksDbTaskQueue(
final Path storageDirectory,
final Function<T, BytesValue> serializer,
final Function<BytesValue, T> deserializer,
final MetricsSystem metricsSystem) {
this.serializer = serializer;
this.deserializer = deserializer;
try {
RocksDbUtil.loadNativeLibrary();
// We don't support reloading data so ensure we're starting from a clean slate.
RocksDB.destroyDB(storageDirectory.toString(), new Options());
options = new Options().setCreateIfMissing(true).setErrorIfExists(true);
db = RocksDB.open(options, storageDirectory.toString());
enqueueLatency =
metricsSystem.createTimer(
MetricCategory.BIG_QUEUE,
"enqueue_latency_seconds",
"Latency for enqueuing an item.");
dequeueLatency =
metricsSystem.createTimer(
MetricCategory.BIG_QUEUE,
"dequeue_latency_seconds",
"Latency for dequeuing an item.");
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
public static <T> RocksDbTaskQueue<T> create(
final Path storageDirectory,
final Function<T, BytesValue> serializer,
final Function<BytesValue, T> deserializer,
final MetricsSystem metricsSystem) {
return new RocksDbTaskQueue<>(storageDirectory, serializer, deserializer, metricsSystem);
}
@Override
public synchronized void add(final T taskData) {
assertNotClosed();
try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) {
final long key = ++lastEnqueuedKey;
db.put(Longs.toByteArray(key), serializer.apply(taskData).getArrayUnsafe());
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
@Override
public synchronized Task<T> remove() {
assertNotClosed();
if (isEmpty()) {
return null;
}
try (final OperationTimer.TimingContext ignored = dequeueLatency.startTimer()) {
if (dequeueIterator == null) {
createNewIterator();
}
final long key = ++lastDequeuedKey;
dequeueIterator.seek(Longs.toByteArray(key));
if (key > lastValidKeyFromIterator || !dequeueIterator.isValid()) {
// Reached the end of the snapshot this iterator was loaded with
dequeueIterator.close();
createNewIterator();
dequeueIterator.seek(Longs.toByteArray(key));
if (!dequeueIterator.isValid()) {
throw new IllegalStateException("Next expected value is missing");
}
}
final byte[] value = dequeueIterator.value();
final BytesValue data = BytesValue.of(value);
final RocksDbTask<T> task = new RocksDbTask<>(this, deserializer.apply(data), key);
outstandingTasks.add(task);
return task;
}
}
private void createNewIterator() {
dequeueIterator = db.newIterator();
lastValidKeyFromIterator = lastEnqueuedKey;
}
@Override
public synchronized long size() {
if (closed) {
return 0;
}
return lastEnqueuedKey - lastDequeuedKey;
}
@Override
public synchronized boolean isEmpty() {
return size() == 0;
}
@Override
public synchronized void clear() {
assertNotClosed();
outstandingTasks.clear();
final byte[] from = Longs.toByteArray(0);
final byte[] to = Longs.toByteArray(lastEnqueuedKey + 1);
try {
db.deleteRange(from, to);
if (dequeueIterator != null) {
dequeueIterator.close();
dequeueIterator = null;
}
lastDequeuedKey = 0;
lastEnqueuedKey = 0;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
@Override
public synchronized boolean allTasksCompleted() {
return isEmpty() && outstandingTasks.isEmpty();
}
@Override
public synchronized void close() {
if (closed) {
return;
}
closed = true;
if (dequeueIterator != null) {
dequeueIterator.close();
}
options.close();
db.close();
}
private void assertNotClosed() {
if (closed) {
throw new IllegalStateException("Attempt to access closed " + getClass().getSimpleName());
}
}
private synchronized boolean markTaskCompleted(final RocksDbTask<T> task) {
return outstandingTasks.remove(task);
}
private synchronized void handleFailedTask(final RocksDbTask<T> task) {
if (markTaskCompleted(task)) {
add(task.getData());
}
}
public static class StorageException extends RuntimeException {
StorageException(final Throwable t) {
super(t);
}
}
private static class RocksDbTask<T> implements Task<T> {
private final AtomicBoolean completed = new AtomicBoolean(false);
private final RocksDbTaskQueue<T> parentQueue;
private final T data;
private final long key;
private RocksDbTask(final RocksDbTaskQueue<T> parentQueue, final T data, final long key) {
this.parentQueue = parentQueue;
this.data = data;
this.key = key;
}
public long getKey() {
return key;
}
@Override
public T getData() {
return data;
}
@Override
public void markCompleted() {
if (completed.compareAndSet(false, true)) {
parentQueue.markTaskCompleted(this);
}
}
@Override
public void markFailed() {
if (completed.compareAndSet(false, true)) {
parentQueue.handleFailedTask(this);
}
}
}
}

@ -30,10 +30,10 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
@Test @Test
public void enqueueAndDequeue() throws Exception { public void enqueueAndDequeue() throws Exception {
try (T queue = createQueue()) { try (final T queue = createQueue()) {
BytesValue one = BytesValue.of(1); final BytesValue one = BytesValue.of(1);
BytesValue two = BytesValue.of(2); final BytesValue two = BytesValue.of(2);
BytesValue three = BytesValue.of(3); final BytesValue three = BytesValue.of(3);
assertThat(queue.remove()).isNull(); assertThat(queue.remove()).isNull();
@ -54,8 +54,8 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
@Test @Test
public void markTaskFailed() throws Exception { public void markTaskFailed() throws Exception {
try (T queue = createQueue()) { try (final T queue = createQueue()) {
BytesValue value = BytesValue.of(1); final BytesValue value = BytesValue.of(1);
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue();
@ -65,7 +65,7 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
assertThat(queue.isEmpty()).isFalse(); assertThat(queue.isEmpty()).isFalse();
assertThat(queue.allTasksCompleted()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse();
Task<BytesValue> task = queue.remove(); final Task<BytesValue> task = queue.remove();
assertThat(task).isNotNull(); assertThat(task).isNotNull();
assertThat(task.getData()).isEqualTo(value); assertThat(task.getData()).isEqualTo(value);
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
@ -84,8 +84,8 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
@Test @Test
public void markTaskCompleted() throws Exception { public void markTaskCompleted() throws Exception {
try (T queue = createQueue()) { try (final T queue = createQueue()) {
BytesValue value = BytesValue.of(1); final BytesValue value = BytesValue.of(1);
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue();
@ -95,7 +95,7 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
assertThat(queue.isEmpty()).isFalse(); assertThat(queue.isEmpty()).isFalse();
assertThat(queue.allTasksCompleted()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse();
Task<BytesValue> task = queue.remove(); final Task<BytesValue> task = queue.remove();
assertThat(task).isNotNull(); assertThat(task).isNotNull();
assertThat(task.getData()).isEqualTo(value); assertThat(task.getData()).isEqualTo(value);
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
@ -114,11 +114,11 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
@Test @Test
public void clear() throws Exception { public void clear() throws Exception {
try (T queue = createQueue()) { try (final T queue = createQueue()) {
BytesValue one = BytesValue.of(1); final BytesValue one = BytesValue.of(1);
BytesValue two = BytesValue.of(2); final BytesValue two = BytesValue.of(2);
BytesValue three = BytesValue.of(3); final BytesValue three = BytesValue.of(3);
BytesValue four = BytesValue.of(4); final BytesValue four = BytesValue.of(4);
// Fill queue // Fill queue
queue.add(one); queue.add(one);
@ -145,12 +145,12 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
@Test @Test
public void clear_emptyQueueWithOutstandingTasks() throws Exception { public void clear_emptyQueueWithOutstandingTasks() throws Exception {
try (T queue = createQueue()) { try (final T queue = createQueue()) {
BytesValue one = BytesValue.of(1); final BytesValue one = BytesValue.of(1);
// Add and then remove task // Add and then remove task
queue.add(one); queue.add(one);
Task<BytesValue> task = queue.remove(); final Task<BytesValue> task = queue.remove();
assertThat(task.getData()).isEqualTo(one); assertThat(task.getData()).isEqualTo(one);
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
assertThat(queue.allTasksCompleted()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse();
@ -181,13 +181,13 @@ abstract class AbstractTaskQueueTest<T extends TaskCollection<BytesValue>> {
final CountDownLatch queuingFinished = new CountDownLatch(threadCount); final CountDownLatch queuingFinished = new CountDownLatch(threadCount);
// Start thread for reading values // Start thread for reading values
List<Task<BytesValue>> dequeued = new ArrayList<>(); final List<Task<BytesValue>> dequeued = new ArrayList<>();
Thread reader = final Thread reader =
new Thread( new Thread(
() -> { () -> {
while (queuingFinished.getCount() > 0 || !queue.isEmpty()) { while (queuingFinished.getCount() > 0 || !queue.isEmpty()) {
if (!queue.isEmpty()) { if (!queue.isEmpty()) {
Task<BytesValue> value = queue.remove(); final Task<BytesValue> value = queue.remove();
value.markCompleted(); value.markCompleted();
dequeued.add(value); dequeued.add(value);
} }

@ -0,0 +1,98 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class FlatFileTaskCollectionTest
extends AbstractTaskQueueTest<FlatFileTaskCollection<BytesValue>> {
private static final int ROLL_SIZE = 10;
@Rule public final TemporaryFolder folder = new TemporaryFolder();
@Override
protected FlatFileTaskCollection<BytesValue> createQueue() throws IOException {
final Path dataDir = folder.newFolder().toPath();
return createQueue(dataDir);
}
private FlatFileTaskCollection<BytesValue> createQueue(final Path dataDir) {
return new FlatFileTaskCollection<>(
dataDir, Function.identity(), Function.identity(), ROLL_SIZE);
}
@Test
public void shouldRollFilesWhenSizeExceeded() throws Exception {
final Path dataDir = folder.newFolder().toPath();
try (final FlatFileTaskCollection<BytesValue> queue = createQueue(dataDir)) {
final List<BytesValue> tasks = new ArrayList<>();
addItem(queue, tasks, 0);
final File[] currentFiles = getCurrentFiles(dataDir);
assertThat(currentFiles).hasSize(1);
final File firstFile = currentFiles[0];
int tasksInFirstFile = 1;
while (getCurrentFiles(dataDir).length == 1) {
addItem(queue, tasks, tasksInFirstFile);
tasksInFirstFile++;
}
assertThat(getCurrentFiles(dataDir)).hasSizeGreaterThan(1);
assertThat(getCurrentFiles(dataDir)).contains(firstFile);
// Add an extra item to be sure we have at least one in a later file
addItem(queue, tasks, 123);
final List<BytesValue> removedTasks = new ArrayList<>();
// Read through all the items in the first file.
for (int i = 0; i < tasksInFirstFile; i++) {
removedTasks.add(queue.remove().getData());
}
// Fully read files should have been removed.
assertThat(getCurrentFiles(dataDir)).doesNotContain(firstFile);
// Check that all tasks were read correctly.
removedTasks.add(queue.remove().getData());
assertThat(queue.isEmpty()).isTrue();
assertThat(removedTasks).isEqualTo(tasks);
}
}
private void addItem(
final FlatFileTaskCollection<BytesValue> queue,
final List<BytesValue> tasks,
final int value) {
tasks.add(BytesValue.of(value));
queue.add(BytesValue.of(value));
}
private File[] getCurrentFiles(final Path dataDir) {
return dataDir
.toFile()
.listFiles((dir, name) -> name.startsWith(FlatFileTaskCollection.FILENAME_PREFIX));
}
}

@ -1,39 +0,0 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.tasks;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.io.IOException;
import java.nio.file.Path;
import java.util.function.Function;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
public class RocksDbTaskQueueTest extends AbstractTaskQueueTest<RocksDbTaskQueue<BytesValue>> {
@Rule public final TemporaryFolder folder = new TemporaryFolder();
@Override
protected RocksDbTaskQueue<BytesValue> createQueue() throws IOException {
final Path dataDir = folder.newFolder().toPath();
return createQueue(dataDir);
}
private RocksDbTaskQueue<BytesValue> createQueue(final Path dataDir) {
return RocksDbTaskQueue.create(
dataDir, Function.identity(), Function.identity(), new NoOpMetricsSystem());
}
}
Loading…
Cancel
Save