3943 stop blocks on finalized (#4058)

* adds interfaces for tracking merge state and forchoices
* after 2 finalizations from fcu, disconnect any peers sending new blocks, or connecting with td > ttd
* tests for preventing pow peers from joining
* refactored to separate out merge logic

Signed-off-by: Justin Florentine <justin+github@florentine.us>
pull/4083/head
Justin Florentine 2 years ago committed by GitHub
parent a60a286f93
commit 1a62d2a6c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 7
      besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java
  3. 4
      besu/src/main/java/org/hyperledger/besu/controller/IbftLegacyBesuControllerBuilder.java
  4. 46
      besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java
  5. 5
      besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java
  6. 2
      consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul99ProtocolManager.java
  7. 3
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java
  8. 16
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java
  9. 15
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java
  10. 8
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionContext.java
  11. 8
      consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/PostMergeContextTest.java
  12. 29
      ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/ForkchoiceMessageListener.java
  13. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/MergeStateHandler.java
  14. 24
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  15. 102
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/MergePeerFilter.java
  16. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java
  17. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTask.java
  18. 83
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java
  19. 36
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  20. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java
  21. 1
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java

@ -49,6 +49,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator;
@ -401,7 +402,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethContext,
ethMessages,
scheduler,
peerValidators);
peerValidators,
Optional.empty());
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive);
@ -559,7 +561,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators) {
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
networkId,
@ -570,6 +573,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethMessages,
ethContext,
peerValidators,
mergePeerFilter,
fastSyncEnabled,
scheduler,
genesisConfig.getForks());

@ -46,6 +46,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -232,7 +233,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators) {
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
return besuControllerBuilderSchedule
.get(0L)
.createEthProtocolManager(
@ -244,7 +246,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
ethContext,
ethMessages,
scheduler,
peerValidators);
peerValidators,
mergePeerFilter);
}
@Override

