Refactor to async retrieve blocks, and change peer when retrying to get a block (#3326)

* Refactor to async retrieve blocks, and change peer when retrying to get a block

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Apply suggested changes

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Remove deprecated class

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Add more logs arond block synchronizer

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* First try do download the block from the peer that announced it

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Avoid redownload non annunced blocks

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Conditionally log at trace level

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Use max number of peers when retrying to download a block to try all peers

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Use the shared Slf4jLambdaHelper, instead of custom helper

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/3358/head
fab-10 3 years ago committed by GitHub
parent d1d9e49ba0
commit e6210f9209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      CHANGELOG.md
  2. 5
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java
  3. 3
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  4. 9
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  5. 5
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  6. 2
      besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java
  7. 2
      consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul99ProtocolManagerTest.java
  8. 4
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Block.java
  9. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  10. 8
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java
  11. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java
  12. 98
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeersTask.java
  13. 93
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java
  14. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockHashesMessage.java
  15. 202
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  16. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTracker.java
  17. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java
  18. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStep.java
  19. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java
  20. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManager.java
  21. 13
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java
  22. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/PersistBlockTask.java
  23. 31
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthContextTestUtil.java
  24. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  25. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  26. 13
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java
  27. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java
  28. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
  29. 2
      ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java

@ -5,10 +5,11 @@
### Additions and Improvements
- Updated besu-native to version 0.4.3 [#3331](https://github.com/hyperledger/besu/pull/3331)
- Refactor synchronizer to asynchronously retrieve blocks from peers, and to change peer when retrying to get a block. [#3326](https://github.com/hyperledger/besu/pull/3326)
### Bug Fixes
- Prevent node from peering to itself [#3342](https://github.com/hyperledger/besu/pull/3342)
- Fix an `IndexOutOfBoundsException` exception when getting block from peers. [#3304](https://github.com/hyperledger/besu/issues/3304)
## 22.1.0-RC3
- Changing the order in which we traverse the word state tree during fast sync. This should improve fast sync during subsequent pivot changes.

@ -159,6 +159,8 @@ public class ThreadBesuNodeRunner implements BesuNodeRunner {
.strictTransactionReplayProtectionEnabled(node.isStrictTxReplayProtectionEnabled())
.build();
final int maxPeers = 25;
final BesuController besuController =
builder
.synchronizerConfiguration(new SynchronizerConfiguration.Builder().build())
@ -178,6 +180,7 @@ public class ThreadBesuNodeRunner implements BesuNodeRunner {
.map(
(pkiConfig) -> new PkiBlockCreationConfigurationProvider().load(pkiConfig)))
.evmConfiguration(EvmConfiguration.DEFAULT)
.maxPeers(maxPeers)
.build();
final RunnerBuilder runnerBuilder = new RunnerBuilder();
@ -191,7 +194,7 @@ public class ThreadBesuNodeRunner implements BesuNodeRunner {
.discovery(node.isDiscoveryEnabled())
.p2pAdvertisedHost(node.getHostName())
.p2pListenPort(0)
.maxPeers(25)
.maxPeers(maxPeers)
.networkingConfiguration(node.getNetworkingConfiguration())
.jsonRpcConfiguration(node.jsonRpcConfiguration())
.webSocketConfiguration(node.webSocketConfiguration())

@ -1862,7 +1862,8 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
.requiredBlocks(requiredBlocks)
.reorgLoggingThreshold(reorgLoggingThreshold)
.evmConfiguration(unstableEvmOptions.toDomainObject())
.dataStorageConfiguration(unstableDataStorageOptions.toDomainObject());
.dataStorageConfiguration(unstableDataStorageOptions.toDomainObject())
.maxPeers(maxPeers);
}
private GraphQLConfiguration graphQLConfiguration() {

@ -115,6 +115,7 @@ public abstract class BesuControllerBuilder {
protected List<NodeMessagePermissioningProvider> messagePermissioningProviders =
Collections.emptyList();
protected EvmConfiguration evmConfiguration;
protected int maxPeers;
public BesuControllerBuilder storageProvider(final StorageProvider storageProvider) {
this.storageProvider = storageProvider;
@ -238,6 +239,11 @@ public abstract class BesuControllerBuilder {
return this;
}
public BesuControllerBuilder maxPeers(final int maxPeers) {
this.maxPeers = maxPeers;
return this;
}
public BesuController build() {
checkNotNull(genesisConfig, "Missing genesis config");
checkNotNull(syncConfig, "Missing sync config");
@ -309,7 +315,8 @@ public abstract class BesuControllerBuilder {
}
}
final EthPeers ethPeers =
new EthPeers(getSupportedProtocol(), clock, metricsSystem, messagePermissioningProviders);
new EthPeers(
getSupportedProtocol(), clock, metricsSystem, maxPeers, messagePermissioningProviders);
final EthMessages ethMessages = new EthMessages();
final EthMessages snapMessages = new EthMessages();

@ -203,6 +203,8 @@ public class BesuCommandTest extends CommandTestAbstract {
public void callingBesuCommandWithoutOptionsMustSyncWithDefaultValues() throws Exception {
parseCommand();
final int maxPeers = 25;
final ArgumentCaptor<EthNetworkConfig> ethNetworkArg =
ArgumentCaptor.forClass(EthNetworkConfig.class);
verify(mockRunnerBuilder).discovery(eq(true));
@ -215,7 +217,7 @@ public class BesuCommandTest extends CommandTestAbstract {
MAINNET_DISCOVERY_URL));
verify(mockRunnerBuilder).p2pAdvertisedHost(eq("127.0.0.1"));
verify(mockRunnerBuilder).p2pListenPort(eq(30303));
verify(mockRunnerBuilder).maxPeers(eq(25));
verify(mockRunnerBuilder).maxPeers(eq(maxPeers));
verify(mockRunnerBuilder).fractionRemoteConnectionsAllowed(eq(0.6f));
verify(mockRunnerBuilder).jsonRpcConfiguration(eq(DEFAULT_JSON_RPC_CONFIGURATION));
verify(mockRunnerBuilder).graphQLConfiguration(eq(DEFAULT_GRAPH_QL_CONFIGURATION));
@ -234,6 +236,7 @@ public class BesuCommandTest extends CommandTestAbstract {
verify(mockControllerBuilder).nodeKey(isNotNull());
verify(mockControllerBuilder).storageProvider(storageProviderArgumentCaptor.capture());
verify(mockControllerBuilder).gasLimitCalculator(eq(GasLimitCalculator.constant()));
verify(mockControllerBuilder).maxPeers(eq(maxPeers));
verify(mockControllerBuilder).build();
assertThat(storageProviderArgumentCaptor.getValue()).isNotNull();

@ -227,7 +227,7 @@ public abstract class CommandTestAbstract {
when(mockControllerBuilder.reorgLoggingThreshold(anyLong())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.dataStorageConfiguration(any())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.evmConfiguration(any())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.maxPeers(anyInt())).thenReturn(mockControllerBuilder);
// doReturn used because of generic BesuController
doReturn(mockController).when(mockControllerBuilder).build();
lenient().when(mockController.getProtocolManager()).thenReturn(mockEthProtocolManager);

@ -107,7 +107,7 @@ public class Istanbul99ProtocolManagerTest {
final CompletableFuture<Void> done = new CompletableFuture<>();
final EthScheduler ethScheduler = new DeterministicEthScheduler(() -> false);
EthPeers peers =
new EthPeers(Istanbul99Protocol.NAME, TestClock.fixed(), new NoOpMetricsSystem());
new EthPeers(Istanbul99Protocol.NAME, TestClock.fixed(), new NoOpMetricsSystem(), 25);
EthMessages messages = new EthMessages();
final BigInteger networkId = BigInteger.ONE;

@ -99,4 +99,8 @@ public class Block {
sb.append("body=").append(body);
return sb.append("}").toString();
}
public String toLogString() {
return getHeader().getNumber() + " (" + getHash() + ")";
}
}

@ -53,22 +53,29 @@ public class EthPeers {
private final String protocolName;
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
private final int maxPeers;
private final Subscribers<ConnectCallback> connectCallbacks = Subscribers.create();
private final Subscribers<DisconnectCallback> disconnectCallbacks = Subscribers.create();
private final Collection<PendingPeerRequest> pendingRequests = new CopyOnWriteArrayList<>();
public EthPeers(final String protocolName, final Clock clock, final MetricsSystem metricsSystem) {
this(protocolName, clock, metricsSystem, Collections.emptyList());
public EthPeers(
final String protocolName,
final Clock clock,
final MetricsSystem metricsSystem,
final int maxPeers) {
this(protocolName, clock, metricsSystem, maxPeers, Collections.emptyList());
}
public EthPeers(
final String protocolName,
final Clock clock,
final MetricsSystem metricsSystem,
final int maxPeers,
final List<NodeMessagePermissioningProvider> permissioningProviders) {
this.protocolName = protocolName;
this.clock = clock;
this.permissioningProviders = permissioningProviders;
this.maxPeers = maxPeers;
metricsSystem.createIntegerGauge(
BesuMetricCategory.PEERS,
"pending_peer_requests_current",
@ -161,6 +168,10 @@ public class EthPeers {
return connections.size();
}
public int getMaxPeers() {
return maxPeers;
}
public Stream<EthPeer> streamAllPeers() {
return connections.values().stream();
}

@ -71,6 +71,10 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
assignedPeer = Optional.of(peer);
}
public Optional<EthPeer> getAssignedPeer() {
return assignedPeer;
}
@Override
protected void executeTask() {
if (result.isDone()) {
@ -109,7 +113,9 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
}
if (cause instanceof NoAvailablePeersException) {
LOG.info("No peers available, waiting for peers: {}", ethContext.getEthPeers().peerCount());
LOG.info(
"No useful peer available, waiting for more peers: {}",
ethContext.getEthPeers().peerCount());
// Wait for new peer to connect
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem);
executeSubTask(

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,7 +63,7 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
@Override
protected void executeTask() {
final String blockIdentifier = hash.map(Bytes::toHexString).orElse(Long.toString(blockNumber));
final String blockIdentifier = blockNumber + " (" + hash + ")";
LOG.debug(
"Downloading block {} from peer {}.",
blockIdentifier,
@ -82,6 +81,7 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
t.getCause());
result.completeExceptionally(t);
} else if (r.getResult().isEmpty()) {
r.getPeer().recordUselessResponse("Download block returned an empty result");
LOG.debug(
"Failed to download block {} from peer {} with empty result.",
blockIdentifier,

@ -1,98 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Downloads a block from a peer. Will complete exceptionally if block cannot be downloaded. */
public class GetBlockFromPeersTask extends AbstractEthTask<AbstractPeerTask.PeerTaskResult<Block>> {
private static final Logger LOG = LoggerFactory.getLogger(GetBlockFromPeersTask.class);
private final List<EthPeer> peers;
private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
private final Optional<Hash> hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;
protected GetBlockFromPeersTask(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(metricsSystem);
this.peers = peers;
this.ethContext = ethContext;
this.blockNumber = blockNumber;
this.metricsSystem = metricsSystem;
this.protocolSchedule = protocolSchedule;
this.hash = hash;
}
public static GetBlockFromPeersTask create(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeersTask(
peers, protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
}
@Override
protected void executeTask() {
LOG.debug("Downloading block {} from peers {}.", hash, peers.stream().map(EthPeer::toString));
getBlockFromPeers(peers);
}
private void getBlockFromPeers(final List<EthPeer> peers) {
if (peers.isEmpty()) {
result.completeExceptionally(new IncompleteResultsException());
}
final EthPeer peer = peers.get(0);
if (peer.isDisconnected()) {
getBlockFromPeers(peers.subList(1, peers.size()));
}
LOG.debug("Trying downloading block {} from peer {}.", hash, peer);
final AbstractPeerTask<Block> getBlockTask =
GetBlockFromPeerTask.create(protocolSchedule, ethContext, hash, blockNumber, metricsSystem)
.assignPeer(peer);
getBlockTask
.run()
.whenComplete(
(r, t) -> {
if (t != null) {
getBlockFromPeers(peers.subList(1, peers.size()));
} else {
result.complete(r);
}
});
}
}

@ -15,17 +15,20 @@
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,49 +36,78 @@ import org.slf4j.LoggerFactory;
public class RetryingGetBlockFromPeersTask
extends AbstractRetryingPeerTask<AbstractPeerTask.PeerTaskResult<Block>> {
private static final int DEFAULT_MAX_RETRIES = 5;
private static final Logger LOG = LoggerFactory.getLogger(RetryingGetBlockFromPeersTask.class);
private final ProtocolContext protocolContext;
private final ProtocolSchedule protocolSchedule;
private final Optional<Hash> blockHash;
private final long blockNumber;
private final Set<EthPeer> triedPeers = new HashSet<>();
public RetryingGetBlockFromPeersTask(
protected RetryingGetBlockFromPeersTask(
final ProtocolContext protocolContext,
final EthContext ethContext,
final ProtocolSchedule protocolSchedule,
final MetricsSystem metricsSystem,
final int maxRetries,
final Optional<Hash> blockHash,
final long blockNumber,
final int maxRetries) {
final long blockNumber) {
super(ethContext, maxRetries, Objects::isNull, metricsSystem);
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.blockHash = blockHash;
this.blockNumber = blockNumber;
}
public static RetryingGetBlockFromPeersTask create(
final EthContext ethContext,
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final int maxRetries,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
final long blockNumber) {
return new RetryingGetBlockFromPeersTask(
ethContext, protocolSchedule, metricsSystem, hash, blockNumber, DEFAULT_MAX_RETRIES);
protocolContext,
ethContext,
protocolSchedule,
metricsSystem,
maxRetries,
hash,
blockNumber);
}
@Override
public void assignPeer(final EthPeer peer) {
super.assignPeer(peer);
triedPeers.add(peer);
}
@Override
protected CompletableFuture<AbstractPeerTask.PeerTaskResult<Block>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
final GetBlockFromPeersTask getHeadersTask =
GetBlockFromPeersTask.create(
getEthContext().getEthPeers().streamAvailablePeers().collect(Collectors.toList()),
protocolSchedule,
getEthContext(),
blockHash,
blockNumber,
getMetricsSystem());
return executeSubTask(getHeadersTask::run)
final GetBlockFromPeerTask getBlockTask =
GetBlockFromPeerTask.create(
protocolSchedule, getEthContext(), blockHash, blockNumber, getMetricsSystem());
getBlockTask.assignPeer(
assignedPeer
.filter(unused -> getRetryCount() == 1) // first try with the assigned preferred peer
.orElseGet( // then selecting a new one from the pool
() -> {
assignPeer(selectNextPeer());
return getAssignedPeer().get();
}));
LOG.debug(
"Getting block {} ({}) from peer {}, attempt {}",
blockNumber,
blockHash,
getAssignedPeer(),
getRetryCount());
return executeSubTask(getBlockTask::run)
.thenApply(
peerResult -> {
result.complete(peerResult);
@ -83,16 +115,33 @@ public class RetryingGetBlockFromPeersTask
});
}
private EthPeer selectNextPeer() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(peer -> !triedPeers.contains(peer))
.findFirst()
.orElseThrow(NoAvailablePeersException::new);
}
@Override
protected boolean isRetryableError(final Throwable error) {
return super.isRetryableError(error) || error instanceof IncompleteResultsException;
return (blockNumber > protocolContext.getBlockchain().getChainHeadBlockNumber())
&& (super.isRetryableError(error) || error instanceof IncompleteResultsException);
}
@Override
protected void handleTaskError(final Throwable error) {
if (getRetryCount() < getMaxRetries()) {
LOG.info(
"Failed to get block with hash {} and number {} retrying later", blockHash, blockNumber);
LOG.debug(
"Failed to get block {} ({}) from peer {}, attempt {}, retrying later",
blockNumber,
blockHash,
getAssignedPeer(),
getRetryCount());
} else {
LOG.warn(
"Failed to get block {} ({}) after {} retries", blockNumber, blockHash, getRetryCount());
}
super.handleTaskError(error);
}

@ -103,7 +103,7 @@ public final class NewBlockHashesMessage extends AbstractMessageData {
@Override
public String toString() {
return String.format("New Block Hash [%d: %s]", number, hash);
return number() + " (" + hash() + ")";
}
@Override

@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
@ -27,7 +29,6 @@ import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeersTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlockFromPeersTask;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage;
@ -45,6 +46,7 @@ import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -75,8 +77,10 @@ public class BlockPropagationManager {
private final AtomicBoolean started = new AtomicBoolean(false);
private final Set<Hash> requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Hash> importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Hash> requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Long> requestedNonAnnouncedBlocks =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final PendingBlocksManager pendingBlocksManager;
BlockPropagationManager(
@ -128,12 +132,20 @@ public class BlockPropagationManager {
readyForImport = pendingBlocksManager.childrenOf(newBlock.getHash());
}
LOG.trace(
"Ready for import blocks found {} for {}",
readyForImport,
newBlock.getHeader().getNumber());
traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
blockAddedEvent::getEventType,
newBlock::toLogString,
() -> this);
if (!readyForImport.isEmpty()) {
traceLambda(
LOG,
"Ready to import pending blocks found [{}] for block {}",
() -> readyForImport.stream().map(Block::toLogString).collect(Collectors.joining(", ")),
newBlock::toLogString);
final Supplier<CompletableFuture<List<Block>>> importBlocksTask =
PersistBlockTask.forUnorderedBlocks(
protocolSchedule,
@ -150,35 +162,51 @@ public class BlockPropagationManager {
if (r != null) {
LOG.info("Imported {} pending blocks", r.size());
}
if (t != null) {
LOG.error("Error importing pending blocks", t);
}
});
} else {
LOG.trace("Not ready for import blocks found for {}", newBlock.getHeader().getNumber());
traceLambda(
LOG, "There are no pending blocks ready to import for block {}", newBlock::toLogString);
maybeProcessNonAnnouncedBlocks(newBlock);
}
if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
}
private void maybeProcessNonAnnouncedBlocks(final Block newBlock) {
final long localHeadBlockNumber = protocolContext.getBlockchain().getChainHeadBlockNumber();
if (newBlock.getHeader().getNumber() > localHeadBlockNumber) {
pendingBlocksManager
.lowestAnnouncedBlock()
.map(ProcessableBlockHeader::getNumber)
.ifPresent(
minAnnouncedBlockNumber -> {
long distance =
minAnnouncedBlockNumber
- protocolContext.getBlockchain().getChainHeadBlockNumber();
long distance = minAnnouncedBlockNumber - localHeadBlockNumber;
LOG.trace(
"Found lowest announced block {} with distance {}",
minAnnouncedBlockNumber,
distance);
long firstNonAnnouncedBlockNumber = newBlock.getHeader().getNumber() + 1;
if (distance < config.getBlockPropagationRange().upperEndpoint()
&& minAnnouncedBlockNumber > newBlock.getHeader().getNumber()) {
retrieveMissingAnnouncedBlock(newBlock.getHeader().getNumber() + 1);
&& minAnnouncedBlockNumber > firstNonAnnouncedBlockNumber) {
if (requestedNonAnnouncedBlocks.add(firstNonAnnouncedBlockNumber)) {
retrieveNonAnnouncedBlock(firstNonAnnouncedBlockNumber);
}
}
});
}
if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
}
private void handleNewBlockFromNetwork(final EthMessage message) {
@ -186,6 +214,13 @@ public class BlockPropagationManager {
final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData());
try {
final Block block = newBlockMessage.block(protocolSchedule);
traceLambda(
LOG,
"New block from network {} from peer {}. Current status {}",
block::toLogString,
message::getPeer,
() -> this);
final Difficulty totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule);
message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty);
@ -195,12 +230,20 @@ public class BlockPropagationManager {
final long bestChainHeight = syncState.bestChainHeight(localChainHeight);
if (!shouldImportBlockAtHeight(
block.getHeader().getNumber(), localChainHeight, bestChainHeight)) {
traceLambda(
LOG,
"Do not import new block from network {}, current chain heights are: local {}, best {}",
block::toLogString,
() -> localChainHeight,
() -> bestChainHeight);
return;
}
if (pendingBlocksManager.contains(block.getHash())) {
traceLambda(LOG, "New block from network {} is already pending", block::toLogString);
return;
}
if (blockchain.contains(block.getHash())) {
traceLambda(LOG, "New block from network {} is already present", block::toLogString);
return;
}
@ -222,6 +265,13 @@ public class BlockPropagationManager {
// Register announced blocks
final List<NewBlockHash> announcedBlocks =
Lists.newArrayList(newBlockHashesMessage.getNewHashes());
traceLambda(
LOG,
"New block hashes from network {} from peer {}. Current status {}",
() -> toLogString(announcedBlocks),
message::getPeer,
() -> this);
for (final NewBlockHash announcedBlock : announcedBlocks) {
message.getPeer().registerKnownBlock(announcedBlock.hash());
message.getPeer().registerHeight(announcedBlock.hash(), announcedBlock.number());
@ -238,32 +288,28 @@ public class BlockPropagationManager {
// Filter for blocks we don't yet know about
final List<NewBlockHash> newBlocks = new ArrayList<>();
for (final NewBlockHash announcedBlock : relevantAnnouncements) {
if (requestedBlocks.contains(announcedBlock.hash())) {
continue;
}
if (pendingBlocksManager.contains(announcedBlock.hash())) {
LOG.trace("New block hash from network {} is already pending", announcedBlock);
continue;
}
if (importingBlocks.contains(announcedBlock.hash())) {
LOG.trace("New block hash from network {} is already importing", announcedBlock);
continue;
}
if (blockchain.contains(announcedBlock.hash())) {
LOG.trace("New block hash from network {} was already imported", announcedBlock);
continue;
}
if (requestedBlocks.add(announcedBlock.hash())) {
newBlocks.add(announcedBlock);
} else {
LOG.trace("New block hash from network {} was already requested", announcedBlock);
}
}
// Process known blocks we care about
for (final NewBlockHash newBlock : newBlocks) {
final List<EthPeer> peers =
ethContext.getEthPeers().streamBestPeers().collect(Collectors.toList());
if (!peers.contains(message.getPeer())) {
peers.add(message.getPeer());
}
processAnnouncedBlock(newBlock)
.whenComplete((r, t) -> requestedBlocks.remove(newBlock.hash()));
processAnnouncedBlock(message.getPeer(), newBlock);
}
} catch (final RLPException e) {
LOG.debug(
@ -274,29 +320,49 @@ public class BlockPropagationManager {
}
}
private CompletableFuture<Block> retrieveMissingAnnouncedBlock(final long blockNumber) {
LOG.trace("Retrieve missing announced block {} from peer", blockNumber);
final List<EthPeer> peers =
ethContext.getEthPeers().streamBestPeers().collect(Collectors.toList());
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, Optional.empty(), blockNumber, metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
private CompletableFuture<Block> retrieveNonAnnouncedBlock(final long blockNumber) {
LOG.trace("Retrieve non announced block {} from peers", blockNumber);
return getBlockFromPeers(Optional.empty(), blockNumber, Optional.empty());
}
private CompletableFuture<Block> processAnnouncedBlock(
final EthPeer peer, final NewBlockHash blockHash) {
LOG.trace("Retrieve announced block by header {} from peers", blockHash);
return getBlockFromPeers(Optional.of(peer), blockHash.number(), Optional.of(blockHash.hash()));
}
private CompletableFuture<Block> processAnnouncedBlock(final NewBlockHash newBlock) {
private CompletableFuture<Block> getBlockFromPeers(
final Optional<EthPeer> preferredPeer,
final long blockNumber,
final Optional<Hash> blockHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
ethContext,
protocolContext,
protocolSchedule,
Optional.of(newBlock.hash()),
newBlock.number(),
metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
ethContext,
metricsSystem,
ethContext.getEthPeers().getMaxPeers(),
blockHash,
blockNumber);
preferredPeer.ifPresent(getBlockTask::assignPeer);
return ethContext
.getScheduler()
.scheduleSyncWorkerTask(getBlockTask::run)
.thenCompose(r -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()))
.whenComplete(
(r, t) -> {
requestedNonAnnouncedBlocks.remove(blockNumber);
blockHash.ifPresentOrElse(
requestedBlocks::remove,
() -> {
if (r != null) {
// in case we successfully retrieved only by block number, when can remove
// the request by hash too
requestedBlocks.remove(r.getHash());
}
});
});
}
private void broadcastBlock(final Block block, final BlockHeader parent) {
@ -314,28 +380,25 @@ public class BlockPropagationManager {
// Synchronize to avoid race condition where block import event fires after the
// blockchain.contains() check and before the block is registered, causing onBlockAdded() to be
// invoked for the parent of this block before we are able to register it.
LOG.trace("Import or save pending block {}", block.getHeader().getNumber());
traceLambda(LOG, "Import or save pending block {}", block::toLogString);
synchronized (pendingBlocksManager) {
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info(
"Saving announced block {} ({}) for future import",
block.getHeader().getNumber(),
block.getHash());
LOG.info("Saving announced block {} for future import", block.toLogString());
}
return CompletableFuture.completedFuture(block);
}
}
if (!importingBlocks.add(block.getHash())) {
// We're already importing this block.
traceLambda(LOG, "We're already importing this block {}", block::toLogString);
return CompletableFuture.completedFuture(block);
}
if (protocolContext.getBlockchain().contains(block.getHash())) {
// We've already imported this block.
traceLambda(LOG, "We've already imported this block {}", block::toLogString);
importingBlocks.remove(block.getHash());
return CompletableFuture.completedFuture(block);
}
@ -348,8 +411,7 @@ public class BlockPropagationManager {
() ->
new IllegalArgumentException(
"Incapable of retrieving header from non-existent parent of "
+ block.getHeader().getNumber()
+ "."));
+ block.toLogString()));
final ProtocolSpec protocolSpec =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber());
final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator();
@ -374,10 +436,7 @@ public class BlockPropagationManager {
} else {
importingBlocks.remove(block.getHash());
badBlockManager.addBadBlock(block);
LOG.warn(
"Failed to import announced block {} ({}).",
block.getHeader().getNumber(),
block.getHash());
LOG.warn("Failed to import announced block {}", block.toLogString());
return CompletableFuture.completedFuture(block);
}
}
@ -397,10 +456,7 @@ public class BlockPropagationManager {
(result, throwable) -> {
importingBlocks.remove(block.getHash());
if (throwable != null) {
LOG.warn(
"Failed to import announced block {} ({}).",
block.getHeader().getNumber(),
block.getHash());
LOG.warn("Failed to import announced block {}", block.toLogString());
}
});
}
@ -414,4 +470,24 @@ public class BlockPropagationManager {
return importRange.contains(distanceFromLocalHead)
&& importRange.contains(distanceFromBestPeer);
}
private String toLogString(final Collection<NewBlockHash> newBlockHashs) {
return newBlockHashs.stream()
.map(NewBlockHash::toString)
.collect(Collectors.joining(", ", "[", "]"));
}
@Override
public String toString() {
return "BlockPropagationManager{"
+ "requestedBlocks="
+ requestedBlocks
+ ", requestedNonAnnounceBlocks="
+ requestedNonAnnouncedBlocks
+ ", importingBlocks="
+ importingBlocks
+ ", pendingBlocksManager="
+ pendingBlocksManager
+ '}';
}
}

@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -66,7 +68,7 @@ public class ChainHeadTracker implements ConnectCallback {
@Override
public void onPeerConnected(final EthPeer peer) {
LOG.debug("Requesting chain head info for {}", peer);
LOG.debug("Requesting chain head info from {}", peer);
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule,
ethContext,
@ -81,9 +83,13 @@ public class ChainHeadTracker implements ConnectCallback {
final BlockHeader chainHeadHeader = peerResult.getResult().get(0);
peer.chainState().update(chainHeadHeader);
trailingPeerLimiter.enforceTrailingPeerLimit();
debugLambda(
LOG,
"Retrieved chain head info {} from {}",
() -> chainHeadHeader.getNumber() + " (" + chainHeadHeader.getBlockHash() + ")",
() -> peer);
} else {
LOG.debug(
"Failed to retrieve chain head information. Disconnecting " + peer, error);
LOG.debug("Failed to retrieve chain head info. Disconnecting {}", peer, error);
peer.disconnect(DisconnectReason.USELESS_PEER);
}
});

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException;
@ -118,7 +119,7 @@ public class PipelineChainDownloader implements ChainDownloader {
pipelineErrorCounter.inc();
if (ExceptionUtils.rootCause(error) instanceof InvalidBlockException) {
LOG.warn(
"Invalid block detected. Disconnecting from sync target. {}",
"Invalid block detected. Disconnecting from sync target. {}",
ExceptionUtils.rootCause(error).getMessage());
syncState.disconnectSyncTarget(DisconnectReason.BREACH_OF_PROTOCOL);
}
@ -154,6 +155,12 @@ public class PipelineChainDownloader implements ChainDownloader {
}
syncState.setSyncTarget(target.peer(), target.commonAncestor());
currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target);
debugLambda(
LOG,
"Starting download pipeline for sync target {}, common ancestor {} ({})",
() -> target,
() -> target.commonAncestor().getNumber(),
() -> target.commonAncestor().getBlockHash());
return scheduler.startPipeline(currentDownloadPipeline);
}
}

@ -59,7 +59,7 @@ public class FullImportBlockStep implements Consumer<Block> {
if (ethContext != null && ethContext.getEthPeers().peerCount() >= 0) {
peerCount = ethContext.getEthPeers().peerCount();
}
if (blockNumber % 200 == 0) {
if (blockNumber % 200 == 0 || LOG.isTraceEnabled()) {
final long nowMilli = Instant.now().toEpochMilli();
final long deltaMilli = nowMilli - lastReportMillis;
final String mgps =

@ -34,7 +34,11 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory {
private static final Logger LOG = LoggerFactory.getLogger(FullSyncDownloadPipelineFactory.class);
private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule protocolSchedule;
@ -113,8 +117,16 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
final EthPeer peer, final BlockHeader lastCheckpointHeader) {
final boolean caughtUpToPeer =
peer.chainState().getEstimatedHeight() <= lastCheckpointHeader.getNumber();
return !peer.isDisconnected()
&& !caughtUpToPeer
&& !betterSyncTargetEvaluator.shouldSwitchSyncTarget(peer);
final boolean isDisconnected = peer.isDisconnected();
final boolean shouldSwitchSyncTarget = betterSyncTargetEvaluator.shouldSwitchSyncTarget(peer);
LOG.debug(
"shouldContinueDownloadingFromPeer? {}, disconnected {}, caughtUp {}, shouldSwitchSyncTarget {}",
peer,
isDisconnected,
caughtUpToPeer,
shouldSwitchSyncTarget);
return !isDisconnected && !caughtUpToPeer && !shouldSwitchSyncTarget;
}
}

@ -73,18 +73,26 @@ class FullSyncTargetManager extends SyncTargetManager {
protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeerWithHeightEstimate();
if (!maybeBestPeer.isPresent()) {
LOG.info("No sync target, waiting for peers: {}", ethContext.getEthPeers().peerCount());
LOG.info(
"No sync target, waiting for peers. Current peers: {}",
ethContext.getEthPeers().peerCount());
return completedFuture(Optional.empty());
} else {
final EthPeer bestPeer = maybeBestPeer.get();
if (isSyncTargetReached(bestPeer)) {
// We're caught up to our best peer, try again when a new peer connects
LOG.debug(
"Caught up to best peer: {}, Peers: {}",
bestPeer.chainState().getEstimatedHeight(),
"Caught up to best peer: {}, chain state: {}. Current peers: {}",
bestPeer,
bestPeer.chainState(),
ethContext.getEthPeers().peerCount());
return completedFuture(Optional.empty());
}
LOG.debug(
"Best peer: {}, chain state: {}. Current peers: {}",
bestPeer,
bestPeer.chainState(),
ethContext.getEthPeers().peerCount());
return completedFuture(maybeBestPeer);
}
}

@ -126,4 +126,17 @@ public class PendingBlocksManager {
.map(Block::getHeader)
.min(Comparator.comparing(BlockHeader::getNumber));
}
@Override
public String toString() {
return "PendingBlocksManager{"
+ "pendingBlocks ["
+ pendingBlocks.values().stream()
.map(ImmutablePendingBlock::block)
.map(b -> b.getHeader().getNumber() + " (" + b.getHash() + ")")
.collect(Collectors.joining(", "))
+ "], pendingBlocksByParentHash="
+ pendingBlocksByParentHash
+ '}';
}
}

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.sync.tasks;
import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
@ -196,6 +197,7 @@ public class PersistBlockTask extends AbstractEthTask<Block> {
final ProtocolSpec protocolSpec =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber());
final BlockImporter blockImporter = protocolSpec.getBlockImporter();
debugLambda(LOG, "Running import task for block {}", block::toLogString);
blockImported = blockImporter.importBlock(protocolContext, block, validateHeaders);
if (!blockImported) {
result.completeExceptionally(

@ -1,31 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.testutil.TestClock;
public class EthContextTestUtil {
private static final String PROTOCOL_NAME = "ETH";
public static EthContext createTestEthContext(final TimeoutPolicy timeoutPolicy) {
return new EthContext(
new EthPeers(PROTOCOL_NAME, TestClock.fixed(), new NoOpMetricsSystem()),
new EthMessages(),
new DeterministicEthScheduler(timeoutPolicy));
}
}

@ -114,7 +114,7 @@ public class EthProtocolManagerTestUtil {
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration configuration) {
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem());
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem(), 25);
EthMessages messages = new EthMessages();
return create(
@ -130,7 +130,7 @@ public class EthProtocolManagerTestUtil {
public static EthProtocolManager create(
final Blockchain blockchain, final EthScheduler ethScheduler) {
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem());
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem(), 25);
EthMessages messages = new EthMessages();
return create(

@ -84,7 +84,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
public void setupTest() {
peersDoTimeout = new AtomicBoolean(false);
peerCountToTimeout = new AtomicInteger(0);
ethPeers = spy(new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem));
ethPeers = spy(new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem, 25));
final EthMessages ethMessages = new EthMessages();
final EthScheduler ethScheduler =
new DeterministicEthScheduler(

@ -572,7 +572,9 @@ public abstract class AbstractBlockPropagationManagerTest {
.thenReturn(new CompletableFuture<>());
final EthContext ethContext =
new EthContext(
new EthPeers("eth", TestClock.fixed(), metricsSystem), new EthMessages(), ethScheduler);
new EthPeers("eth", TestClock.fixed(), metricsSystem, 25),
new EthMessages(),
ethScheduler);
final BlockPropagationManager blockPropagationManager =
new BlockPropagationManager(
syncConfig,
@ -629,7 +631,9 @@ public abstract class AbstractBlockPropagationManagerTest {
});
final EthContext ethContext =
new EthContext(
new EthPeers("eth", TestClock.fixed(), metricsSystem), new EthMessages(), ethScheduler);
new EthPeers("eth", TestClock.fixed(), metricsSystem, 25),
new EthMessages(),
ethScheduler);
final BlockPropagationManager blockPropagationManager =
new BlockPropagationManager(
syncConfig,
@ -673,7 +677,10 @@ public abstract class AbstractBlockPropagationManagerTest {
// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final RespondingEthPeer secondPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 2);
// Pretend the second peer is busier, so the first is selected a first
when(spy(secondPeer.getEthPeer()).outstandingRequests()).thenReturn(1);
final NewBlockHashesMessage nextAnnouncement =
NewBlockHashesMessage.create(

@ -123,7 +123,7 @@ public class TestNode implements Closeable {
final EthMessages ethMessages = new EthMessages();
final EthPeers ethPeers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem);
final EthPeers ethPeers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem, 25);
final EthScheduler scheduler = new EthScheduler(1, 1, 1, metricsSystem);
final EthContext ethContext = new EthContext(ethPeers, ethMessages, scheduler);

@ -60,7 +60,7 @@ public class TransactionPoolFactoryTest {
when(blockchain.getBlockByNumber(anyLong())).thenReturn(Optional.of(mock(Block.class)));
when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(mock(Hash.class)));
when(context.getBlockchain()).thenReturn(blockchain);
final EthPeers ethPeers = new EthPeers("ETH", TestClock.fixed(), new NoOpMetricsSystem());
final EthPeers ethPeers = new EthPeers("ETH", TestClock.fixed(), new NoOpMetricsSystem(), 25);
final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthMessages()).thenReturn(mock(EthMessages.class));
when(ethContext.getEthPeers()).thenReturn(ethPeers);

@ -189,7 +189,7 @@ public class RetestethContext {
// mining support
final EthPeers ethPeers = new EthPeers("reteseth", retestethClock, metricsSystem);
final EthPeers ethPeers = new EthPeers("reteseth", retestethClock, metricsSystem, 0);
final SyncState syncState = new SyncState(blockchain, ethPeers);
ethScheduler = new EthScheduler(1, 1, 1, 1, metricsSystem);

Loading…
Cancel
Save