Support async processing while maintaining output order (#1215)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 56347981c6
commit 362219d908
  1. 42
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java
  2. 26
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java
  3. 82
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java
  4. 55
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java

@ -12,9 +12,11 @@
*/
package tech.pegasys.pantheon.services.pipeline;
import static java.util.concurrent.CompletableFuture.completedFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -27,14 +29,19 @@ import org.apache.logging.log4j.Logger;
class AsyncOperationProcessor<I, O> implements Processor<I, O> {
private static final Logger LOG = LogManager.getLogger();
private final Function<I, CompletableFuture<O>> processor;
private final Collection<CompletableFuture<O>> inProgress;
private final List<CompletableFuture<O>> inProgress;
private CompletableFuture<?> nextOutputAvailableFuture = completedFuture(null);
private final boolean preserveOrder;
private final int maxConcurrency;
public AsyncOperationProcessor(
final Function<I, CompletableFuture<O>> processor, final int maxConcurrency) {
final Function<I, CompletableFuture<O>> processor,
final int maxConcurrency,
final boolean preserveOrder) {
this.processor = processor;
this.maxConcurrency = maxConcurrency;
this.inProgress = new ArrayList<>(maxConcurrency);
this.preserveOrder = preserveOrder;
}
@Override
@ -47,6 +54,7 @@ class AsyncOperationProcessor<I, O> implements Processor<I, O> {
// schedule the output.
final Thread stageThread = Thread.currentThread();
inProgress.add(future);
updateNextOutputAvailableFuture();
future.whenComplete((result, error) -> stageThread.interrupt());
}
outputCompletedTasks(outputPipe);
@ -74,26 +82,48 @@ class AsyncOperationProcessor<I, O> implements Processor<I, O> {
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for processing to complete", e);
} catch (final ExecutionException e) {
LOG.error("Processing failed and we don't handle exceptions properly yet", e);
throw new RuntimeException("Async operation failed", e);
} catch (final TimeoutException e) {
// Ignore and go back around the loop.
}
}
@SuppressWarnings("rawtypes")
private void waitForAnyFutureToComplete()
throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0])).get(1, TimeUnit.SECONDS);
nextOutputAvailableFuture.get(1, TimeUnit.SECONDS);
}
private void outputCompletedTasks(final WritePipe<O> outputPipe) {
boolean inProgressChanged = false;
for (final Iterator<CompletableFuture<O>> i = inProgress.iterator(); i.hasNext(); ) {
final CompletableFuture<O> process = i.next();
final O result = process.getNow(null);
if (result != null) {
inProgressChanged = true;
outputPipe.put(result);
i.remove();
} else if (preserveOrder) {
break;
}
}
if (inProgressChanged) {
updateNextOutputAvailableFuture();
}
}
/**
* CompletableFuture.anyOf adds a completion handler to every future its passed so if we call it
* too often we can quickly wind up with thousands of completion handlers which take a long time
* to iterate through and notify. So only create it when the futures it covers have actually
* changed.
*/
@SuppressWarnings("rawtypes")
private void updateNextOutputAvailableFuture() {
if (preserveOrder) {
nextOutputAvailableFuture = inProgress.isEmpty() ? completedFuture(null) : inProgress.get(0);
} else {
nextOutputAvailableFuture =
CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0]));
}
}
}

