[NC-2058] Improve block propagation time (#808)

* o

* add test

* clean up

* scaffolding

* update

* update

* comments

* add test

* update

* update ii

* format

* update ii

* fix

* verifyBroadcastBlockInvocation

* test

* update

* update to difficulty calculation

* remove BlockBroadcasterTest from this pr

* update

* update

* update II

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
S. Matthew English 6 years ago committed by GitHub
parent bf710da941
commit f7849ac06e
  1. 1
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java
  2. 34
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcaster.java
  3. 47
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java
  4. 3
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java
  5. 32
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java

@ -133,6 +133,7 @@ public class EthPeer {
}
public void propagateBlock(final Block block, final UInt256 totalDifficulty) {
registerKnownBlock(block.getHash());
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
try {
connection.sendForProtocol(protocolName, newBlockMessage);

@ -0,0 +1,34 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.util.uint.UInt256;
class BlockBroadcaster {
private final EthContext ethContext;
BlockBroadcaster(final EthContext ethContext) {
this.ethContext = ethContext;
}
void propagate(final Block block, final UInt256 difficulty) {
ethContext
.getEthPeers()
.availablePeers()
.filter(ethPeer -> !ethPeer.hasSeenBlock(block.getHash()))
.forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty));
}
}

@ -17,6 +17,7 @@ import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent.EventType;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
@ -30,8 +31,10 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetBlockFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.LabelledMetric;
@ -63,6 +66,7 @@ public class BlockPropagationManager<C> {
private final EthContext ethContext;
private final SyncState syncState;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final BlockBroadcaster blockBroadcaster;
private final AtomicBoolean started = new AtomicBoolean(false);
@ -77,13 +81,14 @@ public class BlockPropagationManager<C> {
final EthContext ethContext,
final SyncState syncState,
final PendingBlocks pendingBlocks,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final LabelledMetric<OperationTimer> ethTasksTimer,
final BlockBroadcaster blockBroadcaster) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.blockBroadcaster = blockBroadcaster;
this.syncState = syncState;
this.pendingBlocks = pendingBlocks;
}
@ -105,6 +110,32 @@ public class BlockPropagationManager<C> {
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork);
}
protected void validateAndBroadcastBlock(final Block block) {
final ProtocolSpec<C> protocolSpec =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber());
final BlockHeaderValidator<C> blockHeaderValidator = protocolSpec.getBlockHeaderValidator();
final BlockHeader parent =
protocolContext
.getBlockchain()
.getBlockHeader(block.getHeader().getParentHash())
.orElseThrow(
() ->
new IllegalArgumentException(
"Incapable of retrieving header from non-existent parent of "
+ block.getHeader().getNumber()
+ "."));
if (blockHeaderValidator.validateHeader(
block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) {
final UInt256 totalDifficulty =
protocolContext
.getBlockchain()
.getTotalDifficultyByHash(parent.getHash())
.get()
.plus(block.getHeader().getDifficulty());
blockBroadcaster.propagate(block, totalDifficulty);
}
}
private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) {
// Check to see if any of our pending blocks are now ready for import
final Block newBlock = blockAddedEvent.getBlock();
@ -144,13 +175,6 @@ public class BlockPropagationManager<C> {
}
}
void broadcastBlock(final Block block, final UInt256 difficulty) {
ethContext
.getEthPeers()
.availablePeers()
.forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty));
}
void handleNewBlockFromNetwork(final EthMessage message) {
final Blockchain blockchain = protocolContext.getBlockchain();
final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData());
@ -158,9 +182,6 @@ public class BlockPropagationManager<C> {
final Block block = newBlockMessage.block(protocolSchedule);
final UInt256 totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule);
// TODO: Extract broadcast functionality to independent class.
// broadcastBlock(block, totalDifficulty);
message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty);
// Return early if we don't care about this block
@ -272,6 +293,8 @@ public class BlockPropagationManager<C> {
return CompletableFuture.completedFuture(block);
}
validateAndBroadcastBlock(block);
// Import block
final PersistBlockTask<C> importTask =
PersistBlockTask.create(

@ -72,7 +72,8 @@ public class DefaultSynchronizer<C> implements Synchronizer {
ethContext,
syncState,
new PendingBlocks(),
ethTasksTimer);
ethTasksTimer,
new BlockBroadcaster(ethContext));
ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);

@ -62,6 +62,7 @@ public class BlockPropagationManagerTest {
private ProtocolSchedule<Void> protocolSchedule;
private ProtocolContext<Void> protocolContext;
private MutableBlockchain blockchain;
private BlockBroadcaster blockBroadcaster;
private EthProtocolManager ethProtocolManager;
private BlockPropagationManager<Void> blockPropagationManager;
private SynchronizerConfiguration syncConfig;
@ -90,6 +91,7 @@ public class BlockPropagationManagerTest {
EthProtocolManagerTestUtil.create(blockchain, blockchainUtil.getWorldArchive());
syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build();
syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
blockBroadcaster = mock(BlockBroadcaster.class);
blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
@ -98,7 +100,8 @@ public class BlockPropagationManagerTest {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);
}
@Test
@ -471,7 +474,8 @@ public class BlockPropagationManagerTest {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);
final BlockDataGenerator gen = new BlockDataGenerator();
// Import some blocks
@ -551,7 +555,8 @@ public class BlockPropagationManagerTest {
ethContext,
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
@ -561,4 +566,25 @@ public class BlockPropagationManagerTest {
verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
}
@Test
public void verifyBroadcastBlockInvocation() {
blockchainUtil.importFirstBlocks(2);
final Block block = blockchainUtil.getBlock(2);
blockPropagationManager.start();
// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final UInt256 totalDifficulty = fullBlockchain.getTotalDifficultyByHash(block.getHash()).get();
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlockMessage);
final Responder responder = RespondingEthPeer.blockchainResponder(fullBlockchain);
peer.respondWhile(responder, peer::hasOutstandingRequests);
verify(blockBroadcaster, times(1)).propagate(block, totalDifficulty);
}
}

Loading…
Cancel
Save