diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java index 5fc4a56c80..06867e22e1 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java @@ -133,6 +133,7 @@ public class EthPeer { } public void propagateBlock(final Block block, final UInt256 totalDifficulty) { + registerKnownBlock(block.getHash()); final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty); try { connection.sendForProtocol(protocolName, newBlockMessage); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcaster.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcaster.java new file mode 100644 index 0000000000..fecffaaf4c --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcaster.java @@ -0,0 +1,34 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync; + +import tech.pegasys.pantheon.ethereum.core.Block; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.util.uint.UInt256; + +class BlockBroadcaster { + + private final EthContext ethContext; + + BlockBroadcaster(final EthContext ethContext) { + this.ethContext = ethContext; + } + + void propagate(final Block block, final UInt256 difficulty) { + ethContext + .getEthPeers() + .availablePeers() + .filter(ethPeer -> !ethPeer.hasSeenBlock(block.getHash())) + .forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty)); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index dca15d6539..a05764cd41 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -17,6 +17,7 @@ import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent; import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent.EventType; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.core.Block; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; @@ -30,8 +31,10 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetBlockFromPeerTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask; +import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.rlp.RLPException; import tech.pegasys.pantheon.metrics.LabelledMetric; @@ -63,6 +66,7 @@ public class BlockPropagationManager { private final EthContext ethContext; private final SyncState syncState; private final LabelledMetric ethTasksTimer; + private final BlockBroadcaster blockBroadcaster; private final AtomicBoolean started = new AtomicBoolean(false); @@ -77,13 +81,14 @@ public class BlockPropagationManager { final EthContext ethContext, final SyncState syncState, final PendingBlocks pendingBlocks, - final LabelledMetric ethTasksTimer) { + final LabelledMetric ethTasksTimer, + final BlockBroadcaster blockBroadcaster) { this.config = config; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; this.ethTasksTimer = ethTasksTimer; - + this.blockBroadcaster = blockBroadcaster; this.syncState = syncState; this.pendingBlocks = pendingBlocks; } @@ -105,6 +110,32 @@ public class BlockPropagationManager { .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork); } + protected void validateAndBroadcastBlock(final Block block) { + final ProtocolSpec protocolSpec = + protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + final BlockHeader parent = + protocolContext + .getBlockchain() + .getBlockHeader(block.getHeader().getParentHash()) + .orElseThrow( + () -> + new IllegalArgumentException( + "Incapable of retrieving header from non-existent parent of " + + block.getHeader().getNumber() + + ".")); + if (blockHeaderValidator.validateHeader( + block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { + final UInt256 totalDifficulty = + protocolContext + .getBlockchain() + .getTotalDifficultyByHash(parent.getHash()) + .get() + .plus(block.getHeader().getDifficulty()); + blockBroadcaster.propagate(block, totalDifficulty); + } + } + private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) { // Check to see if any of our pending blocks are now ready for import final Block newBlock = blockAddedEvent.getBlock(); @@ -144,13 +175,6 @@ public class BlockPropagationManager { } } - void broadcastBlock(final Block block, final UInt256 difficulty) { - ethContext - .getEthPeers() - .availablePeers() - .forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty)); - } - void handleNewBlockFromNetwork(final EthMessage message) { final Blockchain blockchain = protocolContext.getBlockchain(); final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData()); @@ -158,9 +182,6 @@ public class BlockPropagationManager { final Block block = newBlockMessage.block(protocolSchedule); final UInt256 totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule); - // TODO: Extract broadcast functionality to independent class. - // broadcastBlock(block, totalDifficulty); - message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty); // Return early if we don't care about this block @@ -272,6 +293,8 @@ public class BlockPropagationManager { return CompletableFuture.completedFuture(block); } + validateAndBroadcastBlock(block); + // Import block final PersistBlockTask importTask = PersistBlockTask.create( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index a177da7725..9a50612103 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -72,7 +72,8 @@ public class DefaultSynchronizer implements Synchronizer { ethContext, syncState, new PendingBlocks(), - ethTasksTimer); + ethTasksTimer, + new BlockBroadcaster(ethContext)); ChainHeadTracker.trackChainHeadForPeers( ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java index 71603e0a02..97f5239776 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -62,6 +62,7 @@ public class BlockPropagationManagerTest { private ProtocolSchedule protocolSchedule; private ProtocolContext protocolContext; private MutableBlockchain blockchain; + private BlockBroadcaster blockBroadcaster; private EthProtocolManager ethProtocolManager; private BlockPropagationManager blockPropagationManager; private SynchronizerConfiguration syncConfig; @@ -90,6 +91,7 @@ public class BlockPropagationManagerTest { EthProtocolManagerTestUtil.create(blockchain, blockchainUtil.getWorldArchive()); syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build(); syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers()); + blockBroadcaster = mock(BlockBroadcaster.class); blockPropagationManager = new BlockPropagationManager<>( syncConfig, @@ -98,7 +100,8 @@ public class BlockPropagationManagerTest { ethProtocolManager.ethContext(), syncState, pendingBlocks, - ethTasksTimer); + ethTasksTimer, + blockBroadcaster); } @Test @@ -471,7 +474,8 @@ public class BlockPropagationManagerTest { ethProtocolManager.ethContext(), syncState, pendingBlocks, - ethTasksTimer); + ethTasksTimer, + blockBroadcaster); final BlockDataGenerator gen = new BlockDataGenerator(); // Import some blocks @@ -551,7 +555,8 @@ public class BlockPropagationManagerTest { ethContext, syncState, pendingBlocks, - ethTasksTimer); + ethTasksTimer, + blockBroadcaster); blockchainUtil.importFirstBlocks(2); final Block nextBlock = blockchainUtil.getBlock(2); @@ -561,4 +566,25 @@ public class BlockPropagationManagerTest { verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class)); } + + @Test + public void verifyBroadcastBlockInvocation() { + blockchainUtil.importFirstBlocks(2); + final Block block = blockchainUtil.getBlock(2); + blockPropagationManager.start(); + + // Setup peer and messages + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); + + final UInt256 totalDifficulty = fullBlockchain.getTotalDifficultyByHash(block.getHash()).get(); + final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty); + + // Broadcast message + EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlockMessage); + + final Responder responder = RespondingEthPeer.blockchainResponder(fullBlockchain); + peer.respondWhile(responder, peer::hasOutstandingRequests); + + verify(blockBroadcaster, times(1)).propagate(block, totalDifficulty); + } }