Incremental Optimization(s) on BlockBroadcaster (#911)

--> adapt `BlockBroadcaster` to use `send` method on `EthPeer`

--> adapt `EthProtocolMamnager` to utilize `BlockBroadcaster` to disseminate newly mined block

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
S. Matthew English 6 years ago committed by GitHub
parent 768293385f
commit ccb30641fa
  1. 12
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java
  2. 34
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
  3. 23
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcaster.java
  4. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java
  5. 17
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcasterTest.java

@ -14,7 +14,6 @@ package tech.pegasys.pantheon.ethereum.eth.manager;
import static com.google.common.base.Preconditions.checkArgument;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState.EstimatedHeightListener;
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
@ -24,7 +23,6 @@ import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
@ -132,16 +130,6 @@ public class EthPeer {
}
}
public void propagateBlock(final Block block, final UInt256 totalDifficulty) {
registerKnownBlock(block.getHash());
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
try {
connection.sendForProtocol(protocolName, newBlockMessage);
} catch (PeerNotConnected e) {
LOG.trace("Failed to broadcast new block to peer", e);
}
}
public ResponseStream getHeadersByHash(
final Hash hash, final int maxHeaders, final int skip, final boolean reverse)
throws PeerNotConnected {

@ -20,8 +20,8 @@ import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockBroadcaster;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
@ -37,7 +37,6 @@ import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@ -64,6 +63,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private final boolean fastSyncEnabled;
private List<Capability> supportedCapabilities;
private final Blockchain blockchain;
private final BlockBroadcaster blockBroadcaster;
EthProtocolManager(
final Blockchain blockchain,
@ -73,7 +73,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final int requestLimit,
final EthScheduler scheduler) {
this.networkId = networkId;
this.scheduler = scheduler;
this.blockchain = blockchain;
this.fastSyncEnabled = fastSyncEnabled;
@ -85,6 +84,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
ethMessages = new EthMessages();
ethContext = new EthContext(getSupportedProtocol(), ethPeers, ethMessages, scheduler);
this.blockBroadcaster = new BlockBroadcaster(ethContext);
// Set up request handlers
new EthServer(blockchain, worldStateArchive, ethMessages, requestLimit);
}
@ -271,24 +272,13 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
@Override
public void blockMined(final Block block) {
// This assumes the block has already been included in the chain
final Optional<UInt256> totalDifficulty = blockchain.getTotalDifficultyByHash(block.getHash());
if (!totalDifficulty.isPresent()) {
throw new IllegalStateException(
"Unable to get total difficulty from blockchain for mined block.");
}
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty.get());
ethPeers
.availablePeers()
.forEach(
peer -> {
try {
peer.send(newBlockMessage);
} catch (final PeerNotConnected ex) {
// Peers may disconnect while traversing the list, this is a normal occurrence.
}
});
final UInt256 totalDifficulty =
blockchain
.getTotalDifficultyByHash(block.getHash())
.orElseThrow(
() ->
new IllegalStateException(
"Unable to get total difficulty from blockchain for mined block."));
blockBroadcaster.propagate(block, totalDifficulty);
}
}

@ -14,21 +14,36 @@ package tech.pegasys.pantheon.ethereum.eth.sync;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.util.uint.UInt256;
class BlockBroadcaster {
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class BlockBroadcaster {
private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext;
BlockBroadcaster(final EthContext ethContext) {
public BlockBroadcaster(final EthContext ethContext) {
this.ethContext = ethContext;
}
void propagate(final Block block, final UInt256 difficulty) {
public void propagate(final Block block, final UInt256 totalDifficulty) {
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
ethContext
.getEthPeers()
.availablePeers()
.filter(ethPeer -> !ethPeer.hasSeenBlock(block.getHash()))
.forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty));
.forEach(
ethPeer -> {
ethPeer.registerKnownBlock(block.getHash());
try {
ethPeer.send(newBlockMessage);
} catch (PeerConnection.PeerNotConnected e) {
LOG.trace("Failed to broadcast new block to peer", e);
}
});
}
}

@ -110,7 +110,7 @@ public class BlockPropagationManager<C> {
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork);
}
protected void validateAndBroadcastBlock(final Block block) {
private void validateAndBroadcastBlock(final Block block) {
final ProtocolSpec<C> protocolSpec =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber());
final BlockHeaderValidator<C> blockHeaderValidator = protocolSpec.getBlockHeaderValidator();

@ -25,6 +25,8 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Collections;
@ -35,7 +37,7 @@ import org.junit.Test;
public class BlockBroadcasterTest {
@Test
public void blockPropagationUnitTest() {
public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected {
final EthPeer ethPeer = mock(EthPeer.class);
final EthPeers ethPeers = mock(EthPeers.class);
when(ethPeers.availablePeers()).thenReturn(Stream.of(ethPeer));
@ -45,14 +47,16 @@ public class BlockBroadcasterTest {
final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext);
final Block block = generateBlock();
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, block.getHeader().getDifficulty());
blockBroadcaster.propagate(block, UInt256.ZERO);
verify(ethPeer, times(1)).propagateBlock(any(), any());
verify(ethPeer, times(1)).send(newBlockMessage);
}
@Test
public void blockPropagationUnitTestSeenUnseen() {
public void blockPropagationUnitTestSeenUnseen() throws PeerConnection.PeerNotConnected {
final EthPeer ethPeer0 = mock(EthPeer.class);
when(ethPeer0.hasSeenBlock(any())).thenReturn(true);
@ -66,10 +70,13 @@ public class BlockBroadcasterTest {
final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext);
final Block block = generateBlock();
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, block.getHeader().getDifficulty());
blockBroadcaster.propagate(block, UInt256.ZERO);
verify(ethPeer0, never()).propagateBlock(any(), any());
verify(ethPeer1, times(1)).propagateBlock(any(), any());
verify(ethPeer0, never()).send(newBlockMessage);
verify(ethPeer1, times(1)).send(newBlockMessage);
}
private Block generateBlock() {

Loading…
Cancel
Save