@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer ;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer ;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler ;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler ;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition ;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition ;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage ;
import java.time.Duration ;
import java.time.Duration ;
import java.util.ArrayDeque ;
import java.util.ArrayDeque ;
@ -44,7 +45,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private final SyncTargetChecker syncTargetChecker ;
private final SyncTargetChecker syncTargetChecker ;
private final EthPeer peer ;
private final EthPeer peer ;
private final EthScheduler ethScheduler ;
private final EthScheduler ethScheduler ;
private final int rangeTimeout sPermitted ;
private final int retrie sPermitted ;
private final Duration newHeaderWaitDuration ;
private final Duration newHeaderWaitDuration ;
private final SyncTerminationCondition terminationCondition ;
private final SyncTerminationCondition terminationCondition ;
@ -52,7 +53,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private BlockHeader lastRangeEnd ;
private BlockHeader lastRangeEnd ;
private boolean reachedEndOfRanges = false ;
private boolean reachedEndOfRanges = false ;
private Optional < CompletableFuture < List < BlockHeader > > > pendingRequests = Optional . empty ( ) ;
private Optional < CompletableFuture < List < BlockHeader > > > pendingRequests = Optional . empty ( ) ;
private int requestFailure Count = 0 ;
private int retry Count = 0 ;
public SyncTargetRangeSource (
public SyncTargetRangeSource (
final RangeHeadersFetcher fetcher ,
final RangeHeadersFetcher fetcher ,
@ -60,7 +61,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
final EthScheduler ethScheduler ,
final EthScheduler ethScheduler ,
final EthPeer peer ,
final EthPeer peer ,
final BlockHeader commonAncestor ,
final BlockHeader commonAncestor ,
final int rangeTimeout sPermitted ,
final int retrie sPermitted ,
final SyncTerminationCondition terminationCondition ) {
final SyncTerminationCondition terminationCondition ) {
this (
this (
fetcher ,
fetcher ,
@ -68,7 +69,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
ethScheduler ,
ethScheduler ,
peer ,
peer ,
commonAncestor ,
commonAncestor ,
rangeTimeout sPermitted ,
retrie sPermitted ,
Duration . ofSeconds ( 5 ) ,
Duration . ofSeconds ( 5 ) ,
terminationCondition ) ;
terminationCondition ) ;
}
}
@ -79,7 +80,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
final EthScheduler ethScheduler ,
final EthScheduler ethScheduler ,
final EthPeer peer ,
final EthPeer peer ,
final BlockHeader commonAncestor ,
final BlockHeader commonAncestor ,
final int rangeTimeout sPermitted ,
final int retrie sPermitted ,
final Duration newHeaderWaitDuration ,
final Duration newHeaderWaitDuration ,
final SyncTerminationCondition terminationCondition ) {
final SyncTerminationCondition terminationCondition ) {
this . fetcher = fetcher ;
this . fetcher = fetcher ;
@ -87,7 +88,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
this . ethScheduler = ethScheduler ;
this . ethScheduler = ethScheduler ;
this . peer = peer ;
this . peer = peer ;
this . lastRangeEnd = commonAncestor ;
this . lastRangeEnd = commonAncestor ;
this . rangeTimeoutsPermitted = rangeTimeout sPermitted ;
this . retriesPermitted = retrie sPermitted ;
this . newHeaderWaitDuration = newHeaderWaitDuration ;
this . newHeaderWaitDuration = newHeaderWaitDuration ;
this . terminationCondition = terminationCondition ;
this . terminationCondition = terminationCondition ;
}
}
@ -96,7 +97,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
public boolean hasNext ( ) {
public boolean hasNext ( ) {
return terminationCondition . shouldContinueDownload ( )
return terminationCondition . shouldContinueDownload ( )
& & ( ! retrievedRanges . isEmpty ( )
& & ( ! retrievedRanges . isEmpty ( )
| | ( requestFailureCount < rangeTimeout sPermitted
| | ( retryCount < retrie sPermitted
& & syncTargetChecker . shouldContinueDownloadingFromSyncTarget ( peer , lastRangeEnd )
& & syncTargetChecker . shouldContinueDownloadingFromSyncTarget ( peer , lastRangeEnd )
& & ! reachedEndOfRanges ) ) ;
& & ! reachedEndOfRanges ) ) ;
}
}
@ -148,13 +149,21 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
pendingRequest . get ( newHeaderWaitDuration . toMillis ( ) , MILLISECONDS ) ;
pendingRequest . get ( newHeaderWaitDuration . toMillis ( ) , MILLISECONDS ) ;
this . pendingRequests = Optional . empty ( ) ;
this . pendingRequests = Optional . empty ( ) ;
if ( newHeaders . isEmpty ( ) ) {
if ( newHeaders . isEmpty ( ) ) {
requestFailureCount + + ;
retryCount + + ;
if ( retryCount > = retriesPermitted ) {
LOG . atDebug ( )
. setMessage (
"Disconnecting target peer for providing useless or empty range header: {}." )
. addArgument ( peer )
. log ( ) ;
peer . disconnect ( DisconnectMessage . DisconnectReason . USELESS_PEER_USELESS_RESPONSES ) ;
}
} else {
} else {
requestFailureCount = 0 ;
retry Count = 0 ;
}
for ( final BlockHeader header : newHeaders ) {
for ( final BlockHeader header : newHeaders ) {
retrievedRanges . add ( new SyncTargetRange ( peer , lastRangeEnd , header ) ) ;
retrievedRanges . add ( new SyncTargetRange ( peer , lastRangeEnd , header ) ) ;
lastRangeEnd = header ;
lastRangeEnd = header ;
}
}
}
return retrievedRanges . poll ( ) ;
return retrievedRanges . poll ( ) ;
} catch ( final InterruptedException e ) {
} catch ( final InterruptedException e ) {
@ -163,7 +172,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
} catch ( final ExecutionException e ) {
} catch ( final ExecutionException e ) {
LOG . debug ( "Failed to retrieve new range headers" , e ) ;
LOG . debug ( "Failed to retrieve new range headers" , e ) ;
this . pendingRequests = Optional . empty ( ) ;
this . pendingRequests = Optional . empty ( ) ;
requestFailure Count + + ;
retry Count + + ;
return null ;
return null ;
} catch ( final TimeoutException e ) {
} catch ( final TimeoutException e ) {
return null ;
return null ;