Add mechanism to retrieve missing blocks (#4175)

* Add mechanism to retrieve missing blocks that caused BlockPropagationManager to get stuck on pending blocks
* Remove threshold and request the lowest block parent everytime a block is saved for future

Signed-off-by: Gabriel Trintinalia <gabriel.trintinalia@consensys.net>

Co-authored-by: Gabriel Trintinalia <gabriel.trintinalia@consensys.net>
Co-authored-by: garyschulte <garyschulte@gmail.com>
Co-authored-by: Sally MacFarlane <sally.macfarlane@consensys.net>
pull/4180/head
Gabriel-Trintinalia 2 years ago committed by GitHub
parent f838572ec8
commit f05b45d4d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  2. 39
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java

@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
public class BlockPropagationManager { public class BlockPropagationManager {
private static final Logger LOG = LoggerFactory.getLogger(BlockPropagationManager.class); private static final Logger LOG = LoggerFactory.getLogger(BlockPropagationManager.class);
private final SynchronizerConfiguration config; private final SynchronizerConfiguration config;
private final ProtocolSchedule protocolSchedule; private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext; private final ProtocolContext protocolContext;
@ -364,6 +363,25 @@ public class BlockPropagationManager {
return getBlockFromPeers(Optional.of(peer), blockHash.number(), Optional.of(blockHash.hash())); return getBlockFromPeers(Optional.of(peer), blockHash.number(), Optional.of(blockHash.hash()));
} }
private void requestParentBlock(final BlockHeader blockHeader) {
if (requestedBlocks.add(blockHeader.getParentHash())) {
retrieveParentBlock(blockHeader);
} else {
LOG.trace("Parent block with hash {} was already requested", blockHeader.getParentHash());
}
}
private CompletableFuture<Block> retrieveParentBlock(final BlockHeader blockHeader) {
final long targetParentBlockNumber = blockHeader.getNumber() - 1L;
final Hash targetParentBlockHash = blockHeader.getParentHash();
LOG.info(
"Retrieving parent {} of block #{} from peers",
targetParentBlockHash,
blockHeader.getNumber());
return getBlockFromPeers(
Optional.empty(), targetParentBlockNumber, Optional.of(targetParentBlockHash));
}
private CompletableFuture<Block> getBlockFromPeers( private CompletableFuture<Block> getBlockFromPeers(
final Optional<EthPeer> preferredPeer, final Optional<EthPeer> preferredPeer,
final long blockNumber, final long blockNumber,
@ -421,6 +439,10 @@ public class BlockPropagationManager {
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) { if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info("Saving announced block {} for future import", block.toLogString()); LOG.info("Saving announced block {} for future import", block.toLogString());
} }
// Request parent of the lowest announced block
pendingBlocksManager.lowestAnnouncedBlock().ifPresent(this::requestParentBlock);
return CompletableFuture.completedFuture(block); return CompletableFuture.completedFuture(block);
} }
} }

@ -59,6 +59,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.testutil.TestClock; import org.hyperledger.besu.testutil.TestClock;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -634,6 +635,44 @@ public abstract class AbstractBlockPropagationManagerTest {
verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class)); verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
} }
@Test
public void shouldRequestLowestAnnouncedPendingBlockParent() {
// test if block propagation manager can recover if one block is missed
blockchainUtil.importFirstBlocks(2);
final List<Block> blocks = blockchainUtil.getBlocks().subList(2, 4);
blockPropagationManager.start();
// Create peer and responder
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());
// skip first block then create messages from blocklist
blocks.stream()
.skip(1)
.map(this::createNewBlockHashMessage)
.forEach(
message -> { // Broadcast new block hash message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, message);
});
peer.respondWhile(responder, peer::hasOutstandingRequests);
// assert all blocks were imported
blocks.forEach(
block -> {
assertThat(blockchain.contains(block.getHash())).isTrue();
});
}
private NewBlockHashesMessage createNewBlockHashMessage(final Block block) {
return NewBlockHashesMessage.create(
Collections.singletonList(
new NewBlockHashesMessage.NewBlockHash(
block.getHash(), block.getHeader().getNumber())));
}
@Test @Test
public void verifyBroadcastBlockInvocation() { public void verifyBroadcastBlockInvocation() {
blockchainUtil.importFirstBlocks(2); blockchainUtil.importFirstBlocks(2);

Loading…
Cancel
Save