Handle the pipeline being aborted while finalizing an async operation. (#1372)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 3d455b38a6
commit 412d2ab844
  1. 7
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java
  2. 7
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java
  3. 4
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Processor.java
  4. 4
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java
  5. 26
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java

@ -64,10 +64,9 @@ class AsyncOperationProcessor<I, O> implements Processor<I, O> {
}
@Override
public void finalize(final WritePipe<O> outputPipe) {
while (!inProgress.isEmpty()) {
outputNextCompletedTask(outputPipe);
}
public boolean attemptFinalization(final WritePipe<O> outputPipe) {
outputNextCompletedTask(outputPipe);
return inProgress.isEmpty();
}
@Override

@ -38,7 +38,12 @@ class ProcessingStage<I, O> implements Stage {
if (inputPipe.isAborted()) {
processor.abort();
}
processor.finalize(outputPipe);
while (!processor.attemptFinalization(outputPipe)) {
if (inputPipe.isAborted()) {
processor.abort();
break;
}
}
outputPipe.close();
}

@ -15,7 +15,9 @@ package tech.pegasys.pantheon.services.pipeline;
interface Processor<I, O> {
void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe);
default void finalize(final WritePipe<O> outputPipe) {}
default boolean attemptFinalization(final WritePipe<O> outputPipe) {
return true;
}
default void abort() {}
}

@ -94,7 +94,7 @@ public class AsyncOperationProcessorTest {
task2.complete("b");
// Processing
processor.finalize(writePipe);
processor.attemptFinalization(writePipe);
verify(writePipe).put("a");
verify(writePipe).put("b");
}
@ -174,7 +174,7 @@ public class AsyncOperationProcessorTest {
// And should finalize in order
task4.complete("d");
task3.complete("c");
processor.finalize(writePipe);
assertThat(processor.attemptFinalization(writePipe)).isTrue();
verify(writePipe).put("c");
verify(writePipe).put("d");
}

@ -17,6 +17,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import java.util.Locale;
@ -51,6 +52,7 @@ public class ProcessingStageTest {
@Test
public void shouldCallSingleStepStageForEachInput() {
when(singleStep.attemptFinalization(outputPipe)).thenReturn(true);
inputPipe.put("A");
inputPipe.put("B");
inputPipe.put("C");
@ -68,22 +70,42 @@ public class ProcessingStageTest {
@Test
public void shouldFinalizeSingleStepStageAndCloseOutputPipeWhenInputCloses() {
when(singleStep.attemptFinalization(outputPipe)).thenReturn(true);
inputPipe.close();
stage.run();
verify(singleStep).finalize(outputPipe);
verify(singleStep).attemptFinalization(outputPipe);
verifyNoMoreInteractions(singleStep);
assertThat(outputPipe.isOpen()).isFalse();
}
@Test
public void shouldAbortIfPipeIsCancelledWhileAttemptingToFinalise() {
when(singleStep.attemptFinalization(outputPipe))
.thenAnswer(
invocation -> {
inputPipe.abort();
return false;
});
inputPipe.close();
stage.run();
verify(singleStep).attemptFinalization(outputPipe);
verify(singleStep).abort();
verifyNoMoreInteractions(singleStep);
assertThat(outputPipe.isOpen()).isFalse();
}
@Test
public void shouldAbortProcessorIfReadPipeIsAborted() {
when(singleStep.attemptFinalization(outputPipe)).thenReturn(true);
inputPipe.abort();
stage.run();
verify(singleStep).abort();
verify(singleStep).finalize(outputPipe);
verify(singleStep).attemptFinalization(outputPipe);
assertThat(outputPipe.isOpen()).isFalse();
}
}

Loading…
Cancel
Save