Increase node diversity when downloading blocks (#2033)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
Co-authored-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
pull/2042/head
matkt 4 years ago committed by GitHub
parent 30ef44330e
commit f21ccccfa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CHANGELOG.md
  2. 97
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeersTask.java
  3. 23
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  4. 59
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManagerTest.java

@ -3,10 +3,12 @@
## 21.1.3
### Additions and Improvements
* Increase node diversity when downloading blocks [\#2033](https://github.com/hyperledger/besu/pull/2033)
### Bug Fixes
* Ethereum Node Records are now dynamically recalculated when we pass network upgrade blocks. This allows for better peering through transitions without needing to restart the node. [\#1998](https://github.com/hyperledger/besu/pull/1998)
### Early Access Features
#### Previously identified known issues

@ -0,0 +1,97 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/** Downloads a block from a peer. Will complete exceptionally if block cannot be downloaded. */
public class GetBlockFromPeersTask extends AbstractEthTask<AbstractPeerTask.PeerTaskResult<Block>> {
private static final Logger LOG = LogManager.getLogger();
private final List<EthPeer> peers;
private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
private final Hash hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;
protected GetBlockFromPeersTask(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(metricsSystem);
this.peers = peers;
this.ethContext = ethContext;
this.blockNumber = blockNumber;
this.metricsSystem = metricsSystem;
this.protocolSchedule = protocolSchedule;
this.hash = hash;
}
public static GetBlockFromPeersTask create(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeersTask(
peers, protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
}
@Override
protected void executeTask() {
LOG.debug("Downloading block {} from peers {}.", hash, peers.stream().map(EthPeer::toString));
getBlockFromPeers(peers);
}
private void getBlockFromPeers(final List<EthPeer> peers) {
if (peers.isEmpty()) {
result.completeExceptionally(new IncompleteResultsException());
}
final EthPeer peer = peers.get(0);
if (peer.isDisconnected()) {
getBlockFromPeers(peers.subList(1, peers.size()));
}
LOG.debug("Trying downloading block {} from peer {}.", hash, peer);
final AbstractPeerTask<Block> getBlockTask =
GetBlockFromPeerTask.create(protocolSchedule, ethContext, hash, blockNumber, metricsSystem)
.assignPeer(peer);
getBlockTask
.run()
.whenComplete(
(r, t) -> {
if (t != null) {
getBlockFromPeers(peers.subList(1, peers.size()));
} else {
result.complete(r);
}
});
}
}

@ -26,8 +26,7 @@ import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeersTask;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage.NewBlockHash;
@ -229,8 +228,12 @@ public class BlockPropagationManager {
// Process known blocks we care about
for (final NewBlockHash newBlock : newBlocks) {
EthPeer bestPeer = ethContext.getEthPeers().bestPeer().orElse(message.getPeer());
processAnnouncedBlock(bestPeer, newBlock)
final List<EthPeer> peers =
ethContext.getEthPeers().streamBestPeers().collect(Collectors.toList());
if (!peers.contains(message.getPeer())) {
peers.add(message.getPeer());
}
processAnnouncedBlock(peers, newBlock)
.whenComplete((r, t) -> requestedBlocks.remove(newBlock.hash()));
}
} catch (final RLPException e) {
@ -243,15 +246,13 @@ public class BlockPropagationManager {
}
private CompletableFuture<Block> processAnnouncedBlock(
final EthPeer peer, final NewBlockHash newBlock) {
final AbstractPeerTask<Block> getBlockTask =
GetBlockFromPeerTask.create(
protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem)
.assignPeer(peer);
final List<EthPeer> peers, final NewBlockHash newBlock) {
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), peer.nodeId()));
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
}
private void broadcastBlock(final Block block, final BlockHeader parent) {

@ -697,4 +697,63 @@ public class BlockPropagationManagerTest {
verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
}
@Test
public void shouldTryWithAnotherPeerWhenFailedDownloadingBlock() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
// Sanity check
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
blockPropagationManager.start();
// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final RespondingEthPeer secondPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockHashesMessage nextAnnouncement =
NewBlockHashesMessage.create(
Collections.singletonList(
new NewBlockHashesMessage.NewBlockHash(
nextBlock.getHash(), nextBlock.getHeader().getNumber())));
// Broadcast first message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(RespondingEthPeer.emptyResponder(), peer::hasOutstandingRequests);
secondPeer.respondWhile(
RespondingEthPeer.blockchainResponder(fullBlockchain), secondPeer::hasOutstandingRequests);
assertThat(blockchain.contains(nextBlock.getHash())).isTrue();
}
@Test
public void shouldThrowErrorWhenNoValidPeerAvailable() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
// Sanity check
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
blockPropagationManager.start();
// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final RespondingEthPeer secondPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockHashesMessage nextAnnouncement =
NewBlockHashesMessage.create(
Collections.singletonList(
new NewBlockHashesMessage.NewBlockHash(
nextBlock.getHash(), nextBlock.getHeader().getNumber())));
// Broadcast first message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(RespondingEthPeer.emptyResponder(), peer::hasOutstandingRequests);
secondPeer.respondWhile(RespondingEthPeer.emptyResponder(), secondPeer::hasOutstandingRequests);
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
}
}

Loading…
Cancel
Save