Only stop block propagation, not tx handling. (#4186)

* on block propagation stop, only kill the new block and new block hash handlers, leave all others (especially txs) alone

Signed-off-by: Justin Florentine <justin+github@florentine.us>
pull/4204/head
Justin Florentine 2 years ago committed by GitHub
parent 9b2fcde2b0
commit 992534e417
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CHANGELOG.md
  2. 21
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthMessages.java
  3. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  4. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  5. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java

@ -5,7 +5,9 @@
### Additions and Improvements
- Deprecation warning for Ropsten, Rinkeby, Kiln [#4173](https://github.com/hyperledger/besu/pull/4173)
### Bug Fixes
### Bug Fixes
- Fixes previous known issue [#3890](https://github.com/hyperledger/besu/issues/3890)from RC3 requiring a restart post-merge to continue correct transaction handling.
- Stop producing stack traces when a get headers response only contains the range start header [#4189](https://github.com/hyperledger/besu/pull/4189)
- Upgrade Spotless to 6.8.0 [#4195](https://github.com/hyperledger/besu/pull/4195)
- Upgrade Gradle to 7.5 [#4196](https://github.com/hyperledger/besu/pull/4196)

@ -17,10 +17,14 @@ package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.util.Subscribers;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.annotations.VisibleForTesting;
public class EthMessages {
private final Map<Integer, Subscribers<MessageCallback>> listenersByCode =
new ConcurrentHashMap<>();
@ -47,9 +51,12 @@ public class EthMessages {
.subscribe(callback);
}
public void unsubsribe(final long id) {
for (Subscribers<MessageCallback> subscribers : listenersByCode.values()) {
subscribers.unsubscribe(id);
public void unsubscribe(final long subscriptionId, final int messageCode) {
if (listenersByCode.containsKey(messageCode)) {
listenersByCode.get(messageCode).unsubscribe(subscriptionId);
if (listenersByCode.get(messageCode).getSubscriberCount() < 1) {
listenersByCode.remove(messageCode);
}
}
}
@ -58,6 +65,14 @@ public class EthMessages {
messageResponseConstructorsByCode.put(messageCode, messageResponseConstructor);
}
@VisibleForTesting
public List<Integer> messageCodesHandled() {
List<Integer> retval = new ArrayList<>();
retval.addAll(messageResponseConstructorsByCode.keySet());
retval.addAll(listenersByCode.keySet());
return retval;
}
@FunctionalInterface
public interface MessageCallback {
void exec(EthMessage message);

@ -107,7 +107,10 @@ public class EthPeers {
clock,
permissioningProviders);
final EthPeer ethPeer = connections.putIfAbsent(peerConnection, peer);
LOG.debug("Adding new EthPeer {}", ethPeer);
LOG.debug(
"Adding new EthPeer {} {}",
peer.getShortNodeId(),
ethPeer == null ? "for the first time" : "");
}
public void registerDisconnect(final PeerConnection connection) {

@ -144,8 +144,9 @@ public class BlockPropagationManager {
private void clearListeners() {
onBlockAddedSId.ifPresent(id -> protocolContext.getBlockchain().removeObserver(id));
newBlockSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id));
newBlockHashesSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id));
newBlockSId.ifPresent(id -> ethContext.getEthMessages().unsubscribe(id, EthPV62.NEW_BLOCK));
newBlockHashesSId.ifPresent(
id -> ethContext.getEthMessages().unsubscribe(id, EthPV62.NEW_BLOCK_HASHES));
onBlockAddedSId = Optional.empty();
newBlockSId = Optional.empty();
newBlockHashesSId = Optional.empty();

@ -46,6 +46,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer.Responder;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
@ -815,6 +816,8 @@ public abstract class AbstractBlockPropagationManagerTest {
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
assertThat(blockPropagationManager.isRunning()).isFalse();
assertThat(ethProtocolManager.ethContext().getEthMessages().messageCodesHandled())
.doesNotContain(EthPV62.NEW_BLOCK_HASHES, EthPV62.NEW_BLOCK);
}
@Test

Loading…
Cancel
Save