|
|
@ -15,7 +15,6 @@ |
|
|
|
package org.hyperledger.besu.ethereum.eth.manager.peertask; |
|
|
|
package org.hyperledger.besu.ethereum.eth.manager.peertask; |
|
|
|
|
|
|
|
|
|
|
|
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
|
|
|
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
|
|
|
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; |
|
|
|
|
|
|
|
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; |
|
|
|
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; |
|
|
|
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; |
|
|
|
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; |
|
|
|
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; |
|
|
|
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; |
|
|
@ -27,7 +26,6 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer; |
|
|
|
import java.util.Collection; |
|
|
|
import java.util.Collection; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
|
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
|
|
|
|
|
|
|
@ -39,18 +37,15 @@ public class PeerTaskExecutor { |
|
|
|
public static final int NO_RETRIES = 0; |
|
|
|
public static final int NO_RETRIES = 0; |
|
|
|
private final PeerSelector peerSelector; |
|
|
|
private final PeerSelector peerSelector; |
|
|
|
private final PeerTaskRequestSender requestSender; |
|
|
|
private final PeerTaskRequestSender requestSender; |
|
|
|
private final EthScheduler ethScheduler; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final LabelledMetric<OperationTimer> requestTimer; |
|
|
|
private final LabelledMetric<OperationTimer> requestTimer; |
|
|
|
|
|
|
|
|
|
|
|
public PeerTaskExecutor( |
|
|
|
public PeerTaskExecutor( |
|
|
|
final PeerSelector peerSelector, |
|
|
|
final PeerSelector peerSelector, |
|
|
|
final PeerTaskRequestSender requestSender, |
|
|
|
final PeerTaskRequestSender requestSender, |
|
|
|
final EthScheduler ethScheduler, |
|
|
|
|
|
|
|
final MetricsSystem metricsSystem) { |
|
|
|
final MetricsSystem metricsSystem) { |
|
|
|
this.peerSelector = peerSelector; |
|
|
|
this.peerSelector = peerSelector; |
|
|
|
this.requestSender = requestSender; |
|
|
|
this.requestSender = requestSender; |
|
|
|
this.ethScheduler = ethScheduler; |
|
|
|
|
|
|
|
requestTimer = |
|
|
|
requestTimer = |
|
|
|
metricsSystem.createLabelledTimer( |
|
|
|
metricsSystem.createLabelledTimer( |
|
|
|
BesuMetricCategory.PEERS, |
|
|
|
BesuMetricCategory.PEERS, |
|
|
@ -87,11 +82,6 @@ public class PeerTaskExecutor { |
|
|
|
return executorResult; |
|
|
|
return executorResult; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAsync(final PeerTask<T> peerTask) { |
|
|
|
|
|
|
|
return ethScheduler.scheduleSyncWorkerTask( |
|
|
|
|
|
|
|
() -> CompletableFuture.completedFuture(execute(peerTask))); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public <T> PeerTaskExecutorResult<T> executeAgainstPeer( |
|
|
|
public <T> PeerTaskExecutorResult<T> executeAgainstPeer( |
|
|
|
final PeerTask<T> peerTask, final EthPeer peer) { |
|
|
|
final PeerTask<T> peerTask, final EthPeer peer) { |
|
|
|
MessageData requestMessageData = peerTask.getRequestMessage(); |
|
|
|
MessageData requestMessageData = peerTask.getRequestMessage(); |
|
|
@ -144,12 +134,6 @@ public class PeerTaskExecutor { |
|
|
|
return executorResult; |
|
|
|
return executorResult; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAgainstPeerAsync( |
|
|
|
|
|
|
|
final PeerTask<T> peerTask, final EthPeer peer) { |
|
|
|
|
|
|
|
return ethScheduler.scheduleSyncWorkerTask( |
|
|
|
|
|
|
|
() -> CompletableFuture.completedFuture(executeAgainstPeer(peerTask, peer))); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private boolean sleepBetweenRetries() { |
|
|
|
private boolean sleepBetweenRetries() { |
|
|
|
try { |
|
|
|
try { |
|
|
|
// sleep for 1 second to match implemented wait between retries in AbstractRetryingPeerTask
|
|
|
|
// sleep for 1 second to match implemented wait between retries in AbstractRetryingPeerTask
|
|
|
|