@ -164,7 +164,31 @@ public class PipelineBuilder<I, T> {
final String stageName,
final Function<T, CompletableFuture<O>> processor,
final int maxConcurrency) {
return addStage(new AsyncOperationProcessor<>(processor, maxConcurrency), stageName);
return addStage(new AsyncOperationProcessor<>(processor, maxConcurrency, false), stageName);
}
/**
* Adds a 1-to-1, asynchronous processing stage to the pipeline. A single thread reads items from
* the input and calls <i>processor</i> to begin processing. While a single thread is used to
* begin processing, up to <i>maxConcurrency</i> items may be in progress concurrently. As each
* returned {@link CompletableFuture} completes successfully the result is passed to the next
* stage in order.
*
* <p>If the returned {@link CompletableFuture} completes exceptionally the pipeline will abort.
*
* <p>Note: While processing may occur concurrently, order is preserved when results are output.
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param processor the processing to apply to each item.
* @param maxConcurrency the maximum number of items being processed concurrently.
* @param <O> the output type for this processing step.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<I, O> thenProcessAsyncOrdered(
final String stageName,
final Function<T, CompletableFuture<O>> processor,
final int maxConcurrency) {
return addStage(new AsyncOperationProcessor<>(processor, maxConcurrency, true), stageName);
}
/**

@ -14,6 +14,7 @@ package tech.pegasys.pantheon.services.pipeline;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -33,11 +34,10 @@ public class AsyncOperationProcessorTest {
@SuppressWarnings("unchecked")
private final WritePipe<String> writePipe = mock(WritePipe.class);
private final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
new AsyncOperationProcessor<>(Function.identity(), 3);
@Test
public void shouldImmediatelyOutputTasksThatAreAlreadyCompleteEvenIfOutputPipeIsFull() {
final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
createProcessor(false);
when(writePipe.hasRemainingCapacity()).thenReturn(false);
when(readPipe.get()).thenReturn(completedFuture("a"));
@ -47,7 +47,8 @@ public class AsyncOperationProcessorTest {
@Test
public void shouldNotExceedConcurrentJobLimit() {
final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
createProcessor(false);
final CompletableFuture<String> task1 = new CompletableFuture<>();
final CompletableFuture<String> task2 = new CompletableFuture<>();
final CompletableFuture<String> task3 = new CompletableFuture<>();
@ -77,6 +78,8 @@ public class AsyncOperationProcessorTest {
@Test
public void shouldOutputRemainingInProgressTasksWhenFinalizing() {
final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
createProcessor(false);
final CompletableFuture<String> task1 = new CompletableFuture<>();
final CompletableFuture<String> task2 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1).thenReturn(task2);
@ -98,6 +101,8 @@ public class AsyncOperationProcessorTest {
@Test
public void shouldCancelInProgressTasksWhenAborted() {
final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
createProcessor(false);
final CompletableFuture<String> task1 = new CompletableFuture<>();
final CompletableFuture<String> task2 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1).thenReturn(task2);
@ -114,6 +119,8 @@ public class AsyncOperationProcessorTest {
@Test
public void shouldInterruptThreadWhenFutureCompletes() {
// Ensures that if we're waiting for the next input we wake up and output completed tasks
final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
createProcessor(false);
final CompletableFuture<String> task1 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1);
@ -125,4 +132,71 @@ public class AsyncOperationProcessorTest {
assertThat(Thread.currentThread().isInterrupted()).isTrue();
}
@Test
public void shouldPreserveOrderWhenRequested() {
final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
createProcessor(true);
final CompletableFuture<String> task1 = new CompletableFuture<>();
final CompletableFuture<String> task2 = new CompletableFuture<>();
final CompletableFuture<String> task3 = new CompletableFuture<>();
final CompletableFuture<String> task4 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1).thenReturn(task2).thenReturn(task3).thenReturn(task4);
// 3 tasks started
processor.processNextInput(readPipe, writePipe);
processor.processNextInput(readPipe, writePipe);
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(3)).get();
// Reached limit of concurrent tasks so this round does nothing.
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(3)).get();
// Second task completes but shouldn't be output because task1 is not complete yet
task2.complete("b");
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(3)).get();
verifyZeroInteractions(writePipe);
task1.complete("a");
// Next round will output the two completed tasks in order
processor.processNextInput(readPipe, writePipe);
verify(writePipe).put("a");
verify(writePipe).put("b");
// And so now we are able to start another one.
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(4)).get();
// And should finalize in order
task4.complete("d");
task3.complete("c");
processor.finalize(writePipe);
verify(writePipe).put("c");
verify(writePipe).put("d");
}
@Test
public void shouldThrowExceptionWhenFutureCompletesExceptionally() {
final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
createProcessor(false);
final CompletableFuture<String> task1 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1);
processor.processNextInput(readPipe, writePipe);
final Exception exception = new IndexOutOfBoundsException("Oh dear");
task1.completeExceptionally(exception);
assertThatThrownBy(() -> processor.processNextInput(readPipe, writePipe))
.hasRootCause(exception);
}
private AsyncOperationProcessor<CompletableFuture<String>, String> createProcessor(
final boolean preserveOrder) {
return new AsyncOperationProcessor<>(Function.identity(), 3, preserveOrder);
}
}

@ -143,6 +143,53 @@ public class PipelineBuilderTest {
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15");
}
@Test
public void shouldProcessAsyncOrdered() throws Exception {
final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
final List<String> output = new CopyOnWriteArrayList<>();
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 15, NO_OP_LABELLED_2_COUNTER)
.thenProcessAsyncOrdered(
"toString",
value -> {
final CompletableFuture<String> future = new CompletableFuture<>();
futures.put(value, future);
return future;
},
8)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
waitForSize(futures.values(), 8);
// Complete current items out of order, except for 3
for (final int value : asList(2, 7, 1, 5, 8, 4, 6)) {
futures.get(value).complete(Integer.toString(value));
}
// 1 and 2 should be output and two new async processes started
waitForSize(output, 2);
assertThat(output).containsExactly("1", "2");
waitForSize(futures.values(), 10);
// Complete task 3 and all the remaining items should now be started
futures.get(3).complete("3");
waitForSize(futures.values(), 15);
// And the first 8 items should have been output
waitForSize(output, 8);
assertThat(output).containsExactly("1", "2", "3", "4", "5", "6", "7", "8");
// Complete the remaining items.
for (final int value : asList(14, 11, 10, 15, 12, 13, 9)) {
futures.get(value).complete(Integer.toString(value));
}
// And the final result should have everything in order
result.get(10, SECONDS);
assertThat(output)
.containsExactlyInAnyOrder(
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15");
}
@Test
public void shouldLimitInFlightProcessesWhenProcessingAsync() throws Exception {
final List<String> output = new ArrayList<>();
@ -376,16 +423,18 @@ public class PipelineBuilderTest {
.thenProcess("map", Function.identity())
.thenProcessInParallel("parallel", Function.identity(), 3)
.thenProcessAsync("async", CompletableFuture::completedFuture, 3)
.thenProcessAsyncOrdered("asyncOrdered", CompletableFuture::completedFuture, 3)
.inBatches(4)
.thenFlatMap("flatMap", List::stream, 10)
.andFinishWith("finish", new ArrayList<>()::add);
pipeline.start(executorService).get(10, SECONDS);
final List<String> stepNames = asList("input", "map", "parallel", "async", "flatMap");
final List<String> stepNames =
asList("input", "map", "parallel", "async", "asyncOrdered", "flatMap");
final List<String> expectedMetricNames =
Stream.concat(
Stream.of("async_outputPipe-batches"),
Stream.of("asyncOrdered_outputPipe-batches"),
stepNames.stream()
.map(stageName -> stageName + "_outputPipe")
.flatMap(
@ -401,7 +450,7 @@ public class PipelineBuilderTest {
.filter(name -> !name.endsWith("-batches") && !name.endsWith("-aborted"))
.forEach(metric -> assertThat(counters.get(metric).count).hasValue(15));
assertThat(counters.get("async_outputPipe-batches").count).hasValueBetween(4, 15);
assertThat(counters.get("asyncOrdered_outputPipe-batches").count).hasValueBetween(4, 15);
}
private void waitForSize(final Collection<?> collection, final int targetSize) {

Loading…
Cancel
Save