diff --git a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java index 0d00d5ca11..1e63b6d858 100644 --- a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java +++ b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java @@ -34,7 +34,7 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; -import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue; +import tech.pegasys.pantheon.services.tasks.FlatFileTaskCollection; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.nio.file.Path; @@ -97,11 +97,10 @@ public class WorldStateDownloaderBenchmark { pendingRequests = new CachingTaskCollection<>( - RocksDbTaskQueue.create( + new FlatFileTaskCollection<>( tempDir.resolve("fastsync"), NodeDataRequest::serialize, - NodeDataRequest::deserialize, - metricsSystem), + NodeDataRequest::deserialize), 0); worldStateDownloader = new WorldStateDownloader( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastDownloaderFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastDownloaderFactory.java index 5c331cf3ab..167775113c 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastDownloaderFactory.java @@ -26,7 +26,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; -import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue; +import tech.pegasys.pantheon.services.tasks.FlatFileTaskCollection; import java.io.File; import java.nio.file.Path; @@ -119,11 +119,8 @@ public class FastDownloaderFactory { final Path dataDirectory, final MetricsSystem metricsSystem) { final CachingTaskCollection taskCollection = new CachingTaskCollection<>( - RocksDbTaskQueue.create( - dataDirectory, - NodeDataRequest::serialize, - NodeDataRequest::deserialize, - metricsSystem)); + new FlatFileTaskCollection<>( + dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize)); metricsSystem.createLongGauge( MetricCategory.SYNCHRONIZER, diff --git a/services/tasks/build.gradle b/services/tasks/build.gradle index 5345c5e811..670382094d 100644 --- a/services/tasks/build.gradle +++ b/services/tasks/build.gradle @@ -33,9 +33,9 @@ dependencies { implementation project(':metrics:core') implementation project(':services:util') + implementation 'io.vertx:vertx-core' implementation 'org.apache.logging.log4j:log4j-api' implementation 'com.google.guava:guava' - implementation 'org.rocksdb:rocksdbjni' runtime 'org.apache.logging.log4j:log4j-core' diff --git a/services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java b/services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java deleted file mode 100644 index c28643159c..0000000000 --- a/services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.tasks; - -import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; -import tech.pegasys.pantheon.util.bytes.BytesValue; -import tech.pegasys.pantheon.util.uint.UInt256; - -import java.io.File; -import java.io.IOException; -import java.util.function.Function; - -import com.google.common.io.Files; -import com.google.common.io.MoreFiles; -import com.google.common.io.RecursiveDeleteOption; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -@State(Scope.Benchmark) -public class RocksDbTaskQueueBenchmark { - - private File tempDir; - private RocksDbTaskQueue queue; - - @Setup(Level.Trial) - public void prepare() { - tempDir = Files.createTempDir(); - queue = - RocksDbTaskQueue.create( - tempDir.toPath(), Function.identity(), Function.identity(), new NoOpMetricsSystem()); - for (int i = 0; i < 1_000_000; i++) { - queue.add(UInt256.of(i).getBytes()); - } - } - - @TearDown - public void tearDown() throws IOException { - queue.close(); - MoreFiles.deleteRecursively(tempDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE); - } - - @Benchmark - public Task dequeue() { - return queue.remove(); - } -} diff --git a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java new file mode 100644 index 0000000000..49e8d1da7a --- /dev/null +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java @@ -0,0 +1,271 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.tasks; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class FlatFileTaskCollection implements TaskCollection { + private static final Logger LOG = LogManager.getLogger(); + private static final long DEFAULT_FILE_ROLL_SIZE_BYTES = 1024 * 1024 * 10; // 10Mb + static final String FILENAME_PREFIX = "tasks"; + private final Set> outstandingTasks = new HashSet<>(); + + private final Path storageDirectory; + private final Function serializer; + private final Function deserializer; + private final long rollWhenFileSizeExceedsBytes; + + private final ByteBuffer lengthBuffer = ByteBuffer.allocate(Integer.SIZE); + + private FileChannel readFileChannel; + private FileChannel writeFileChannel; + + private long size = 0; + private int readFileNumber = 0; + private int writeFileNumber = 0; + + public FlatFileTaskCollection( + final Path storageDirectory, + final Function serializer, + final Function deserializer) { + this(storageDirectory, serializer, deserializer, DEFAULT_FILE_ROLL_SIZE_BYTES); + } + + FlatFileTaskCollection( + final Path storageDirectory, + final Function serializer, + final Function deserializer, + final long rollWhenFileSizeExceedsBytes) { + this.storageDirectory = storageDirectory; + this.serializer = serializer; + this.deserializer = deserializer; + this.rollWhenFileSizeExceedsBytes = rollWhenFileSizeExceedsBytes; + writeFileChannel = openWriteFileChannel(writeFileNumber); + readFileChannel = openReadFileChannel(readFileNumber); + } + + private FileChannel openReadFileChannel(final int fileNumber) { + try { + return FileChannel.open( + pathForFileNumber(fileNumber), + StandardOpenOption.DELETE_ON_CLOSE, + StandardOpenOption.READ); + } catch (final IOException e) { + throw new StorageException(e); + } + } + + private FileChannel openWriteFileChannel(final int fileNumber) { + try { + return FileChannel.open( + pathForFileNumber(fileNumber), + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE); + } catch (final IOException e) { + throw new StorageException(e); + } + } + + @Override + public synchronized void add(final T taskData) { + final BytesValue data = serializer.apply(taskData); + try { + writeTaskData(data); + size++; + if (writeFileChannel.size() > rollWhenFileSizeExceedsBytes) { + LOG.debug("Writing reached end of file {}", writeFileNumber); + writeFileChannel.close(); + writeFileNumber++; + writeFileChannel = openWriteFileChannel(writeFileNumber); + } + } catch (final IOException e) { + throw new StorageException(e); + } + } + + @Override + public synchronized Task remove() { + if (isEmpty()) { + return null; + } + try { + final ByteBuffer dataBuffer = readNextTaskData(); + final T data = deserializer.apply(BytesValue.wrapBuffer(dataBuffer)); + final FlatFileTask task = new FlatFileTask<>(this, data); + outstandingTasks.add(task); + size--; + return task; + } catch (final IOException e) { + throw new StorageException(e); + } + } + + private ByteBuffer readNextTaskData() throws IOException { + final int dataLength = readDataLength(); + final ByteBuffer dataBuffer = ByteBuffer.allocate(dataLength); + readBytes(dataBuffer, dataLength); + return dataBuffer; + } + + private void writeTaskData(final BytesValue data) throws IOException { + final long offset = writeFileChannel.size(); + writeDataLength(data.size(), offset); + writeFileChannel.write(ByteBuffer.wrap(data.getArrayUnsafe()), offset + Integer.SIZE); + } + + private int readDataLength() throws IOException { + lengthBuffer.position(0); + lengthBuffer.limit(Integer.SIZE); + readBytes(lengthBuffer, Integer.SIZE); + return lengthBuffer.getInt(0); + } + + private void writeDataLength(final int size, final long offset) throws IOException { + lengthBuffer.position(0); + lengthBuffer.putInt(size); + lengthBuffer.flip(); + writeFileChannel.write(lengthBuffer, offset); + } + + private void readBytes(final ByteBuffer buffer, final int expectedLength) throws IOException { + int readBytes = readFileChannel.read(buffer); + + if (readBytes == -1 && writeFileNumber > readFileNumber) { + LOG.debug("Reading reached end of file {}", readFileNumber); + readFileChannel.close(); + readFileNumber++; + readFileChannel = openReadFileChannel(readFileNumber); + + readBytes = readFileChannel.read(buffer); + } + if (readBytes != expectedLength) { + throw new IllegalStateException( + "Task queue corrupted. Expected to read " + + expectedLength + + " bytes but only got " + + readBytes); + } + } + + @Override + public synchronized long size() { + return size; + } + + @Override + public synchronized boolean isEmpty() { + return size() == 0; + } + + @Override + public synchronized void clear() { + outstandingTasks.clear(); + try { + readFileChannel.close(); + writeFileChannel.close(); + for (int i = readFileNumber; i <= writeFileNumber; i++) { + final File file = pathForFileNumber(i).toFile(); + if (!file.delete() && file.exists()) { + LOG.error("Failed to delete tasks file {}", file.getAbsolutePath()); + } + } + readFileNumber = 0; + writeFileNumber = 0; + writeFileChannel = openWriteFileChannel(writeFileNumber); + readFileChannel = openReadFileChannel(readFileNumber); + size = 0; + } catch (final IOException e) { + throw new StorageException(e); + } + } + + @Override + public synchronized boolean allTasksCompleted() { + return isEmpty() && outstandingTasks.isEmpty(); + } + + @Override + public synchronized void close() { + try { + readFileChannel.close(); + writeFileChannel.close(); + } catch (final IOException e) { + throw new StorageException(e); + } + } + + private Path pathForFileNumber(final int fileNumber) { + return storageDirectory.resolve(FILENAME_PREFIX + fileNumber); + } + + private synchronized boolean markTaskCompleted(final FlatFileTask task) { + return outstandingTasks.remove(task); + } + + private synchronized void handleFailedTask(final FlatFileTask task) { + if (markTaskCompleted(task)) { + add(task.getData()); + } + } + + public static class StorageException extends RuntimeException { + StorageException(final Throwable t) { + super(t); + } + } + + private static class FlatFileTask implements Task { + private final AtomicBoolean completed = new AtomicBoolean(false); + private final FlatFileTaskCollection parentQueue; + private final T data; + + private FlatFileTask(final FlatFileTaskCollection parentQueue, final T data) { + this.parentQueue = parentQueue; + this.data = data; + } + + @Override + public T getData() { + return data; + } + + @Override + public void markCompleted() { + if (completed.compareAndSet(false, true)) { + parentQueue.markTaskCompleted(this); + } + } + + @Override + public void markFailed() { + if (completed.compareAndSet(false, true)) { + parentQueue.handleFailedTask(this); + } + } + } +} diff --git a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java deleted file mode 100644 index ab3353c728..0000000000 --- a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.tasks; - -import tech.pegasys.pantheon.metrics.MetricCategory; -import tech.pegasys.pantheon.metrics.MetricsSystem; -import tech.pegasys.pantheon.metrics.OperationTimer; -import tech.pegasys.pantheon.services.util.RocksDbUtil; -import tech.pegasys.pantheon.util.bytes.BytesValue; - -import java.nio.file.Path; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - -import com.google.common.primitives.Longs; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; - -public class RocksDbTaskQueue implements TaskCollection { - - private final Options options; - private final RocksDB db; - - private long lastEnqueuedKey = 0; - private long lastDequeuedKey = 0; - private RocksIterator dequeueIterator; - private long lastValidKeyFromIterator; - private final Set> outstandingTasks = new HashSet<>(); - - private boolean closed = false; - - private final Function serializer; - private final Function deserializer; - - private final OperationTimer enqueueLatency; - private final OperationTimer dequeueLatency; - - private RocksDbTaskQueue( - final Path storageDirectory, - final Function serializer, - final Function deserializer, - final MetricsSystem metricsSystem) { - this.serializer = serializer; - this.deserializer = deserializer; - try { - RocksDbUtil.loadNativeLibrary(); - // We don't support reloading data so ensure we're starting from a clean slate. - RocksDB.destroyDB(storageDirectory.toString(), new Options()); - options = new Options().setCreateIfMissing(true).setErrorIfExists(true); - db = RocksDB.open(options, storageDirectory.toString()); - - enqueueLatency = - metricsSystem.createTimer( - MetricCategory.BIG_QUEUE, - "enqueue_latency_seconds", - "Latency for enqueuing an item."); - dequeueLatency = - metricsSystem.createTimer( - MetricCategory.BIG_QUEUE, - "dequeue_latency_seconds", - "Latency for dequeuing an item."); - - } catch (final RocksDBException e) { - throw new StorageException(e); - } - } - - public static RocksDbTaskQueue create( - final Path storageDirectory, - final Function serializer, - final Function deserializer, - final MetricsSystem metricsSystem) { - return new RocksDbTaskQueue<>(storageDirectory, serializer, deserializer, metricsSystem); - } - - @Override - public synchronized void add(final T taskData) { - assertNotClosed(); - try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) { - final long key = ++lastEnqueuedKey; - db.put(Longs.toByteArray(key), serializer.apply(taskData).getArrayUnsafe()); - } catch (final RocksDBException e) { - throw new StorageException(e); - } - } - - @Override - public synchronized Task remove() { - assertNotClosed(); - if (isEmpty()) { - return null; - } - try (final OperationTimer.TimingContext ignored = dequeueLatency.startTimer()) { - if (dequeueIterator == null) { - createNewIterator(); - } - final long key = ++lastDequeuedKey; - dequeueIterator.seek(Longs.toByteArray(key)); - if (key > lastValidKeyFromIterator || !dequeueIterator.isValid()) { - // Reached the end of the snapshot this iterator was loaded with - dequeueIterator.close(); - createNewIterator(); - dequeueIterator.seek(Longs.toByteArray(key)); - if (!dequeueIterator.isValid()) { - throw new IllegalStateException("Next expected value is missing"); - } - } - final byte[] value = dequeueIterator.value(); - final BytesValue data = BytesValue.of(value); - final RocksDbTask task = new RocksDbTask<>(this, deserializer.apply(data), key); - outstandingTasks.add(task); - return task; - } - } - - private void createNewIterator() { - dequeueIterator = db.newIterator(); - lastValidKeyFromIterator = lastEnqueuedKey; - } - - @Override - public synchronized long size() { - if (closed) { - return 0; - } - return lastEnqueuedKey - lastDequeuedKey; - } - - @Override - public synchronized boolean isEmpty() { - return size() == 0; - } - - @Override - public synchronized void clear() { - assertNotClosed(); - outstandingTasks.clear(); - final byte[] from = Longs.toByteArray(0); - final byte[] to = Longs.toByteArray(lastEnqueuedKey + 1); - try { - db.deleteRange(from, to); - if (dequeueIterator != null) { - dequeueIterator.close(); - dequeueIterator = null; - } - lastDequeuedKey = 0; - lastEnqueuedKey = 0; - } catch (final RocksDBException e) { - throw new StorageException(e); - } - } - - @Override - public synchronized boolean allTasksCompleted() { - return isEmpty() && outstandingTasks.isEmpty(); - } - - @Override - public synchronized void close() { - if (closed) { - return; - } - closed = true; - if (dequeueIterator != null) { - dequeueIterator.close(); - } - options.close(); - db.close(); - } - - private void assertNotClosed() { - if (closed) { - throw new IllegalStateException("Attempt to access closed " + getClass().getSimpleName()); - } - } - - private synchronized boolean markTaskCompleted(final RocksDbTask task) { - return outstandingTasks.remove(task); - } - - private synchronized void handleFailedTask(final RocksDbTask task) { - if (markTaskCompleted(task)) { - add(task.getData()); - } - } - - public static class StorageException extends RuntimeException { - StorageException(final Throwable t) { - super(t); - } - } - - private static class RocksDbTask implements Task { - private final AtomicBoolean completed = new AtomicBoolean(false); - private final RocksDbTaskQueue parentQueue; - private final T data; - private final long key; - - private RocksDbTask(final RocksDbTaskQueue parentQueue, final T data, final long key) { - this.parentQueue = parentQueue; - this.data = data; - this.key = key; - } - - public long getKey() { - return key; - } - - @Override - public T getData() { - return data; - } - - @Override - public void markCompleted() { - if (completed.compareAndSet(false, true)) { - parentQueue.markTaskCompleted(this); - } - } - - @Override - public void markFailed() { - if (completed.compareAndSet(false, true)) { - parentQueue.handleFailedTask(this); - } - } - } -} diff --git a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java index a02a7b2ee2..7e6f96a21e 100644 --- a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java @@ -30,10 +30,10 @@ abstract class AbstractTaskQueueTest> { @Test public void enqueueAndDequeue() throws Exception { - try (T queue = createQueue()) { - BytesValue one = BytesValue.of(1); - BytesValue two = BytesValue.of(2); - BytesValue three = BytesValue.of(3); + try (final T queue = createQueue()) { + final BytesValue one = BytesValue.of(1); + final BytesValue two = BytesValue.of(2); + final BytesValue three = BytesValue.of(3); assertThat(queue.remove()).isNull(); @@ -54,8 +54,8 @@ abstract class AbstractTaskQueueTest> { @Test public void markTaskFailed() throws Exception { - try (T queue = createQueue()) { - BytesValue value = BytesValue.of(1); + try (final T queue = createQueue()) { + final BytesValue value = BytesValue.of(1); assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue(); @@ -65,7 +65,7 @@ abstract class AbstractTaskQueueTest> { assertThat(queue.isEmpty()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse(); - Task task = queue.remove(); + final Task task = queue.remove(); assertThat(task).isNotNull(); assertThat(task.getData()).isEqualTo(value); assertThat(queue.isEmpty()).isTrue(); @@ -84,8 +84,8 @@ abstract class AbstractTaskQueueTest> { @Test public void markTaskCompleted() throws Exception { - try (T queue = createQueue()) { - BytesValue value = BytesValue.of(1); + try (final T queue = createQueue()) { + final BytesValue value = BytesValue.of(1); assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue(); @@ -95,7 +95,7 @@ abstract class AbstractTaskQueueTest> { assertThat(queue.isEmpty()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse(); - Task task = queue.remove(); + final Task task = queue.remove(); assertThat(task).isNotNull(); assertThat(task.getData()).isEqualTo(value); assertThat(queue.isEmpty()).isTrue(); @@ -114,11 +114,11 @@ abstract class AbstractTaskQueueTest> { @Test public void clear() throws Exception { - try (T queue = createQueue()) { - BytesValue one = BytesValue.of(1); - BytesValue two = BytesValue.of(2); - BytesValue three = BytesValue.of(3); - BytesValue four = BytesValue.of(4); + try (final T queue = createQueue()) { + final BytesValue one = BytesValue.of(1); + final BytesValue two = BytesValue.of(2); + final BytesValue three = BytesValue.of(3); + final BytesValue four = BytesValue.of(4); // Fill queue queue.add(one); @@ -145,12 +145,12 @@ abstract class AbstractTaskQueueTest> { @Test public void clear_emptyQueueWithOutstandingTasks() throws Exception { - try (T queue = createQueue()) { - BytesValue one = BytesValue.of(1); + try (final T queue = createQueue()) { + final BytesValue one = BytesValue.of(1); // Add and then remove task queue.add(one); - Task task = queue.remove(); + final Task task = queue.remove(); assertThat(task.getData()).isEqualTo(one); assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isFalse(); @@ -181,13 +181,13 @@ abstract class AbstractTaskQueueTest> { final CountDownLatch queuingFinished = new CountDownLatch(threadCount); // Start thread for reading values - List> dequeued = new ArrayList<>(); - Thread reader = + final List> dequeued = new ArrayList<>(); + final Thread reader = new Thread( () -> { while (queuingFinished.getCount() > 0 || !queue.isEmpty()) { if (!queue.isEmpty()) { - Task value = queue.remove(); + final Task value = queue.remove(); value.markCompleted(); dequeued.add(value); } diff --git a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java new file mode 100644 index 0000000000..28e16b6ea4 --- /dev/null +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.tasks; + +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class FlatFileTaskCollectionTest + extends AbstractTaskQueueTest> { + + private static final int ROLL_SIZE = 10; + @Rule public final TemporaryFolder folder = new TemporaryFolder(); + + @Override + protected FlatFileTaskCollection createQueue() throws IOException { + final Path dataDir = folder.newFolder().toPath(); + return createQueue(dataDir); + } + + private FlatFileTaskCollection createQueue(final Path dataDir) { + return new FlatFileTaskCollection<>( + dataDir, Function.identity(), Function.identity(), ROLL_SIZE); + } + + @Test + public void shouldRollFilesWhenSizeExceeded() throws Exception { + final Path dataDir = folder.newFolder().toPath(); + try (final FlatFileTaskCollection queue = createQueue(dataDir)) { + final List tasks = new ArrayList<>(); + + addItem(queue, tasks, 0); + final File[] currentFiles = getCurrentFiles(dataDir); + assertThat(currentFiles).hasSize(1); + final File firstFile = currentFiles[0]; + int tasksInFirstFile = 1; + while (getCurrentFiles(dataDir).length == 1) { + addItem(queue, tasks, tasksInFirstFile); + tasksInFirstFile++; + } + + assertThat(getCurrentFiles(dataDir)).hasSizeGreaterThan(1); + assertThat(getCurrentFiles(dataDir)).contains(firstFile); + + // Add an extra item to be sure we have at least one in a later file + addItem(queue, tasks, 123); + + final List removedTasks = new ArrayList<>(); + // Read through all the items in the first file. + for (int i = 0; i < tasksInFirstFile; i++) { + removedTasks.add(queue.remove().getData()); + } + + // Fully read files should have been removed. + assertThat(getCurrentFiles(dataDir)).doesNotContain(firstFile); + + // Check that all tasks were read correctly. + removedTasks.add(queue.remove().getData()); + assertThat(queue.isEmpty()).isTrue(); + assertThat(removedTasks).isEqualTo(tasks); + } + } + + private void addItem( + final FlatFileTaskCollection queue, + final List tasks, + final int value) { + tasks.add(BytesValue.of(value)); + queue.add(BytesValue.of(value)); + } + + private File[] getCurrentFiles(final Path dataDir) { + return dataDir + .toFile() + .listFiles((dir, name) -> name.startsWith(FlatFileTaskCollection.FILENAME_PREFIX)); + } +} diff --git a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java deleted file mode 100644 index 835c275d68..0000000000 --- a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.tasks; - -import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; -import tech.pegasys.pantheon.util.bytes.BytesValue; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.function.Function; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -public class RocksDbTaskQueueTest extends AbstractTaskQueueTest> { - - @Rule public final TemporaryFolder folder = new TemporaryFolder(); - - @Override - protected RocksDbTaskQueue createQueue() throws IOException { - final Path dataDir = folder.newFolder().toPath(); - return createQueue(dataDir); - } - - private RocksDbTaskQueue createQueue(final Path dataDir) { - return RocksDbTaskQueue.create( - dataDir, Function.identity(), Function.identity(), new NoOpMetricsSystem()); - } -}