@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Semaphore ;
import java.util.concurrent.Semaphore ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicReference ;
import com.google.common.base.MoreObjects ;
import com.google.common.base.MoreObjects ;
import com.google.common.base.Stopwatch ;
import com.google.common.base.Stopwatch ;
@ -98,6 +99,7 @@ public class RlpBlockImporter implements Closeable {
rlp , ScheduleBasedBlockHeaderFunctions . create ( protocolSchedule ) ) ) ) {
rlp , ScheduleBasedBlockHeaderFunctions . create ( protocolSchedule ) ) ) ) {
BlockHeader previousHeader = null ;
BlockHeader previousHeader = null ;
CompletableFuture < Void > previousBlockFuture = null ;
CompletableFuture < Void > previousBlockFuture = null ;
final AtomicReference < Throwable > threadedException = new AtomicReference < > ( ) ;
while ( iterator . hasNext ( ) ) {
while ( iterator . hasNext ( ) ) {
final Block block = iterator . next ( ) ;
final Block block = iterator . next ( ) ;
final BlockHeader header = block . getHeader ( ) ;
final BlockHeader header = block . getHeader ( ) ;
@ -132,7 +134,12 @@ public class RlpBlockImporter implements Closeable {
}
}
try {
try {
blockBacklog . acquire ( ) ;
do {
final Throwable t = ( Exception ) threadedException . get ( ) ;
if ( t ! = null ) {
throw new RuntimeException ( "Error importing block" , t ) ;
}
} while ( ! blockBacklog . tryAcquire ( 1 , SECONDS ) ) ;
} catch ( final InterruptedException e ) {
} catch ( final InterruptedException e ) {
LOG . error ( "Interrupted adding to backlog." , e ) ;
LOG . error ( "Interrupted adding to backlog." , e ) ;
break ;
break ;
@ -142,6 +149,11 @@ public class RlpBlockImporter implements Closeable {
calculationFutures ,
calculationFutures ,
( ) - > evaluateBlock ( context , block , header , protocolSpec , skipPowValidation ) ,
( ) - > evaluateBlock ( context , block , header , protocolSpec , skipPowValidation ) ,
importExecutor ) ;
importExecutor ) ;
previousBlockFuture . exceptionally (
exception - > {
threadedException . set ( exception ) ;
return null ;
} ) ;
+ + count ;
+ + count ;
previousHeader = header ;
previousHeader = header ;
@ -251,6 +263,7 @@ public class RlpBlockImporter implements Closeable {
public void close ( ) {
public void close ( ) {
validationExecutor . shutdownNow ( ) ;
validationExecutor . shutdownNow ( ) ;
try {
try {
//noinspection ResultOfMethodCallIgnored
validationExecutor . awaitTermination ( 5 , SECONDS ) ;
validationExecutor . awaitTermination ( 5 , SECONDS ) ;
} catch ( final Exception e ) {
} catch ( final Exception e ) {
LOG . error ( "Error shutting down validatorExecutor." , e ) ;
LOG . error ( "Error shutting down validatorExecutor." , e ) ;
@ -258,6 +271,7 @@ public class RlpBlockImporter implements Closeable {
importExecutor . shutdownNow ( ) ;
importExecutor . shutdownNow ( ) ;
try {
try {
//noinspection ResultOfMethodCallIgnored
importExecutor . awaitTermination ( 5 , SECONDS ) ;
importExecutor . awaitTermination ( 5 , SECONDS ) ;
} catch ( final Exception e ) {
} catch ( final Exception e ) {
LOG . error ( "Error shutting down importExecutor" , e ) ;
LOG . error ( "Error shutting down importExecutor" , e ) ;