@ -12,8 +12,6 @@
* /
* /
package tech.pegasys.pantheon.ethereum.eth.manager.task ;
package tech.pegasys.pantheon.ethereum.eth.manager.task ;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext ;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer ;
import tech.pegasys.pantheon.metrics.LabelledMetric ;
import tech.pegasys.pantheon.metrics.LabelledMetric ;
import tech.pegasys.pantheon.metrics.OperationTimer ;
import tech.pegasys.pantheon.metrics.OperationTimer ;
@ -28,61 +26,69 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager ;
import org.apache.logging.log4j.LogManager ;
import org.apache.logging.log4j.Logger ;
import org.apache.logging.log4j.Logger ;
public abstract class AbstractPipelinedPeer Task < I , O > extends AbstractPeer Task < List < O > > {
public abstract class AbstractPipelinedTask < I , O > extends AbstractEth Task < List < O > > {
private static final Logger LOG = LogManager . getLogger ( ) ;
private static final Logger LOG = LogManager . getLogger ( ) ;
static final int TIMEOUT_MS = 1000 ;
static final int TIMEOUT_MS = 1000 ;
private BlockingQueue < I > inboundQueue ;
private final BlockingQueue < I > inboundQueue ;
private BlockingQueue < O > outboundQueue ;
private final BlockingQueue < O > outboundQueue ;
private List < O > results ;
private final List < O > results ;
private boolean shuttingDown = false ;
private boolean shuttingDown = false ;
private AtomicReference < Throwable > processingException = new AtomicReference < > ( null ) ;
private final AtomicReference < Throwable > processingException = new AtomicReference < > ( null ) ;
protected AbstractPipelinedPeer Task (
protected AbstractPipelinedTask (
final BlockingQueue < I > inboundQueue ,
final BlockingQueue < I > inboundQueue ,
final int outboundBacklogSize ,
final int outboundBacklogSize ,
final EthContext ethContext ,
final LabelledMetric < OperationTimer > ethTasksTimer ) {
final LabelledMetric < OperationTimer > ethTasksTimer ) {
super ( ethContext , eth TasksTimer ) ;
super ( ethTasksTimer ) ;
this . inboundQueue = inboundQueue ;
this . inboundQueue = inboundQueue ;
outboundQueue = new LinkedBlockingQueue < > ( outboundBacklogSize ) ;
outboundQueue = new LinkedBlockingQueue < > ( outboundBacklogSize ) ;
results = new ArrayList < > ( ) ;
results = new ArrayList < > ( ) ;
}
}
@Override
@Override
protected void executeTaskWithPeer ( final EthPeer peer ) {
protected void executeTask ( ) {
Optional < I > previousInput = Optional . empty ( ) ;
Optional < I > previousInput = Optional . empty ( ) ;
while ( ! isDone ( ) & & processingException . get ( ) = = null ) {
try {
if ( shuttingDown & & inboundQueue . isEmpty ( ) ) {
while ( ! isDone ( ) & & processingException . get ( ) = = null ) {
break ;
if ( shuttingDown & & inboundQueue . isEmpty ( ) ) {
}
break ;
final I input ;
}
try {
final I input ;
input = inboundQueue . poll ( TIMEOUT_MS , TimeUnit . MILLISECONDS ) ;
try {
if ( input = = null ) {
input = inboundQueue . poll ( TIMEOUT_MS , TimeUnit . MILLISECONDS ) ;
// timed out waiting for a result
if ( input = = null ) {
// timed out waiting for a result
continue ;
}
} catch ( final InterruptedException e ) {
// this is expected
continue ;
continue ;
}
}
} catch ( final InterruptedException e ) {
final Optional < O > output = processStep ( input , previousInput ) ;
// this is expected
output . ifPresent (
continue ;
o - > {
while ( ! isDone ( ) ) {
try {
if ( outboundQueue . offer ( o , 1 , TimeUnit . SECONDS ) ) {
results . add ( o ) ;
break ;
}
} catch ( final InterruptedException e ) {
processingException . compareAndSet ( null , e ) ;
break ;
}
}
} ) ;
previousInput = Optional . of ( input ) ;
}
}
final Optional < O > output = processStep ( input , previousInput , peer ) ;
} catch ( final RuntimeException e ) {
output . ifPresent (
processingException . compareAndSet ( null , e ) ;
o - > {
try {
outboundQueue . put ( o ) ;
} catch ( final InterruptedException e ) {
processingException . compareAndSet ( null , e ) ;
}
results . add ( o ) ;
} ) ;
previousInput = Optional . of ( input ) ;
}
}
if ( processingException . get ( ) = = null ) {
if ( processingException . get ( ) = = null ) {
result . get ( ) . complete ( new PeerTaskResult < > ( peer , results ) ) ;
result . get ( ) . complete ( results ) ;
} else {
} else {
result . get ( ) . completeExceptionally ( processingException . get ( ) ) ;
result . get ( ) . completeExceptionally ( processingException . get ( ) ) ;
}
}
@ -105,5 +111,5 @@ public abstract class AbstractPipelinedPeerTask<I, O> extends AbstractPeerTask<L
cancel ( ) ;
cancel ( ) ;
}
}
protected abstract Optional < O > processStep ( I input , Optional < I > previousInput , EthPeer peer ) ;
protected abstract Optional < O > processStep ( I input , Optional < I > previousInput ) ;
}
}