diff --git a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java index 49e8d1da7a..5cb0b12cf3 100644 --- a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollection.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,7 +40,7 @@ public class FlatFileTaskCollection implements TaskCollection { private final Function deserializer; private final long rollWhenFileSizeExceedsBytes; - private final ByteBuffer lengthBuffer = ByteBuffer.allocate(Integer.SIZE); + private final ByteBuffer lengthBuffer = ByteBuffer.allocate(Integer.BYTES); private FileChannel readFileChannel; private FileChannel writeFileChannel; @@ -135,13 +136,13 @@ public class FlatFileTaskCollection implements TaskCollection { private void writeTaskData(final BytesValue data) throws IOException { final long offset = writeFileChannel.size(); writeDataLength(data.size(), offset); - writeFileChannel.write(ByteBuffer.wrap(data.getArrayUnsafe()), offset + Integer.SIZE); + writeFileChannel.write(ByteBuffer.wrap(data.getArrayUnsafe()), offset + Integer.BYTES); } private int readDataLength() throws IOException { lengthBuffer.position(0); - lengthBuffer.limit(Integer.SIZE); - readBytes(lengthBuffer, Integer.SIZE); + lengthBuffer.limit(Integer.BYTES); + readBytes(lengthBuffer, Integer.BYTES); return lengthBuffer.getInt(0); } @@ -182,6 +183,16 @@ public class FlatFileTaskCollection implements TaskCollection { return size() == 0; } + @VisibleForTesting + int getReadFileNumber() { + return readFileNumber; + } + + @VisibleForTesting + int getWriteFileNumber() { + return writeFileNumber; + } + @Override public synchronized void clear() { outstandingTasks.clear(); diff --git a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java index 28e16b6ea4..9c8e1cfd93 100644 --- a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/FlatFileTaskCollectionTest.java @@ -16,7 +16,6 @@ import static org.assertj.core.api.Assertions.assertThat; import tech.pegasys.pantheon.util.bytes.BytesValue; -import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; @@ -51,20 +50,19 @@ public class FlatFileTaskCollectionTest final List tasks = new ArrayList<>(); addItem(queue, tasks, 0); - final File[] currentFiles = getCurrentFiles(dataDir); - assertThat(currentFiles).hasSize(1); - final File firstFile = currentFiles[0]; + assertThat(queue.getWriteFileNumber()).isEqualTo(0); int tasksInFirstFile = 1; - while (getCurrentFiles(dataDir).length == 1) { + while (queue.getWriteFileNumber() == 0) { addItem(queue, tasks, tasksInFirstFile); tasksInFirstFile++; } - assertThat(getCurrentFiles(dataDir)).hasSizeGreaterThan(1); - assertThat(getCurrentFiles(dataDir)).contains(firstFile); + assertThat(queue.getWriteFileNumber()).isGreaterThan(0); + assertThat(queue.getReadFileNumber()).isEqualTo(0); - // Add an extra item to be sure we have at least one in a later file + // Add extra items to be sure we have at least one in a later file addItem(queue, tasks, 123); + addItem(queue, tasks, 124); final List removedTasks = new ArrayList<>(); // Read through all the items in the first file. @@ -72,8 +70,9 @@ public class FlatFileTaskCollectionTest removedTasks.add(queue.remove().getData()); } - // Fully read files should have been removed. - assertThat(getCurrentFiles(dataDir)).doesNotContain(firstFile); + // read one more to make sure we are reading from the next file + removedTasks.add(queue.remove().getData()); + assertThat(queue.getReadFileNumber()).isEqualTo(1); // Check that all tasks were read correctly. removedTasks.add(queue.remove().getData()); @@ -89,10 +88,4 @@ public class FlatFileTaskCollectionTest tasks.add(BytesValue.of(value)); queue.add(BytesValue.of(value)); } - - private File[] getCurrentFiles(final Path dataDir) { - return dataDir - .toFile() - .listFiles((dir, name) -> name.startsWith(FlatFileTaskCollection.FILENAME_PREFIX)); - } }