Improve pending blocks retrieval mechanism (#4227)

* Add more log to retrieve parent method
* Request the lowest pending ancestor when saving a block
* Replace recursive implementation with iterative when getting pending ancestors of Block
* Decrease scope of synchronized block to reflect only the event of adding pending block to the list
* Add fork to the chain so test is more representative

Signed-off-by: Gabriel Trintinalia <gabriel.trintinalia@consensys.net>

Signed-off-by: Gabriel Trintinalia <gabriel.trintinalia@consensys.net>
Co-authored-by: Gabriel Trintinalia <gabriel.trintinalia@consensys.net>
pull/4259/head
Gabriel-Trintinalia 2 years ago committed by GitHub
parent 9d476ea1f8
commit f847ead5c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 131
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  3. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java
  4. 30
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java
  5. 47
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java

@ -5,6 +5,7 @@
### Additions and Improvements
- Introduce a cap to reputation score increase [#4230](https://github.com/hyperledger/besu/pull/4230)
- Add experimental CLI option for `--Xp2p-peer-lower-bound` [#4200](https://github.com/hyperledger/besu/pull/4200)
- Improve pending blocks retrieval mechanism [#4227](https://github.com/hyperledger/besu/pull/4227)
### Bug Fixes
- Fixes off-by-one error for mainnet TTD fallback [#4223](https://github.com/hyperledger/besu/pull/4223)

@ -155,29 +155,49 @@ public class BlockPropagationManager {
private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
// Check to see if any of our pending blocks are now ready for import
final Block newBlock = blockAddedEvent.getBlock();
traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
blockAddedEvent::getEventType,
newBlock::toLogString,
() -> this);
// If there is no children to process, maybe try non announced blocks
if (!maybeProcessPendingChildrenBlocks(newBlock)) {
traceLambda(
LOG, "There are no pending blocks ready to import for block {}", newBlock::toLogString);
maybeProcessNonAnnouncedBlocks(newBlock);
}
if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
}
/**
* Process pending Children if any
*
* @param block the block to process the children
* @return true if block has any pending child
*/
private boolean maybeProcessPendingChildrenBlocks(final Block block) {
final List<Block> readyForImport;
synchronized (pendingBlocksManager) {
// Remove block from pendingBlocks list
pendingBlocksManager.deregisterPendingBlock(newBlock);
pendingBlocksManager.deregisterPendingBlock(block);
// Import any pending blocks that are children of the newly added block
readyForImport = pendingBlocksManager.childrenOf(newBlock.getHash());
readyForImport = pendingBlocksManager.childrenOf(block.getHash());
}
traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
blockAddedEvent::getEventType,
newBlock::toLogString,
() -> this);
if (!readyForImport.isEmpty()) {
traceLambda(
LOG,
"Ready to import pending blocks found [{}] for block {}",
() -> readyForImport.stream().map(Block::toLogString).collect(Collectors.joining(", ")),
newBlock::toLogString);
block::toLogString);
final Supplier<CompletableFuture<List<Block>>> importBlocksTask =
PersistBlockTask.forUnorderedBlocks(
@ -193,25 +213,17 @@ public class BlockPropagationManager {
.whenComplete(
(r, t) -> {
if (r != null) {
LOG.info("Imported {} pending blocks", r.size());
LOG.info(
"Imported {} pending blocks: {}",
r.size(),
r.stream().map(b -> b.getHeader().getNumber()).collect(Collectors.toList()));
}
if (t != null) {
LOG.error("Error importing pending blocks", t);
}
});
} else {
traceLambda(
LOG, "There are no pending blocks ready to import for block {}", newBlock::toLogString);
maybeProcessNonAnnouncedBlocks(newBlock);
}
if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
return !readyForImport.isEmpty();
}
private void maybeProcessNonAnnouncedBlocks(final Block newBlock) {
@ -223,13 +235,13 @@ public class BlockPropagationManager {
.map(ProcessableBlockHeader::getNumber)
.ifPresent(
minAnnouncedBlockNumber -> {
long distance = minAnnouncedBlockNumber - localHeadBlockNumber;
final long distance = minAnnouncedBlockNumber - localHeadBlockNumber;
LOG.trace(
"Found lowest announced block {} with distance {}",
minAnnouncedBlockNumber,
distance);
long firstNonAnnouncedBlockNumber = newBlock.getHeader().getNumber() + 1;
final long firstNonAnnouncedBlockNumber = newBlock.getHeader().getNumber() + 1;
if (distance < config.getBlockPropagationRange().upperEndpoint()
&& minAnnouncedBlockNumber > firstNonAnnouncedBlockNumber) {
@ -364,21 +376,19 @@ public class BlockPropagationManager {
return getBlockFromPeers(Optional.of(peer), blockHash.number(), Optional.of(blockHash.hash()));
}
private void requestParentBlock(final BlockHeader blockHeader) {
private void requestParentBlock(final Block block) {
final BlockHeader blockHeader = block.getHeader();
if (requestedBlocks.add(blockHeader.getParentHash())) {
retrieveParentBlock(blockHeader);
} else {
LOG.trace("Parent block with hash {} was already requested", blockHeader.getParentHash());
LOG.debug("Parent block with hash {} is already requested", blockHeader.getParentHash());
}
}
private CompletableFuture<Block> retrieveParentBlock(final BlockHeader blockHeader) {
final long targetParentBlockNumber = blockHeader.getNumber() - 1L;
final Hash targetParentBlockHash = blockHeader.getParentHash();
LOG.info(
"Retrieving parent {} of block #{} from peers",
targetParentBlockHash,
blockHeader.getNumber());
LOG.info("Retrieving parent {} of block {}", targetParentBlockHash, blockHeader.toLogString());
return getBlockFromPeers(
Optional.empty(), targetParentBlockNumber, Optional.of(targetParentBlockHash));
}
@ -434,18 +444,13 @@ public class BlockPropagationManager {
// invoked for the parent of this block before we are able to register it.
traceLambda(LOG, "Import or save pending block {}", block::toLogString);
synchronized (pendingBlocksManager) {
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info("Saving announced block {} for future import", block.toLogString());
}
// Request parent of the lowest announced block
pendingBlocksManager.lowestAnnouncedBlock().ifPresent(this::requestParentBlock);
return CompletableFuture.completedFuture(block);
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (savePendingBlock(block, nodeId)) {
// if block is saved as pending, try to resolve it
maybeProcessPendingBlocks(block);
}
return CompletableFuture.completedFuture(block);
}
if (!importingBlocks.add(block.getHash())) {
@ -480,6 +485,48 @@ public class BlockPropagationManager {
blockHeaderValidator, block, parent, badBlockManager));
}
/**
* Save the given block.
*
* @param block the block to track
* @param nodeId node that sent the block
* @return true if the block was added (was not previously present)
*/
private boolean savePendingBlock(final Block block, final Bytes nodeId) {
synchronized (pendingBlocksManager) {
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info(
"Saved announced block for future import {} - {} saved block(s)",
block.toLogString(),
pendingBlocksManager.size());
return true;
}
return false;
}
}
/**
* Try to request the lowest ancestor for the given pending block or process the descendants if
* the ancestor is already in the chain
*/
private void maybeProcessPendingBlocks(final Block block) {
// Try to get the lowest ancestor pending for this block, so we can import it
final Optional<Block> lowestPending = pendingBlocksManager.pendingAncestorBlockOf(block);
if (lowestPending.isPresent()) {
final Block lowestPendingBlock = lowestPending.get();
// If the parent of the lowest ancestor is not in the chain, request it.
if (!protocolContext
.getBlockchain()
.contains(lowestPendingBlock.getHeader().getParentHash())) {
requestParentBlock(lowestPendingBlock);
} else {
LOG.trace("Parent block is already in the chain");
// if the parent is already imported, process its children
maybeProcessPendingChildrenBlocks(lowestPendingBlock);
}
}
}
private CompletableFuture<Block> validateAndProcessPendingBlock(
final BlockHeaderValidator blockHeaderValidator,
final Block block,

@ -108,6 +108,10 @@ public class PendingBlocksManager {
return pendingBlocks.containsKey(blockHash);
}
public int size() {
return pendingBlocks.size();
}
public List<Block> childrenOf(final Hash parentBlock) {
final Set<Hash> blocksByParent = pendingBlocksByParentHash.get(parentBlock);
if (blocksByParent == null || blocksByParent.size() == 0) {
@ -127,6 +131,22 @@ public class PendingBlocksManager {
.min(Comparator.comparing(BlockHeader::getNumber));
}
/**
* Get the lowest pending ancestor block saved for a block
*
* @param block target block
* @return An optional with the lowest ancestor pending block
*/
public Optional<Block> pendingAncestorBlockOf(final Block block) {
Block ancestor = block;
int ancestorLevel = 0;
while (pendingBlocks.containsKey(ancestor.getHeader().getParentHash())
&& ancestorLevel++ < pendingBlocks.size()) {
ancestor = pendingBlocks.get(ancestor.getHeader().getParentHash()).block();
}
return Optional.of(ancestor);
}
@Override
public String toString() {
return "PendingBlocksManager{"

@ -667,6 +667,36 @@ public abstract class AbstractBlockPropagationManagerTest {
});
}
@Test
public void shouldRequestLowestAnnouncedPendingBlockParent_twoMissingBlocks() {
// test if block propagation manager can recover if one block is missed
blockchainUtil.importFirstBlocks(2);
final List<Block> blocks = blockchainUtil.getBlocks().subList(2, 6);
blockPropagationManager.start();
// Create peer and responder
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());
// skip two block then create messages from blocklist
blocks.stream()
.skip(2)
.map(this::createNewBlockHashMessage)
.forEach(
message -> { // Broadcast new block hash message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, message);
});
peer.respondWhile(responder, peer::hasOutstandingRequests);
// assert all blocks were imported
blocks.forEach(
block -> {
assertThat(blockchain.contains(block.getHash())).isTrue();
});
}
private NewBlockHashesMessage createNewBlockHashMessage(final Block block) {
return NewBlockHashesMessage.create(
Collections.singletonList(

@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Before;
@ -163,7 +164,8 @@ public class PendingBlocksManagerTest {
pendingBlocksManager.registerPendingBlock(childBlockFromNodeTwo, NODE_ID_2);
// check blocks from node 1 in the cache (node 1 should replace the lowest priority block)
List<Block> pendingBlocksForParent = pendingBlocksManager.childrenOf(parentBlock.getHash());
final List<Block> pendingBlocksForParent =
pendingBlocksManager.childrenOf(parentBlock.getHash());
for (int i = 0; i < nbBlocks; i++) {
final Block foundBlock = childBlockFromNodeOne.poll();
if (i != 0) {
@ -236,7 +238,7 @@ public class PendingBlocksManagerTest {
// check blocks in the cache
// and verify remove the block with the lowest priority (BLOCK-2)
for (Block block : childBlockFromNodeOne) {
for (final Block block : childBlockFromNodeOne) {
if (block.getHeader().getNumber() == 2) {
assertThat(pendingBlocksManager.contains(block.getHash())).isFalse();
} else {
@ -259,4 +261,45 @@ public class PendingBlocksManagerTest {
assertThat(pendingBlocksManager.lowestAnnouncedBlock()).contains(parentBlock.getHeader());
}
@Test
public void shouldReturnLowestAncestorPendingBlock() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block parentBlock = gen.block();
final Block block = gen.nextBlock(parentBlock);
final Block child = gen.nextBlock(block);
final Block forkBlock = gen.nextBlock(parentBlock);
final Block forkChild = gen.nextBlock(forkBlock);
// register chain with one missing block
pendingBlocksManager.registerPendingBlock(block, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(child, NODE_ID_1);
// Register fork with one missing parent
pendingBlocksManager.registerPendingBlock(forkBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(forkChild, NODE_ID_1);
// assert it is able to follow the chain
final Optional<Block> blockAncestor = pendingBlocksManager.pendingAncestorBlockOf(child);
assertThat(blockAncestor.get().getHeader().getHash()).isEqualTo(block.getHeader().getHash());
// assert it is able to follow the fork
final Optional<Block> forkAncestor = pendingBlocksManager.pendingAncestorBlockOf(forkChild);
assertThat(forkAncestor.get().getHeader().getHash()).isEqualTo(forkBlock.getHeader().getHash());
// Both forks result in the same parent
assertThat(forkAncestor.get().getHeader().getParentHash())
.isEqualTo(blockAncestor.get().getHeader().getParentHash());
}
@Test
public void shouldReturnLowestAncestorPendingBlock_sameBlock() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block block = gen.block();
pendingBlocksManager.registerPendingBlock(block, NODE_ID_1);
final Optional<Block> b = pendingBlocksManager.pendingAncestorBlockOf(block);
assertThat(b).contains(block);
}
}

Loading…
Cancel
Save