@ -14,6 +14,7 @@
* /
package org.hyperledger.besu.ethereum.eth.sync.backwardsync ;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose ;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda ;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda ;
@ -33,7 +34,6 @@ import java.time.Duration;
import java.util.Optional ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.function.Function ;
import java.util.stream.Stream ;
import com.google.common.annotations.VisibleForTesting ;
@ -43,7 +43,9 @@ import org.slf4j.LoggerFactory;
public class BackwardSyncContext {
private static final Logger LOG = LoggerFactory . getLogger ( BackwardSyncContext . class ) ;
public static final int BATCH_SIZE = 200 ;
private static final int MAX_RETRIES = 100 ;
private static final int DEFAULT_MAX_RETRIES = 20 ;
private static final long DEFAULT_MILLIS_BETWEEN_RETRIES = 5000 ;
protected final ProtocolContext protocolContext ;
private final ProtocolSchedule protocolSchedule ;
@ -58,6 +60,10 @@ public class BackwardSyncContext {
private Optional < Hash > maybeFinalized = Optional . empty ( ) ;
private Optional < Hash > maybeHead = Optional . empty ( ) ;
private final int maxRetries ;
private final long millisBetweenRetries = DEFAULT_MILLIS_BETWEEN_RETRIES ;
public BackwardSyncContext (
final ProtocolContext protocolContext ,
final ProtocolSchedule protocolSchedule ,
@ -65,6 +71,24 @@ public class BackwardSyncContext {
final EthContext ethContext ,
final SyncState syncState ,
final BackwardChain backwardChain ) {
this (
protocolContext ,
protocolSchedule ,
metricsSystem ,
ethContext ,
syncState ,
backwardChain ,
DEFAULT_MAX_RETRIES ) ;
}
public BackwardSyncContext (
final ProtocolContext protocolContext ,
final ProtocolSchedule protocolSchedule ,
final MetricsSystem metricsSystem ,
final EthContext ethContext ,
final SyncState syncState ,
final BackwardChain backwardChain ,
final int maxRetries ) {
this . protocolContext = protocolContext ;
this . protocolSchedule = protocolSchedule ;
@ -72,6 +96,7 @@ public class BackwardSyncContext {
this . metricsSystem = metricsSystem ;
this . syncState = syncState ;
this . backwardChain = backwardChain ;
this . maxRetries = maxRetries ;
}
public synchronized boolean isSyncing ( ) {
@ -135,28 +160,33 @@ public class BackwardSyncContext {
}
private CompletableFuture < Void > prepareBackwardSyncFutureWithRetry ( ) {
return prepareBackwardSyncFutureWithRetry ( maxRetries )
. handle (
( unused , throwable ) - > {
this . currentBackwardSyncFuture . set ( null ) ;
if ( throwable ! = null ) {
throw extractBackwardSyncException ( throwable )
. orElse ( new BackwardSyncException ( throwable ) ) ;
}
return null ;
} ) ;
}
CompletableFuture < Void > f = prepareBackwardSyncFuture ( ) ;
for ( int i = 0 ; i < MAX_RETRIES ; i + + ) {
f =
f . thenApply ( CompletableFuture : : completedFuture )
. exceptionally (
ex - > {
processException ( ex ) ;
return ethContext
. getScheduler ( )
. scheduleFutureTask ( this : : prepareBackwardSyncFuture , Duration . ofSeconds ( 5 ) ) ;
} )
. thenCompose ( Function . identity ( ) ) ;
private CompletableFuture < Void > prepareBackwardSyncFutureWithRetry ( final int retries ) {
if ( retries = = 0 ) {
return CompletableFuture . failedFuture (
new BackwardSyncException ( "Max number of retries " + maxRetries + " reached" ) ) ;
}
return f . handle (
( unused , throwable ) - > {
this . currentBackwardSyncFuture . set ( null ) ;
if ( throwable ! = null ) {
throw extractBackwardSyncException ( throwable )
. orElse ( new BackwardSyncException ( throwable ) ) ;
}
return null ;
return exceptionallyCompose (
prepareBackwardSyncFuture ( ) ,
throwable - > {
processException ( throwable ) ;
return ethContext
. getScheduler ( )
. scheduleFutureTask (
( ) - > prepareBackwardSyncFutureWithRetry ( retries - 1 ) ,
Duration . ofMillis ( millisBetweenRetries ) ) ;
} ) ;
}
@ -167,17 +197,23 @@ public class BackwardSyncContext {
backwardSyncException - > {
if ( backwardSyncException . shouldRestart ( ) ) {
LOG . info (
"Backward sync failed ({}). Current Peers: {}. Retrying in few seconds..." ,
"Backward sync failed ({}). Current Peers: {}. Retrying in "
+ millisBetweenRetries
+ " milliseconds..." ,
backwardSyncException . getMessage ( ) ,
ethContext . getEthPeers ( ) . peerCount ( ) ) ;
return ;
} else {
debugLambda (
LOG , "Not recoverable backward sync exception {}" , throwable : : getMessage ) ;
throw backwardSyncException ;
}
} ,
( ) - >
LOG . warn (
"There was an uncaught exception during Backwards Sync. Retrying in few seconds..." ,
"There was an uncaught exception during Backwards Sync. Retrying in "
+ millisBetweenRetries
+ " milliseconds..." ,
throwable ) ) ;
}
@ -193,7 +229,8 @@ public class BackwardSyncContext {
return Optional . empty ( ) ;
}
private CompletableFuture < Void > prepareBackwardSyncFuture ( ) {
@VisibleForTesting
CompletableFuture < Void > prepareBackwardSyncFuture ( ) {
final MutableBlockchain blockchain = getProtocolContext ( ) . getBlockchain ( ) ;
return new BackwardsSyncAlgorithm (
this ,