Add fix to prevent the node from stopping downloading blocks (#2213)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/2491/head
matkt 3 years ago committed by GitHub
parent fdd747e6c6
commit e2356c93ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java
  2. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeersTask.java
  3. 35
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  4. 10
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java
  5. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java
  6. 7
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTaskTest.java
  7. 14
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java

@ -24,24 +24,26 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
/** Downloads a block from a peer. Will complete exceptionally if block cannot be downloaded. */
public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
private static final Logger LOG = LogManager.getLogger();
private final ProtocolSchedule protocolSchedule;
private final Hash hash;
private final Optional<Hash> hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;
protected GetBlockFromPeerTask(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(ethContext, metricsSystem);
@ -54,7 +56,7 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
public static GetBlockFromPeerTask create(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
@ -62,9 +64,10 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
@Override
protected void executeTask() {
final String blockIdentifier = hash.map(Bytes::toHexString).orElse(Long.toString(blockNumber));
LOG.debug(
"Downloading block {} from peer {}.",
hash,
blockIdentifier,
assignedPeer.map(EthPeer::toString).orElse("<any>"));
downloadHeader()
.thenCompose(this::completeBlock)
@ -73,14 +76,15 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
if (t != null) {
LOG.info(
"Failed to download block {} from peer {}.",
hash,
blockIdentifier,
assignedPeer.map(EthPeer::toString).orElse("<any>"));
result.completeExceptionally(t);
} else if (r.getResult().isEmpty()) {
LOG.info("Failed to download block {} from peer {}.", hash, r.getPeer());
LOG.info("Failed to download block {} from peer {}.", blockIdentifier, r.getPeer());
result.completeExceptionally(new IncompleteResultsException());
} else {
LOG.debug("Successfully downloaded block {} from peer {}.", hash, r.getPeer());
LOG.debug(
"Successfully downloaded block {} from peer {}.", blockIdentifier, r.getPeer());
result.complete(new PeerTaskResult<>(r.getPeer(), r.getResult().get(0)));
}
});
@ -89,9 +93,16 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeader() {
return executeSubTask(
() -> {
final AbstractGetHeadersFromPeerTask task =
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
final AbstractGetHeadersFromPeerTask task;
task =
hash.map(
value ->
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, value, blockNumber, metricsSystem))
.orElseGet(
() ->
GetHeadersFromPeerByNumberTask.forSingleNumber(
protocolSchedule, ethContext, blockNumber, metricsSystem));
assignedPeer.ifPresent(task::assignPeer);
return task.run();
});

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -34,7 +35,7 @@ public class GetBlockFromPeersTask extends AbstractEthTask<AbstractPeerTask.Peer
private final List<EthPeer> peers;
private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
private final Hash hash;
private final Optional<Hash> hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;
@ -42,7 +43,7 @@ public class GetBlockFromPeersTask extends AbstractEthTask<AbstractPeerTask.Peer
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(metricsSystem);
@ -58,7 +59,7 @@ public class GetBlockFromPeersTask extends AbstractEthTask<AbstractPeerTask.Peer
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeersTask(

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
@ -45,6 +46,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@ -143,6 +145,20 @@ public class BlockPropagationManager {
LOG.info("Imported {} pending blocks", r.size());
}
});
} else {
pendingBlocksManager
.lowestAnnouncedBlock()
.map(ProcessableBlockHeader::getNumber)
.ifPresent(
minAnnouncedBlockNumber -> {
long distance =
minAnnouncedBlockNumber
- protocolContext.getBlockchain().getChainHeadBlockNumber();
if (distance < config.getBlockPropagationRange().upperEndpoint()
&& minAnnouncedBlockNumber > newBlock.getHeader().getNumber()) {
retrieveMissingAnnouncedBlock(newBlock.getHeader().getNumber() + 1);
}
});
}
if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
@ -245,11 +261,28 @@ public class BlockPropagationManager {
}
}
private CompletableFuture<Block> retrieveMissingAnnouncedBlock(final long blockNumber) {
LOG.debug("Retrieve missing announced block {} from peer", blockNumber);
final List<EthPeer> peers =
ethContext.getEthPeers().streamBestPeers().collect(Collectors.toList());
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, Optional.empty(), blockNumber, metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
}
private CompletableFuture<Block> processAnnouncedBlock(
final List<EthPeer> peers, final NewBlockHash newBlock) {
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem);
peers,
protocolSchedule,
ethContext,
Optional.of(newBlock.hash()),
newBlock.number(),
metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));

@ -17,15 +17,18 @@ package org.hyperledger.besu.ethereum.eth.sync.state;
import static java.util.Collections.newSetFromMap;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.ImmutablePendingBlock;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.PendingBlockCache;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@ -116,4 +119,11 @@ public class PendingBlocksManager {
.map(ImmutablePendingBlock::block)
.collect(Collectors.toList());
}
public Optional<BlockHeader> lowestAnnouncedBlock() {
return pendingBlocks.values().stream()
.map(ImmutablePendingBlock::block)
.map(Block::getHeader)
.min(Comparator.comparing(BlockHeader::getNumber));
}
}

@ -212,7 +212,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask<List<Bl
GetBlockFromPeerTask.create(
protocolSchedule,
ethContext,
child.getHash(),
Optional.of(child.getHash()),
child.getNumber(),
metricsSystem)
.assignPeer(headersResult.getPeer());

@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.AbstractMessageTas
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException;
import org.hyperledger.besu.util.ExceptionUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@ -47,7 +48,11 @@ public class GetBlockFromPeerTaskTest
@Override
protected EthTask<AbstractPeerTask.PeerTaskResult<Block>> createTask(final Block requestedData) {
return GetBlockFromPeerTask.create(
protocolSchedule, ethContext, requestedData.getHash(), BLOCK_NUMBER, metricsSystem);
protocolSchedule,
ethContext,
Optional.of(requestedData.getHash()),
BLOCK_NUMBER,
metricsSystem);
}
@Override

@ -245,4 +245,18 @@ public class PendingBlocksManagerTest {
}
assertThat(pendingBlocksManager.contains(reorgBlock.getHash())).isTrue();
}
@Test
public void shouldReturnLowestBlockByNumber() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block parentBlock = gen.block();
final Block childBlock = gen.nextBlock(parentBlock);
final Block childBlock2 = gen.nextBlock(parentBlock);
pendingBlocksManager.registerPendingBlock(parentBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1);
assertThat(pendingBlocksManager.lowestAnnouncedBlock()).contains(parentBlock.getHeader());
}
}

Loading…
Cancel
Save