Retry mechanism when getting a broadcasted block fail on all peers (#4271)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/4318/head
Fabio Di Fabio 2 years ago committed by GitHub
parent c321a85ef0
commit af65c86e64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java
  3. 240
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  4. 23
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SynchronizerConfiguration.java
  5. 15
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java

@ -5,6 +5,7 @@
### Additions and Improvements
- Upgrade besu-native to 0.6.0 and use Blake2bf native implementation if available by default [#4264](https://github.com/hyperledger/besu/pull/4264)
- Better management of jemalloc presence/absence in startup script [#4237](https://github.com/hyperledger/besu/pull/4237)
- Retry mechanism when getting a broadcasted block fail on all peers [#4271](https://github.com/hyperledger/besu/pull/4271)
- Filter out disconnected peers when fetching available peers [#4269](https://github.com/hyperledger/besu/pull/4269)
- Updated the default value of fast-sync-min-peers post merge [#4298](https://github.com/hyperledger/besu/pull/4298)
- Log imported block info post merge [#4310](https://github.com/hyperledger/besu/pull/4310)

@ -114,9 +114,9 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
if (cause instanceof NoAvailablePeersException) {
LOG.debug(
"No useful peer found, checking remaining current peers for usefulness: {}",
"No useful peer found, wait max 5 seconds for new peer to connect: current peers {}",
ethContext.getEthPeers().peerCount());
// Wait for new peer to connect
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem);
executeSubTask(
() ->

@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener;
@ -23,6 +25,7 @@ import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent.EventType;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
@ -46,6 +49,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.Di
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -53,8 +57,10 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -76,12 +82,9 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
private final BlockBroadcaster blockBroadcaster;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Set<Hash> importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Hash> requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Long> requestedNonAnnouncedBlocks =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ProcessingBlocksManager processingBlocksManager;
private final PendingBlocksManager pendingBlocksManager;
private final Duration getBlockTimeoutMillis;
private Optional<Long> onBlockAddedSId = Optional.empty();
private Optional<Long> newBlockSId;
private Optional<Long> newBlockHashesSId;
@ -95,6 +98,28 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
final PendingBlocksManager pendingBlocksManager,
final MetricsSystem metricsSystem,
final BlockBroadcaster blockBroadcaster) {
this(
config,
protocolSchedule,
protocolContext,
ethContext,
syncState,
pendingBlocksManager,
metricsSystem,
blockBroadcaster,
new ProcessingBlocksManager());
}
BlockPropagationManager(
final SynchronizerConfiguration config,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final PendingBlocksManager pendingBlocksManager,
final MetricsSystem metricsSystem,
final BlockBroadcaster blockBroadcaster,
final ProcessingBlocksManager processingBlocksManager) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
@ -104,6 +129,9 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
this.syncState = syncState;
this.pendingBlocksManager = pendingBlocksManager;
this.syncState.subscribeTTDReached(this::reactToTTDReachedEvent);
this.getBlockTimeoutMillis =
Duration.ofMillis(config.getPropagationManagerGetBlockTimeoutMillis());
this.processingBlocksManager = processingBlocksManager;
}
public void start() {
@ -156,6 +184,7 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
// Check to see if any of our pending blocks are now ready for import
final Block newBlock = blockAddedEvent.getBlock();
traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
@ -247,7 +276,7 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
if (distance < config.getBlockPropagationRange().upperEndpoint()
&& minAnnouncedBlockNumber > firstNonAnnouncedBlockNumber) {
if (requestedNonAnnouncedBlocks.add(firstNonAnnouncedBlockNumber)) {
if (processingBlocksManager.addNonAnnouncedBlocks(firstNonAnnouncedBlockNumber)) {
retrieveNonAnnouncedBlock(firstNonAnnouncedBlockNumber);
}
}
@ -338,7 +367,7 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
LOG.trace("New block hash from network {} is already pending", announcedBlock);
continue;
}
if (importingBlocks.contains(announcedBlock.hash())) {
if (processingBlocksManager.alreadyImporting(announcedBlock.hash())) {
LOG.trace("New block hash from network {} is already importing", announcedBlock);
continue;
}
@ -346,7 +375,7 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
LOG.trace("New block hash from network {} was already imported", announcedBlock);
continue;
}
if (requestedBlocks.add(announcedBlock.hash())) {
if (processingBlocksManager.addRequestedBlock(announcedBlock.hash())) {
newBlocks.add(announcedBlock);
} else {
LOG.trace("New block hash from network {} was already requested", announcedBlock);
@ -379,7 +408,7 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
private void requestParentBlock(final Block block) {
final BlockHeader blockHeader = block.getHeader();
if (requestedBlocks.add(blockHeader.getParentHash())) {
if (processingBlocksManager.addRequestedBlock(blockHeader.getParentHash())) {
retrieveParentBlock(blockHeader);
} else {
LOG.debug("Parent block with hash {} is already requested", blockHeader.getParentHash());
@ -397,34 +426,115 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
private CompletableFuture<Block> getBlockFromPeers(
final Optional<EthPeer> preferredPeer,
final long blockNumber,
final Optional<Hash> blockHash) {
final Optional<Hash> maybeBlockHash) {
return repeatableGetBlockFromPeer(preferredPeer, blockNumber, maybeBlockHash)
.whenComplete(
(block, throwable) -> {
if (block != null) {
debugLambda(LOG, "Successfully retrieved block {}", block::toLogString);
processingBlocksManager.registerReceivedBlock(block);
} else {
if (throwable != null) {
LOG.warn(
"Failed to retrieve block "
+ logBlockNumberMaybeHash(blockNumber, maybeBlockHash),
throwable);
} else {
// this could happen if we give up at some point since we find that it make no
// sense to retry
debugLambda(
LOG,
"Block {} not retrieved",
() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash));
}
processingBlocksManager.registerFailedGetBlock(blockNumber, maybeBlockHash);
}
});
}
private CompletableFuture<Block> repeatableGetBlockFromPeer(
final Optional<EthPeer> preferredPeer,
final long blockNumber,
final Optional<Hash> maybeBlockHash) {
return exceptionallyCompose(
scheduleGetBlockFromPeers(preferredPeer, blockNumber, maybeBlockHash),
handleGetBlockErrors(blockNumber, maybeBlockHash))
.thenCompose(r -> maybeRepeatGetBlock(blockNumber, maybeBlockHash));
}
private Function<Throwable, CompletionStage<Block>> handleGetBlockErrors(
final long blockNumber, final Optional<Hash> maybeBlockHash) {
return throwable -> {
debugLambda(
LOG,
"Temporary failure retrieving block {} from peers with error {}",
() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash),
throwable::toString);
return CompletableFuture.completedFuture(null);
};
}
private CompletableFuture<Block> maybeRepeatGetBlock(
final long blockNumber, final Optional<Hash> maybeBlockHash) {
final MutableBlockchain blockchain = protocolContext.getBlockchain();
final Optional<Block> maybeBlock =
maybeBlockHash
.map(hash -> blockchain.getBlockByHash(hash))
.orElseGet(() -> blockchain.getBlockByNumber(blockNumber));
// check if we got this block by other means
if (maybeBlock.isPresent()) {
final Block block = maybeBlock.get();
debugLambda(
LOG, "No need to retry to get block {} since it is already present", block::toLogString);
return CompletableFuture.completedFuture(block);
}
final long localChainHeight = blockchain.getChainHeadBlockNumber();
final long bestChainHeight = syncState.bestChainHeight(localChainHeight);
if (!shouldImportBlockAtHeight(blockNumber, localChainHeight, bestChainHeight)) {
debugLambda(
LOG,
"Not retrying to get block {} since we are too far from local chain head {}",
() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash),
blockchain.getChainHead()::toLogString);
return CompletableFuture.completedFuture(null);
}
debugLambda(
LOG,
"Retrying to get block {}",
() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash));
return ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() -> repeatableGetBlockFromPeer(Optional.empty(), blockNumber, maybeBlockHash));
}
private CompletableFuture<Block> scheduleGetBlockFromPeers(
final Optional<EthPeer> maybePreferredPeer,
final long blockNumber,
final Optional<Hash> maybeBlockHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
protocolSchedule,
ethContext,
metricsSystem,
ethContext.getEthPeers().getMaxPeers(),
blockHash,
Math.max(1, ethContext.getEthPeers().peerCount()),
maybeBlockHash,
blockNumber);
preferredPeer.ifPresent(getBlockTask::assignPeer);
maybePreferredPeer.ifPresent(getBlockTask::assignPeer);
return ethContext
.getScheduler()
.scheduleSyncWorkerTask(getBlockTask::run)
.thenCompose(r -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()))
.whenComplete(
(r, t) -> {
requestedNonAnnouncedBlocks.remove(blockNumber);
blockHash.ifPresentOrElse(
requestedBlocks::remove,
() -> {
if (r != null) {
// in case we successfully retrieved only by block number, when can remove
// the request by hash too
requestedBlocks.remove(r.getHash());
}
});
});
var future =
ethContext
.getScheduler()
.scheduleSyncWorkerTask(getBlockTask::run)
.thenCompose(r -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
ethContext.getScheduler().failAfterTimeout(future, getBlockTimeoutMillis);
return future;
}
private void broadcastBlock(final Block block, final BlockHeader parent) {
@ -453,14 +563,14 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
return CompletableFuture.completedFuture(block);
}
if (!importingBlocks.add(block.getHash())) {
if (!processingBlocksManager.addImportingBlock(block.getHash())) {
traceLambda(LOG, "We're already importing this block {}", block::toLogString);
return CompletableFuture.completedFuture(block);
}
if (protocolContext.getBlockchain().contains(block.getHash())) {
traceLambda(LOG, "We've already imported this block {}", block::toLogString);
importingBlocks.remove(block.getHash());
processingBlocksManager.registerBlockImportDone(block.getHash());
return CompletableFuture.completedFuture(block);
}
@ -537,7 +647,7 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent));
return runImportTask(block);
} else {
importingBlocks.remove(block.getHash());
processingBlocksManager.registerBlockImportDone(block.getHash());
badBlockManager.addBadBlock(block);
LOG.warn("Failed to import announced block {}", block.toLogString());
return CompletableFuture.completedFuture(block);
@ -557,7 +667,7 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
.run()
.whenComplete(
(result, throwable) -> {
importingBlocks.remove(block.getHash());
processingBlocksManager.registerBlockImportDone(block.getHash());
if (throwable != null) {
LOG.warn("Failed to import announced block {}", block.toLogString());
}
@ -591,17 +701,17 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
@Override
public String toString() {
return "BlockPropagationManager{"
+ "requestedBlocks="
+ requestedBlocks
+ ", requestedNonAnnounceBlocks="
+ requestedNonAnnouncedBlocks
+ ", importingBlocks="
+ importingBlocks
+ processingBlocksManager
+ ", pendingBlocksManager="
+ pendingBlocksManager
+ '}';
}
private String logBlockNumberMaybeHash(
final long blockNumber, final Optional<Hash> maybeBlockHash) {
return blockNumber + maybeBlockHash.map(h -> " (" + h + ")").orElse("");
}
@Override
public void onNewForkchoiceMessage(
final Hash headBlockHash,
@ -611,4 +721,54 @@ public class BlockPropagationManager implements ForkchoiceMessageListener {
stop();
}
}
static class ProcessingBlocksManager {
private final Set<Hash> importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Hash> requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Long> requestedNonAnnouncedBlocks =
Collections.newSetFromMap(new ConcurrentHashMap<>());
boolean addRequestedBlock(final Hash hash) {
return requestedBlocks.add(hash);
}
public boolean addNonAnnouncedBlocks(final long blockNumber) {
return requestedNonAnnouncedBlocks.add(blockNumber);
}
public boolean alreadyImporting(final Hash hash) {
return importingBlocks.contains(hash);
}
public synchronized void registerReceivedBlock(final Block block) {
requestedBlocks.remove(block.getHash());
requestedNonAnnouncedBlocks.remove(block.getHeader().getNumber());
}
public synchronized void registerFailedGetBlock(
final long blockNumber, final Optional<Hash> maybeBlockHash) {
requestedNonAnnouncedBlocks.remove(blockNumber);
maybeBlockHash.ifPresent(requestedBlocks::remove);
}
public boolean addImportingBlock(final Hash hash) {
return importingBlocks.add(hash);
}
public void registerBlockImportDone(final Hash hash) {
importingBlocks.remove(hash);
}
@Override
public synchronized String toString() {
return "ProcessingBlocksManager{"
+ "importingBlocks="
+ importingBlocks
+ ", requestedBlocks="
+ requestedBlocks
+ ", requestedNonAnnouncedBlocks="
+ requestedNonAnnouncedBlocks
+ '}';
}
}
}

@ -47,6 +47,8 @@ public class SynchronizerConfiguration {
public static final int DEFAULT_COMPUTATION_PARALLELISM = 2;
public static final int DEFAULT_WORLD_STATE_TASK_CACHE_SIZE =
CachingTaskCollection.DEFAULT_CACHE_SIZE;
public static final long DEFAULT_PROPAGATION_MANAGER_GET_BLOCK_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(60);
// Fast sync config
private final int fastSyncPivotDistance;
@ -77,6 +79,7 @@ public class SynchronizerConfiguration {
private final int computationParallelism;
private final int maxTrailingPeers;
private final long worldStateMinMillisBeforeStalling;
private final long propagationManagerGetBlockTimeoutMillis;
private SynchronizerConfiguration(
final int fastSyncPivotDistance,
@ -98,7 +101,8 @@ public class SynchronizerConfiguration {
final int downloaderParallelism,
final int transactionsParallelism,
final int computationParallelism,
final int maxTrailingPeers) {
final int maxTrailingPeers,
final long propagationManagerGetBlockTimeoutMillis) {
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
@ -119,6 +123,7 @@ public class SynchronizerConfiguration {
this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
this.maxTrailingPeers = maxTrailingPeers;
this.propagationManagerGetBlockTimeoutMillis = propagationManagerGetBlockTimeoutMillis;
}
public static Builder builder() {
@ -234,6 +239,10 @@ public class SynchronizerConfiguration {
return maxTrailingPeers;
}
public long getPropagationManagerGetBlockTimeoutMillis() {
return propagationManagerGetBlockTimeoutMillis;
}
public static class Builder {
private SyncMode syncMode = SyncMode.FULL;
private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS;
@ -260,6 +269,9 @@ public class SynchronizerConfiguration {
private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING;
private int worldStateTaskCacheSize = DEFAULT_WORLD_STATE_TASK_CACHE_SIZE;
private long propagationManagerGetBlockTimeoutMillis =
DEFAULT_PROPAGATION_MANAGER_GET_BLOCK_TIMEOUT_MILLIS;
public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
return this;
@ -371,6 +383,12 @@ public class SynchronizerConfiguration {
return this;
}
public Builder propagationManagerGetBlockTimeoutMillis(
final long propagationManagerGetBlockTimeoutMillis) {
this.propagationManagerGetBlockTimeoutMillis = propagationManagerGetBlockTimeoutMillis;
return this;
}
public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
fastSyncPivotDistance,
@ -392,7 +410,8 @@ public class SynchronizerConfiguration {
downloaderParallelism,
transactionsParallelism,
computationParallelism,
maxTrailingPeers);
maxTrailingPeers,
propagationManagerGetBlockTimeoutMillis);
}
}
}

@ -50,6 +50,7 @@ import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer.Responder;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage;
import org.hyperledger.besu.ethereum.eth.sync.BlockPropagationManager.ProcessingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
@ -87,6 +88,8 @@ public abstract class AbstractBlockPropagationManagerTest {
spy(
new PendingBlocksManager(
SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build()));
protected final ProcessingBlocksManager processingBlocksManager =
spy(new ProcessingBlocksManager());
protected SyncState syncState;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337");
@ -119,7 +122,8 @@ public abstract class AbstractBlockPropagationManagerTest {
syncState,
pendingBlocksManager,
metricsSystem,
blockBroadcaster);
blockBroadcaster,
processingBlocksManager);
}
@Test
@ -933,7 +937,7 @@ public abstract class AbstractBlockPropagationManagerTest {
}
@Test
public void shouldRequestBlockAgainIfFirstGetBlockFails() {
public void shouldRequestBlockFromOtherPeersIfFirstPeerFails() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
@ -956,15 +960,18 @@ public abstract class AbstractBlockPropagationManagerTest {
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
// Re-broadcast the previous message and peer responds
// second peer responds
final RespondingEthPeer secondPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, secondPeer, nextAnnouncement);
final Responder goodResponder = RespondingEthPeer.blockchainResponder(getFullBlockchain());
secondPeer.respondWhile(goodResponder, secondPeer::hasOutstandingRequests);
assertThat(blockchain.contains(nextBlock.getHash())).isTrue();
verify(processingBlocksManager).addRequestedBlock(nextBlock.getHash());
verify(processingBlocksManager).addImportingBlock(nextBlock.getHash());
verify(processingBlocksManager).registerReceivedBlock(nextBlock);
verify(processingBlocksManager).registerBlockImportDone(nextBlock.getHash());
}
public abstract Blockchain getFullBlockchain();

Loading…
Cancel
Save