Make the retrying snap tasks switching (#7307)

* make snap tasks switching

Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
pull/7315/head
Stefan Pingel 4 months ago committed by GitHub
parent 812dc74742
commit d35c6d7875
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java
  2. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java
  3. 19
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java
  4. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java
  5. 6
      services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AsyncOperationProcessor.java

@ -17,18 +17,17 @@ package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes32;
public class RetryingGetAccountRangeFromPeerTask
extends AbstractRetryingPeerTask<AccountRangeMessage.AccountRangeData> {
extends AbstractRetryingSwitchingPeerTask<AccountRangeMessage.AccountRangeData> {
public static final int MAX_RETRIES = 4;
@ -46,9 +45,9 @@ public class RetryingGetAccountRangeFromPeerTask
final MetricsSystem metricsSystem) {
super(
ethContext,
MAX_RETRIES,
metricsSystem,
data -> data.accounts().isEmpty() && data.proofs().isEmpty(),
metricsSystem);
MAX_RETRIES);
this.ethContext = ethContext;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
@ -67,12 +66,12 @@ public class RetryingGetAccountRangeFromPeerTask
}
@Override
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executeTaskOnCurrentPeer(
final EthPeer peer) {
final GetAccountRangeFromPeerTask task =
GetAccountRangeFromPeerTask.forAccountRange(
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {

@ -17,19 +17,21 @@ package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes32, Bytes>> {
public class RetryingGetBytecodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes32, Bytes>> {
public static final int MAX_RETRIES = 4;
private final EthContext ethContext;
private final List<Bytes32> codeHashes;
@ -41,7 +43,7 @@ public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask<Ma
final List<Bytes32> codeHashes,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
super(ethContext, metricsSystem, Map::isEmpty, MAX_RETRIES);
this.ethContext = ethContext;
this.codeHashes = codeHashes;
this.blockHeader = blockHeader;
@ -57,11 +59,10 @@ public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask<Ma
}
@Override
protected CompletableFuture<Map<Bytes32, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes32, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetBytecodeFromPeerTask task =
GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {

@ -17,19 +17,20 @@ package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes32;
public class RetryingGetStorageRangeFromPeerTask
extends AbstractRetryingPeerTask<StorageRangeMessage.SlotRangeData> {
extends AbstractRetryingSwitchingPeerTask<StorageRangeMessage.SlotRangeData> {
public static final int MAX_RETRIES = 4;
private final EthContext ethContext;
private final List<Bytes32> accountHashes;
@ -45,7 +46,11 @@ public class RetryingGetStorageRangeFromPeerTask
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem);
super(
ethContext,
metricsSystem,
data -> data.proofs().isEmpty() && data.slots().isEmpty(),
MAX_RETRIES);
this.ethContext = ethContext;
this.accountHashes = accountHashes;
this.startKeyHash = startKeyHash;
@ -66,12 +71,12 @@ public class RetryingGetStorageRangeFromPeerTask
}
@Override
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executeTaskOnCurrentPeer(
final EthPeer peer) {
final GetStorageRangeFromPeerTask task =
GetStorageRangeFromPeerTask.forStorageRange(
ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {

@ -17,18 +17,20 @@ package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes;
public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes, Bytes>> {
public class RetryingGetTrieNodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes, Bytes>> {
public static final int MAX_RETRIES = 4;
private final EthContext ethContext;
private final Map<Bytes, List<Bytes>> paths;
@ -40,7 +42,7 @@ public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask<Ma
final Map<Bytes, List<Bytes>> paths,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
super(ethContext, metricsSystem, Map::isEmpty, MAX_RETRIES);
this.ethContext = ethContext;
this.paths = paths;
this.blockHeader = blockHeader;
@ -56,11 +58,10 @@ public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask<Ma
}
@Override
protected CompletableFuture<Map<Bytes, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetTrieNodeFromPeerTask task =
GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {

@ -83,7 +83,11 @@ class AsyncOperationProcessor<I, O> implements Processor<I, O> {
waitForAnyFutureToComplete();
outputCompletedTasks(outputPipe);
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for processing to complete", e.getMessage());
LOG.atTrace()
.setMessage("Interrupted while waiting for processing to complete: Message=({})")
.addArgument(e.getMessage())
.setCause(e)
.log();
} catch (final ExecutionException e) {
throw new AsyncOperationException("Async operation failed. " + e.getMessage(), e);
} catch (final TimeoutException e) {

Loading…
Cancel
Save