|
|
|
@ -109,32 +109,6 @@ public class BlockPropagationManager<C> { |
|
|
|
|
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void validateAndBroadcastBlock(final Block block) { |
|
|
|
|
final ProtocolSpec<C> protocolSpec = |
|
|
|
|
protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); |
|
|
|
|
final BlockHeaderValidator<C> blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); |
|
|
|
|
final BlockHeader parent = |
|
|
|
|
protocolContext |
|
|
|
|
.getBlockchain() |
|
|
|
|
.getBlockHeader(block.getHeader().getParentHash()) |
|
|
|
|
.orElseThrow( |
|
|
|
|
() -> |
|
|
|
|
new IllegalArgumentException( |
|
|
|
|
"Incapable of retrieving header from non-existent parent of " |
|
|
|
|
+ block.getHeader().getNumber() |
|
|
|
|
+ ".")); |
|
|
|
|
if (blockHeaderValidator.validateHeader( |
|
|
|
|
block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { |
|
|
|
|
final UInt256 totalDifficulty = |
|
|
|
|
protocolContext |
|
|
|
|
.getBlockchain() |
|
|
|
|
.getTotalDifficultyByHash(parent.getHash()) |
|
|
|
|
.get() |
|
|
|
|
.plus(block.getHeader().getDifficulty()); |
|
|
|
|
blockBroadcaster.propagate(block, totalDifficulty); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) { |
|
|
|
|
// Check to see if any of our pending blocks are now ready for import
|
|
|
|
|
final Block newBlock = blockAddedEvent.getBlock(); |
|
|
|
@ -263,6 +237,16 @@ public class BlockPropagationManager<C> { |
|
|
|
|
return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void broadcastBlock(final Block block, final BlockHeader parent) { |
|
|
|
|
final UInt256 totalDifficulty = |
|
|
|
|
protocolContext |
|
|
|
|
.getBlockchain() |
|
|
|
|
.getTotalDifficultyByHash(parent.getHash()) |
|
|
|
|
.get() |
|
|
|
|
.plus(block.getHeader().getDifficulty()); |
|
|
|
|
blockBroadcaster.propagate(block, totalDifficulty); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@VisibleForTesting |
|
|
|
|
CompletableFuture<Block> importOrSavePendingBlock(final Block block) { |
|
|
|
|
// Synchronize to avoid race condition where block import event fires after the
|
|
|
|
@ -292,19 +276,54 @@ public class BlockPropagationManager<C> { |
|
|
|
|
return CompletableFuture.completedFuture(block); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ethContext.getScheduler().scheduleSyncWorkerTask(() -> validateAndBroadcastBlock(block)); |
|
|
|
|
final BlockHeader parent = |
|
|
|
|
protocolContext |
|
|
|
|
.getBlockchain() |
|
|
|
|
.getBlockHeader(block.getHeader().getParentHash()) |
|
|
|
|
.orElseThrow( |
|
|
|
|
() -> |
|
|
|
|
new IllegalArgumentException( |
|
|
|
|
"Incapable of retrieving header from non-existent parent of " |
|
|
|
|
+ block.getHeader().getNumber() |
|
|
|
|
+ ".")); |
|
|
|
|
|
|
|
|
|
// Import block
|
|
|
|
|
final PersistBlockTask<C> importTask = |
|
|
|
|
PersistBlockTask.create( |
|
|
|
|
protocolSchedule, protocolContext, block, HeaderValidationMode.FULL, metricsSystem); |
|
|
|
|
final ProtocolSpec<C> protocolSpec = |
|
|
|
|
protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); |
|
|
|
|
final BlockHeaderValidator<C> blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); |
|
|
|
|
return ethContext |
|
|
|
|
.getScheduler() |
|
|
|
|
.scheduleSyncWorkerTask(importTask::run) |
|
|
|
|
.scheduleSyncWorkerTask( |
|
|
|
|
() -> validateAndProcessPendingBlock(blockHeaderValidator, block, parent)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private CompletableFuture<Block> validateAndProcessPendingBlock( |
|
|
|
|
final BlockHeaderValidator<C> blockHeaderValidator, |
|
|
|
|
final Block block, |
|
|
|
|
final BlockHeader parent) { |
|
|
|
|
if (blockHeaderValidator.validateHeader( |
|
|
|
|
block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { |
|
|
|
|
ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); |
|
|
|
|
return runImportTask(block); |
|
|
|
|
} else { |
|
|
|
|
importingBlocks.remove(block.getHash()); |
|
|
|
|
LOG.warn( |
|
|
|
|
"Failed to import announced block {} ({}).", |
|
|
|
|
block.getHeader().getNumber(), |
|
|
|
|
block.getHash()); |
|
|
|
|
return CompletableFuture.completedFuture(block); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private CompletableFuture<Block> runImportTask(final Block block) { |
|
|
|
|
final PersistBlockTask<C> importTask = |
|
|
|
|
PersistBlockTask.create( |
|
|
|
|
protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); |
|
|
|
|
return importTask |
|
|
|
|
.run() |
|
|
|
|
.whenComplete( |
|
|
|
|
(r, t) -> { |
|
|
|
|
(result, throwable) -> { |
|
|
|
|
importingBlocks.remove(block.getHash()); |
|
|
|
|
if (t != null) { |
|
|
|
|
if (throwable != null) { |
|
|
|
|
LOG.warn( |
|
|
|
|
"Failed to import announced block {} ({}).", |
|
|
|
|
block.getHeader().getNumber(), |
|
|
|
|