@ -36,6 +36,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -125,7 +126,8 @@ public class IbftLegacyBesuControllerBuilder extends BesuControllerBuilder {
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators) {
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
LOG.info("Operating on IBFT-1.0 network.");
return new Istanbul99ProtocolManager(
protocolContext.getBlockchain(),

@ -25,7 +25,13 @@ import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
@ -62,7 +68,6 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
transactionPool,
miningParameters,
syncState,
ethProtocolManager,
new BackwardSyncContext(
protocolContext,
protocolSchedule,
@ -73,13 +78,50 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
storageProvider, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule))));
}
@Override
protected EthProtocolManager createEthProtocolManager(
final ProtocolContext protocolContext,
final boolean fastSyncEnabled,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final EthPeers ethPeers,
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
if (mergePeerFilter.isPresent()) {
protocolContext
.getConsensusContext(MergeContext.class)
.observeNewIsPostMergeState(mergePeerFilter.get());
protocolContext
.getConsensusContext(MergeContext.class)
.addNewForkchoiceMessageListener(mergePeerFilter.get());
}
EthProtocolManager ethProtocolManager =
super.createEthProtocolManager(
protocolContext,
fastSyncEnabled,
transactionPool,
ethereumWireProtocolConfiguration,
ethPeers,
ethContext,
ethMessages,
scheduler,
peerValidators,
mergePeerFilter);
return ethProtocolManager;
}
protected MiningCoordinator createTransitionMiningCoordinator(
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final TransactionPool transactionPool,
final MiningParameters miningParameters,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final BackwardSyncContext backwardSyncContext) {
this.syncState.set(syncState);

@ -114,7 +114,6 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
transactionPool,
transitionMiningParameters,
syncState,
ethProtocolManager,
transitionBackwardsSyncContext));
initTransitionWatcher(protocolContext, composedCoordinator);
return composedCoordinator;
@ -150,8 +149,8 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
PostMergeContext postMergeContext = protocolContext.getConsensusContext(PostMergeContext.class);
postMergeContext.observeNewIsPostMergeState(
newIsPostMergeState -> {
if (newIsPostMergeState) {
(isPoS, difficultyStoppedAt) -> {
if (isPoS) {
// if we transitioned to post-merge, stop and disable any mining
composedCoordinator.getPreMergeObject().disable();
composedCoordinator.getPreMergeObject().stop();

@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
/** This allows for interoperability with Quorum, but shouldn't be used otherwise. */
public class Istanbul99ProtocolManager extends EthProtocolManager {
@ -56,6 +57,7 @@ public class Istanbul99ProtocolManager extends EthProtocolManager {
ethMessages,
ethContext,
peerValidators,
Optional.empty(),
fastSyncEnabled,
scheduler);
}

@ -14,7 +14,6 @@
*/
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.consensus.merge.MergeContext.NewForkchoiceMessageListener;
import org.hyperledger.besu.datatypes.Hash;
import java.util.Optional;
@ -24,7 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FinalizedBlockHashSupplier
implements Supplier<Optional<Hash>>, NewForkchoiceMessageListener {
implements Supplier<Optional<Hash>>, ForkchoiceMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(FinalizedBlockHashSupplier.class);
private volatile Optional<Hash> lastAnnouncedFinalizedBlockHash = Optional.empty();

@ -36,10 +36,9 @@ public interface MergeContext extends ConsensusContext {
boolean isSyncing();
void observeNewIsPostMergeState(final NewMergeStateCallback newMergeStateCallback);
void observeNewIsPostMergeState(final MergeStateHandler mergeStateHandler);
long addNewForkchoiceMessageListener(
final NewForkchoiceMessageListener newForkchoiceMessageListener);
long addNewForkchoiceMessageListener(final ForkchoiceMessageListener forkchoiceMessageListener);
void removeNewForkchoiceMessageListener(final long subscriberId);
@ -67,15 +66,4 @@ public interface MergeContext extends ConsensusContext {
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash);
interface NewMergeStateCallback {
void onNewIsPostMergeState(final boolean newIsPostMergeState);
}
interface NewForkchoiceMessageListener {
void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash);
}
}

@ -41,9 +41,9 @@ public class PostMergeContext implements MergeContext {
// initial postMerge state is indeterminate until it is set:
private final AtomicReference<Optional<Boolean>> isPostMerge =
new AtomicReference<>(Optional.empty());
private final Subscribers<NewMergeStateCallback> newMergeStateCallbackSubscribers =
private final Subscribers<MergeStateHandler> newMergeStateCallbackSubscribers =
Subscribers.create();
private final Subscribers<NewForkchoiceMessageListener> newForkchoiceMessageCallbackSubscribers =
private final Subscribers<ForkchoiceMessageListener> newForkchoiceMessageCallbackSubscribers =
Subscribers.create();
private final EvictingQueue<PayloadTuple> blocksInProgress =
@ -99,7 +99,8 @@ public class PostMergeContext implements MergeContext {
if (oldState.isEmpty() || oldState.get() != newState) {
newMergeStateCallbackSubscribers.forEach(
newMergeStateCallback -> newMergeStateCallback.onNewIsPostMergeState(newState));
newMergeStateCallback ->
newMergeStateCallback.mergeStateChanged(newState, Optional.of(totalDifficulty)));
}
}
@ -123,14 +124,14 @@ public class PostMergeContext implements MergeContext {
}
@Override
public void observeNewIsPostMergeState(final NewMergeStateCallback newMergeStateCallback) {
newMergeStateCallbackSubscribers.subscribe(newMergeStateCallback);
public void observeNewIsPostMergeState(final MergeStateHandler mergeStateHandler) {
newMergeStateCallbackSubscribers.subscribe(mergeStateHandler);
}
@Override
public long addNewForkchoiceMessageListener(
final NewForkchoiceMessageListener newForkchoiceMessageListener) {
return newForkchoiceMessageCallbackSubscribers.subscribe(newForkchoiceMessageListener);
final ForkchoiceMessageListener forkchoiceMessageListener) {
return newForkchoiceMessageCallbackSubscribers.subscribe(forkchoiceMessageListener);
}
@Override

@ -68,14 +68,14 @@ public class TransitionContext implements MergeContext {
}
@Override
public void observeNewIsPostMergeState(final NewMergeStateCallback newMergeStateCallback) {
postMergeContext.observeNewIsPostMergeState(newMergeStateCallback);
public void observeNewIsPostMergeState(final MergeStateHandler mergeStateHandler) {
postMergeContext.observeNewIsPostMergeState(mergeStateHandler);
}
@Override
public long addNewForkchoiceMessageListener(
final NewForkchoiceMessageListener newForkchoiceMessageListener) {
return postMergeContext.addNewForkchoiceMessageListener(newForkchoiceMessageListener);
final ForkchoiceMessageListener forkchoiceMessageListener) {
return postMergeContext.addNewForkchoiceMessageListener(forkchoiceMessageListener);
}
@Override

@ -22,7 +22,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.consensus.merge.MergeContext.NewMergeStateCallback;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -169,12 +168,13 @@ public class PostMergeContextTest {
assertThat(postMergeContext.retrieveBlockById(evictedPayloadId)).isEmpty();
}
private static class MergeStateChangeCollector implements NewMergeStateCallback {
private static class MergeStateChangeCollector implements MergeStateHandler {
final List<Boolean> stateChanges = new ArrayList<>();
@Override
public void onNewIsPostMergeState(final boolean newIsPostMergeState) {
stateChanges.add(newIsPostMergeState);
public void mergeStateChanged(
final boolean isPoS, final Optional<Difficulty> difficultyStoppedAt) {
stateChanges.add(isPoS);
}
public void reset() {

@ -0,0 +1,29 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.datatypes.Hash;
import java.util.Optional;
public interface ForkchoiceMessageListener {
void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash);
}

@ -0,0 +1,26 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.ethereum.core.Difficulty;
import java.util.Optional;
public interface MergeStateHandler {
void mergeStateChanged(final boolean isPoS, final Optional<Difficulty> difficultyStoppedAt);
}

@ -73,6 +73,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private final List<PeerValidator> peerValidators;
// The max size of messages (in bytes)
private final int maxMessageSize;
private final Optional<MergePeerFilter> mergePeerFilter;
public EthProtocolManager(
final Blockchain blockchain,
@ -84,6 +85,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final EthMessages ethMessages,
final EthContext ethContext,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final boolean fastSyncEnabled,
final EthScheduler scheduler,
final ForkIdManager forkIdManager) {
@ -92,9 +94,9 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
this.scheduler = scheduler;
this.blockchain = blockchain;
this.maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize();
this.mergePeerFilter = mergePeerFilter;
this.shutdown = new CountDownLatch(1);
genesisHash = blockchain.getBlockHashByNumber(0L).get();
this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO);
this.forkIdManager = forkIdManager;
@ -131,6 +133,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final EthMessages ethMessages,
final EthContext ethContext,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final boolean fastSyncEnabled,
final EthScheduler scheduler) {
this(
@ -143,6 +146,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
ethMessages,
ethContext,
peerValidators,
mergePeerFilter,
fastSyncEnabled,
scheduler,
new ForkIdManager(
@ -161,6 +165,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final EthMessages ethMessages,
final EthContext ethContext,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final boolean fastSyncEnabled,
final EthScheduler scheduler,
final List<Long> forks) {
@ -174,6 +179,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
ethMessages,
ethContext,
peerValidators,
mergePeerFilter,
fastSyncEnabled,
scheduler,
new ForkIdManager(
@ -240,7 +246,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final EthPeer ethPeer = ethPeers.peer(message.getConnection());
if (ethPeer == null) {
LOG.debug(
"Ignoring message received from unknown peer connection: " + message.getConnection());
"Ignoring message received from unknown peer connection: {}", message.getConnection());
return;
}
@ -271,6 +277,13 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
return;
}
if (this.mergePeerFilter.isPresent()) {
if (this.mergePeerFilter.get().disconnectIfGossipingBlocks(message, ethPeer)) {
handleDisconnect(ethPeer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false);
return;
}
}
final EthMessage ethMessage = new EthMessage(ethPeer, messageData);
if (!ethPeer.validateReceivedMessage(ethMessage, getSupportedProtocol())) {
@ -377,6 +390,11 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
networkId,
status.genesisHash());
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (mergePeerFilter.isPresent()) {
boolean disconnected = mergePeerFilter.get().disconnectIfPoW(status, peer);
if (disconnected) {
handleDisconnect(peer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false);
}
} else {
LOG.debug("Received status message from {}: {}", peer, status);
peer.registerStatusReceived(

@ -0,0 +1,102 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener;
import org.hyperledger.besu.consensus.merge.MergeStateHandler;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageListener {
private Optional<Difficulty> powTerminalDifficulty = Optional.of(Difficulty.MAX_VALUE);
private final StampedLock powTerminalDifficultyLock = new StampedLock();
private Hash lastFinalized = Hash.ZERO;
private final AtomicLong numFinalizedSeen = new AtomicLong(0);
private static final Logger LOG = LoggerFactory.getLogger(MergePeerFilter.class);
public boolean disconnectIfPoW(final StatusMessage status, final EthPeer peer) {
long lockStamp = this.powTerminalDifficultyLock.readLock();
try {
if (this.powTerminalDifficulty.isPresent()
&& status.totalDifficulty().greaterThan(this.powTerminalDifficulty.get())) {
LOG.debug(
"Disconnecting peer with difficulty {}, likely still on PoW chain",
status.totalDifficulty());
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
return true;
} else {
return false;
}
} finally {
this.powTerminalDifficultyLock.unlockRead(lockStamp);
}
}
public boolean disconnectIfGossipingBlocks(final Message message, final EthPeer peer) {
final int code = message.getData().getCode();
if (isFinalized() && (code == EthPV62.NEW_BLOCK || code == EthPV62.NEW_BLOCK_HASHES)) {
LOG.debug("disconnecting peer for sending new blocks after transition to PoS");
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
return true;
} else {
return false;
}
}
private boolean isFinalized() {
return this.numFinalizedSeen.get() > 1;
}
@Override
public void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
if (maybeFinalizedBlockHash.isPresent()
&& !maybeFinalizedBlockHash.get().equals(this.lastFinalized)) {
this.lastFinalized = maybeFinalizedBlockHash.get();
this.numFinalizedSeen.getAndIncrement();
LOG.debug("have seen {} finalized blocks", this.numFinalizedSeen);
}
}
@Override
public void mergeStateChanged(
final boolean isPoS, final Optional<Difficulty> difficultyStoppedAt) {
if (isPoS && difficultyStoppedAt.isPresent()) {
LOG.debug("terminal difficulty set to {}", difficultyStoppedAt.get().getValue());
long lockStamp = this.powTerminalDifficultyLock.writeLock();
try {
this.powTerminalDifficulty = difficultyStoppedAt;
} finally {
this.powTerminalDifficultyLock.unlockWrite(lockStamp);
}
}
}
}

@ -113,7 +113,7 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
}
if (cause instanceof NoAvailablePeersException) {
LOG.info(
LOG.debug(
"No useful peer found, checking remaining current peers for usefulness: {}",
ethContext.getEthPeers().peerCount());
// Wait for new peer to connect

@ -60,11 +60,11 @@ public class WaitForPeersTask extends AbstractEthTask<Void> {
(peer) -> {
final int peerCount = ethPeers.peerCount();
if (peerCount >= targetPeerCount) {
LOG.info("Complete: {} peers connected.", targetPeerCount);
LOG.debug("Complete: {} peers connected.", targetPeerCount);
// We hit our target
result.complete(null);
} else {
LOG.info(
LOG.debug(
"Waiting for {} total peers to connect. {} peers currently connected.",
targetPeerCount,
peerCount);

@ -65,6 +65,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
@ -77,6 +78,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -219,6 +221,55 @@ public final class EthProtocolManagerTest {
}
}
@Test
public void disconnectNewPoWPeers() {
MergePeerFilter mergePeerFilter = new MergePeerFilter();
try (final EthProtocolManager ethManager =
EthProtocolManagerTestUtil.create(
blockchain,
protocolContext.getWorldStateArchive(),
transactionPool,
EthProtocolConfiguration.defaultConfig(),
Optional.of(mergePeerFilter))) {
final MockPeerConnection workPeer = setupPeer(ethManager, (cap, msg, conn) -> {});
final MockPeerConnection stakePeer = setupPeer(ethManager, (cap, msg, conn) -> {});
final StatusMessage workPeerStatus =
StatusMessage.create(
EthProtocol.EthVersion.V63,
BigInteger.ONE,
blockchain.getChainHead().getTotalDifficulty().add(20),
blockchain.getChainHeadHash(),
blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash());
final StatusMessage stakePeerStatus =
StatusMessage.create(
EthProtocol.EthVersion.V63,
BigInteger.ONE,
blockchain.getChainHead().getTotalDifficulty(),
blockchain.getChainHeadHash(),
blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash());
ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(stakePeer, stakePeerStatus));
mergePeerFilter.mergeStateChanged(
true, Optional.of(blockchain.getChainHead().getTotalDifficulty()));
mergePeerFilter.onNewForkchoiceMessage(
Hash.EMPTY, Optional.of(Hash.hash(Bytes.of(1))), Hash.EMPTY);
mergePeerFilter.onNewForkchoiceMessage(
Hash.EMPTY, Optional.of(Hash.hash(Bytes.of(2))), Hash.EMPTY);
ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(workPeer, workPeerStatus));
assertThat(workPeer.isDisconnected()).isTrue();
assertThat(workPeer.getDisconnectReason()).isPresent();
assertThat(workPeer.getDisconnectReason())
.get()
.isEqualTo(DisconnectReason.SUBPROTOCOL_TRIGGERED);
assertThat(stakePeer.isDisconnected()).isFalse();
}
}
@Test
public void doNotDisconnectOnLargeMessageWithinLimits() {
try (final EthProtocolManager ethManager =
@ -311,7 +362,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(blockCount);
assertThat(headers).hasSize(blockCount);
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i);
}
@ -350,7 +401,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(limit);
assertThat(headers).hasSize(limit);
for (int i = 0; i < limit; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i);
}
@ -386,7 +437,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(blockCount);
assertThat(headers).hasSize(blockCount);
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i);
}
@ -424,7 +475,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(blockCount);
assertThat(headers).hasSize(blockCount);
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i * (skip + 1));
}
@ -463,7 +514,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(blockCount);
assertThat(headers).hasSize(blockCount);
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i * (skip + 1));
}
@ -522,7 +573,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(2);
assertThat(headers).hasSize(2);
for (int i = 0; i < 2; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i);
}
@ -559,7 +610,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(0);
assertThat(headers).isEmpty();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -603,7 +654,7 @@ public final class EthProtocolManagerTest {
final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(message);
final List<BlockBody> bodies =
Lists.newArrayList(blocksMessage.bodies(protocolSchedule));
assertThat(bodies.size()).isEqualTo(blockCount);
assertThat(bodies).hasSize(blockCount);
for (int i = 0; i < blockCount; i++) {
assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i));
}
@ -654,7 +705,7 @@ public final class EthProtocolManagerTest {
final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(message);
final List<BlockBody> bodies =
Lists.newArrayList(blocksMessage.bodies(protocolSchedule));
assertThat(bodies.size()).isEqualTo(limit);
assertThat(bodies).hasSize(limit);
for (int i = 0; i < limit; i++) {
assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i));
}
@ -698,7 +749,7 @@ public final class EthProtocolManagerTest {
final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(message);
final List<BlockBody> bodies =
Lists.newArrayList(blocksMessage.bodies(protocolSchedule));
assertThat(bodies.size()).isEqualTo(1);
assertThat(bodies).hasSize(1);
assertThat(expectedBlock.getBody()).isEqualTo(bodies.get(0));
done.complete(null);
};
@ -743,7 +794,7 @@ public final class EthProtocolManagerTest {
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(message);
final List<List<TransactionReceipt>> receipts =
Lists.newArrayList(receiptsMessage.receipts());
assertThat(receipts.size()).isEqualTo(blockCount);
assertThat(receipts).hasSize(blockCount);
for (int i = 0; i < blockCount; i++) {
assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i));
}
@ -793,7 +844,7 @@ public final class EthProtocolManagerTest {
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(message);
final List<List<TransactionReceipt>> receipts =
Lists.newArrayList(receiptsMessage.receipts());
assertThat(receipts.size()).isEqualTo(limit);
assertThat(receipts).hasSize(limit);
for (int i = 0; i < limit; i++) {
assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i));
}
@ -837,7 +888,7 @@ public final class EthProtocolManagerTest {
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(message);
final List<List<TransactionReceipt>> receipts =
Lists.newArrayList(receiptsMessage.receipts());
assertThat(receipts.size()).isEqualTo(1);
assertThat(receipts).hasSize(1);
assertThat(expectedReceipts).isEqualTo(receipts.get(0));
done.complete(null);
};
@ -885,7 +936,7 @@ public final class EthProtocolManagerTest {
assertThat(message.getCode()).isEqualTo(EthPV63.NODE_DATA);
final NodeDataMessage receiptsMessage = NodeDataMessage.readFrom(message);
final List<Bytes> nodeData = receiptsMessage.nodeData();
assertThat(nodeData.size()).isEqualTo(blockCount);
assertThat(nodeData).hasSize(blockCount);
for (int i = 0; i < blockCount; i++) {
assertThat(expectedResults.get(i)).isEqualTo(nodeData.get(i));
}
@ -952,7 +1003,7 @@ public final class EthProtocolManagerTest {
assertThat(msg.totalDifficulty(protocolSchdeule)).isEqualTo(expectedTotalDifficulty);
}
assertThat(receivingPeerCaptor.getAllValues().containsAll(peers)).isTrue();
assertThat(receivingPeerCaptor.getAllValues()).containsAll(peers);
}
}
@ -994,7 +1045,7 @@ public final class EthProtocolManagerTest {
final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message);
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(receivedBlockCount);
assertThat(headers).hasSize(receivedBlockCount);
for (int i = 0; i < receivedBlockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(receivedBlockCount - 1 - i);
}

@ -40,6 +40,7 @@ import org.hyperledger.besu.testutil.TestClock;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Optional;
import java.util.OptionalLong;
public class EthProtocolManagerTestUtil {
@ -58,6 +59,40 @@ public class EthProtocolManagerTestUtil {
ethereumWireProtocolConfiguration);
}
public static EthProtocolManager create(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final Optional<MergePeerFilter> mergePeerFilter) {
EthPeers peers =
new EthPeers(
EthProtocol.NAME,
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
EthScheduler ethScheduler = new DeterministicEthScheduler(TimeoutPolicy.NEVER_TIMEOUT);
EthContext ethContext = new EthContext(peers, messages, ethScheduler);
return new EthProtocolManager(
blockchain,
BigInteger.ONE,
worldStateArchive,
transactionPool,
ethereumWireProtocolConfiguration,
peers,
messages,
ethContext,
Collections.emptyList(),
mergePeerFilter,
false,
ethScheduler,
new ForkIdManager(blockchain, Collections.emptyList(), false));
}
public static EthProtocolManager create(
final Blockchain blockchain,
final EthScheduler ethScheduler,
@ -101,6 +136,7 @@ public class EthProtocolManagerTestUtil {
ethMessages,
ethContext,
Collections.emptyList(),
Optional.empty(),
false,
ethScheduler,
forkIdManager);

@ -67,6 +67,7 @@ import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import io.vertx.core.Vertx;
@ -157,6 +158,7 @@ public class TestNode implements Closeable {
ethMessages,
ethContext,
Collections.emptyList(),
Optional.empty(),
false,
scheduler);

@ -115,6 +115,7 @@ public class TransactionPoolFactoryTest {
mock(EthMessages.class),
ethContext,
Collections.emptyList(),
Optional.empty(),
true,
mock(EthScheduler.class),
mock(ForkIdManager.class));

Loading…
Cancel
Save