Stop the BlockPropagationManager when it receives the TTD reached event (#3809)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

Co-authored-by: Justin Florentine <justin+github@florentine.us>
pull/3811/head
Fabio Di Fabio 3 years ago committed by GitHub
parent 28dc97d35e
commit 35fe0fe046
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      CHANGELOG.md
  2. 13
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  3. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  4. 128
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java
  5. 36
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BonsaiBlockPropagationManagerTest.java
  6. 36
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ForestBlockPropagationManagerTest.java

@ -6,6 +6,9 @@
- GraphQL - allow null log topics in queries which match any topic [#3662](https://github.com/hyperledger/besu/pull/3662)
- multi-arch docker builds for amd64 and arm64 [#2954](https://github.com/hyperledger/besu/pull/2954)
### Bug Fixes
- Stop the BlockPropagationManager when it receives the TTD reached event [#3809](https://github.com/hyperledger/besu/pull/3809)
## 22.4.0
### Breaking Changes
@ -27,7 +30,7 @@
- In the Besu EVM Library all references to SHA3 have been renamed to the more accurate name Kecack256, including class names and comment. [#3749](https://github.com/hyperledger/besu/pull/3749)
### Additions and Improvements
- Onchain node permissioning
- Onchain node permissioning
- Log the enodeURL that was previously only throwing an IllegalStateException during the isPermitted check [#3697](https://github.com/hyperledger/besu/pull/3697),
- Fail startup if node permissioning smart contract version does not match [#3765](https://github.com/hyperledger/besu/pull/3765)
- \[EXPERIMENTAL\] Add snapsync `--sync-mode="X_SNAP"` (only as client) [#3710](https://github.com/hyperledger/besu/pull/3710)

@ -103,6 +103,7 @@ public class BlockPropagationManager {
this.blockBroadcaster = blockBroadcaster;
this.syncState = syncState;
this.pendingBlocksManager = pendingBlocksManager;
this.syncState.subscribeTTDReached(this::reactToTTDReachedEvent);
}
public void start() {
@ -123,6 +124,10 @@ public class BlockPropagationManager {
}
}
public boolean isRunning() {
return started.get();
}
private void setupListeners() {
onBlockAddedSId =
Optional.of(protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded));
@ -505,6 +510,14 @@ public class BlockPropagationManager {
.collect(Collectors.joining(", ", "[", "]"));
}
private void reactToTTDReachedEvent(final boolean ttdReached) {
if (ttdReached) {
stop();
} else if (!started.get()) {
start();
}
}
@Override
public String toString() {
return "BlockPropagationManager{"

@ -174,8 +174,7 @@ public class DefaultSynchronizer implements Synchronizer {
syncState.markInitialSyncPhaseAsDone();
future = startFullSync();
}
future = future.thenApply(this::finalizeSync);
return future;
return future.thenApply(this::finalizeSync);
} else {
throw new IllegalStateException("Attempt to start an already started synchronizer.");
}

@ -23,8 +23,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.Blockchain;
@ -35,6 +37,7 @@ import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions;
import org.hyperledger.besu.ethereum.core.BlockImporter;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
@ -50,6 +53,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.testutil.TestClock;
@ -59,7 +63,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -77,13 +80,44 @@ public abstract class AbstractBlockPropagationManagerTest {
protected BlockPropagationManager blockPropagationManager;
protected SynchronizerConfiguration syncConfig;
protected final PendingBlocksManager pendingBlocksManager =
new PendingBlocksManager(
SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build());
spy(
new PendingBlocksManager(
SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build()));
protected SyncState syncState;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
protected void setup(final DataStorageFormat dataStorageFormat) {
blockchainUtil = BlockchainSetupUtil.forTesting(dataStorageFormat);
blockchain = blockchainUtil.getBlockchain();
protocolSchedule = blockchainUtil.getProtocolSchedule();
final ProtocolContext tempProtocolContext = blockchainUtil.getProtocolContext();
protocolContext =
new ProtocolContext(
blockchain,
tempProtocolContext.getWorldStateArchive(),
tempProtocolContext.getConsensusContext(ConsensusContext.class));
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain,
blockchainUtil.getWorldArchive(),
blockchainUtil.getTransactionPool(),
EthProtocolConfiguration.defaultConfig());
syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build();
syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
blockBroadcaster = mock(BlockBroadcaster.class);
blockPropagationManager =
new BlockPropagationManager(
syncConfig,
protocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
syncState,
pendingBlocksManager,
metricsSystem,
blockBroadcaster);
}
@Test
@Ignore // temporarily ignore waiting on Karim's fix
public void importsAnnouncedBlocks_aheadOfChainInOrder() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
@ -95,7 +129,7 @@ public abstract class AbstractBlockPropagationManagerTest {
blockPropagationManager.start();
// Setup additonal peer for best peers list
// Setup additional peer for best peers list
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
@ -727,5 +761,89 @@ public abstract class AbstractBlockPropagationManagerTest {
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
}
@Test
public void shouldStopWhenTTDReached() {
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
assertThat(blockPropagationManager.isRunning()).isFalse();
}
@Test
public void shouldRestartWhenTTDReachedReturnsFalse() {
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
assertThat(blockPropagationManager.isRunning()).isFalse();
syncState.setReachedTerminalDifficulty(false);
assertThat(blockPropagationManager.isRunning()).isTrue();
}
@Test
public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReached() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
// Sanity check
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
blockPropagationManager.start();
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockHashesMessage nextAnnouncement =
NewBlockHashesMessage.create(
Collections.singletonList(
new NewBlockHashesMessage.NewBlockHash(
nextBlock.getHash(), nextBlock.getHeader().getNumber())));
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());
syncState.setReachedTerminalDifficulty(true);
// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(responder, peer::hasOutstandingRequests);
assertThat(blockPropagationManager.isRunning()).isFalse();
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
}
@Test
public void shouldNotListenToNewBlockAnnouncementsWhenTTDReached() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
// Sanity check
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
blockPropagationManager.start();
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());
syncState.setReachedTerminalDifficulty(true);
// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(responder, peer::hasOutstandingRequests);
assertThat(blockPropagationManager.isRunning()).isFalse();
assertThat(blockchain.contains(nextBlock.getHash())).isFalse();
}
@Test
public void shouldNotListenToBlockAddedEventsWhenTTDReached() {
blockchainUtil.importFirstBlocks(2);
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
blockchainUtil.importBlockAtIndex(2);
assertThat(blockPropagationManager.isRunning()).isFalse();
verifyNoInteractions(pendingBlocksManager);
}
public abstract Blockchain getFullBlockchain();
}

@ -14,15 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.junit.Before;
@ -39,34 +32,7 @@ public class BonsaiBlockPropagationManagerTest extends AbstractBlockPropagationM
@Before
public void setup() {
blockchainUtil = BlockchainSetupUtil.forTesting(DataStorageFormat.BONSAI);
blockchain = blockchainUtil.getBlockchain();
protocolSchedule = blockchainUtil.getProtocolSchedule();
final ProtocolContext tempProtocolContext = blockchainUtil.getProtocolContext();
protocolContext =
new ProtocolContext(
blockchain,
tempProtocolContext.getWorldStateArchive(),
tempProtocolContext.getConsensusContext(ConsensusContext.class));
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain,
blockchainUtil.getWorldArchive(),
blockchainUtil.getTransactionPool(),
EthProtocolConfiguration.defaultConfig());
syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build();
syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
blockBroadcaster = mock(BlockBroadcaster.class);
blockPropagationManager =
new BlockPropagationManager(
syncConfig,
protocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
syncState,
pendingBlocksManager,
metricsSystem,
blockBroadcaster);
setup(DataStorageFormat.BONSAI);
}
@Override

@ -14,15 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.junit.Before;
@ -39,34 +32,7 @@ public class ForestBlockPropagationManagerTest extends AbstractBlockPropagationM
@Before
public void setup() {
blockchainUtil = BlockchainSetupUtil.forTesting(DataStorageFormat.FOREST);
blockchain = blockchainUtil.getBlockchain();
protocolSchedule = blockchainUtil.getProtocolSchedule();
final ProtocolContext tempProtocolContext = blockchainUtil.getProtocolContext();
protocolContext =
new ProtocolContext(
blockchain,
tempProtocolContext.getWorldStateArchive(),
tempProtocolContext.getConsensusContext(ConsensusContext.class));
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain,
blockchainUtil.getWorldArchive(),
blockchainUtil.getTransactionPool(),
EthProtocolConfiguration.defaultConfig());
syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build();
syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
blockBroadcaster = mock(BlockBroadcaster.class);
blockPropagationManager =
new BlockPropagationManager(
syncConfig,
protocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
syncState,
pendingBlocksManager,
metricsSystem,
blockBroadcaster);
setup(DataStorageFormat.FOREST);
}
@Override

Loading…
Cancel
Save