Fixes race condition between `NodeAddedListener` and `FullBlockImportStep` (#56)

Fixes race condition between attaching the NodeAddedListener and some state from post marked block being persisted. We now prepare and cleanup the listener only once each, on start and stop respectively.

Other changes:

Start the Pruner before FullSyncDownloader. Luckily the downloader took enough time to start downloading that this wasn't an issue but it's safer this way.

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
pull/87/head
Ratan Rai Sur 5 years ago committed by GitHub
parent fad9bf9c34
commit c6ad0f7705
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java
  2. 34
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java
  3. 9
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java
  4. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java

@ -123,6 +123,10 @@ public class PrunerIntegrationTest {
generateBlockchainData(numBlockInCycle, accountsPerBlock); generateBlockchainData(numBlockInCycle, accountsPerBlock);
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
// Restarting the Pruner shouldn't matter since we're idle
pruner.stop();
pruner.start();
// Collect the nodes we expect to keep // Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = new HashSet<>(); final Set<BytesValue> expectedNodes = new HashSet<>();
for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) { for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {

@ -97,14 +97,12 @@ public class MarkSweepPruner {
} }
public void prepare() { public void prepare() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case. // Optimization for the case where the previous cycle was interrupted (like the node was shut
markStorage.clear(); // down). If the previous cycle was interrupted, there will be marks in the mark storage from
pendingMarks.clear(); // last time, causing the first sweep to be smaller than it needs to be.
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes); clearMarks();
}
public void cleanup() { nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
} }
public void mark(final Hash rootHash) { public void mark(final Hash rootHash) {
@ -151,9 +149,18 @@ public class MarkSweepPruner {
// Sweep non-state-root nodes // Sweep non-state-root nodes
prunedNodeCount += worldStateStorage.prune(this::isMarked); prunedNodeCount += worldStateStorage.prune(this::isMarked);
sweptNodesCounter.inc(prunedNodeCount); sweptNodesCounter.inc(prunedNodeCount);
clearMarks();
LOG.debug("Completed sweeping unused nodes");
}
public void cleanup() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
clearMarks();
}
public void clearMarks() {
markStorage.clear(); markStorage.clear();
LOG.debug("Completed sweeping unused nodes"); pendingMarks.clear();
} }
private boolean isMarked(final Bytes32 key) { private boolean isMarked(final Bytes32 key) {
@ -190,7 +197,14 @@ public class MarkSweepPruner {
@VisibleForTesting @VisibleForTesting
void markNode(final Bytes32 hash) { void markNode(final Bytes32 hash) {
markNodes(Collections.singleton(hash)); markedNodesCounter.inc();
markLock.lock();
try {
pendingMarks.add(hash);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
} }
private void markNodes(final Collection<Bytes32> nodeHashes) { private void markNodes(final Collection<Bytes32> nodeHashes) {
@ -210,7 +224,7 @@ public class MarkSweepPruner {
} }
} }
void flushPendingMarks() { private void flushPendingMarks() {
markLock.lock(); markLock.lock();
try { try {
final KeyValueStorageTransaction transaction = markStorage.startTransaction(); final KeyValueStorageTransaction transaction = markStorage.startTransaction();

@ -35,6 +35,7 @@ public class Pruner {
private final MarkSweepPruner pruningStrategy; private final MarkSweepPruner pruningStrategy;
private final Blockchain blockchain; private final Blockchain blockchain;
private final ExecutorService executorService; private final ExecutorService executorService;
private Long blockAddedObserverId;
private final long blocksRetained; private final long blocksRetained;
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE); private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private volatile long markBlockNumber = 0; private volatile long markBlockNumber = 0;
@ -58,11 +59,14 @@ public class Pruner {
public void start() { public void start() {
LOG.info("Starting Pruner."); LOG.info("Starting Pruner.");
blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); pruningStrategy.prepare();
blockAddedObserverId =
blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event));
} }
public void stop() throws InterruptedException { public void stop() throws InterruptedException {
pruningStrategy.cleanup(); pruningStrategy.cleanup();
blockchain.removeObserver(blockAddedObserverId);
executorService.awaitTermination(10, TimeUnit.SECONDS); executorService.awaitTermination(10, TimeUnit.SECONDS);
} }
@ -73,7 +77,6 @@ public class Pruner {
final long blockNumber = event.getBlock().getHeader().getNumber(); final long blockNumber = event.getBlock().getHeader().getNumber();
if (state.compareAndSet(State.IDLE, State.MARK_BLOCK_CONFIRMATIONS_AWAITING)) { if (state.compareAndSet(State.IDLE, State.MARK_BLOCK_CONFIRMATIONS_AWAITING)) {
pruningStrategy.prepare();
markBlockNumber = blockNumber; markBlockNumber = blockNumber;
} else if (blockNumber >= markBlockNumber + blockConfirmations } else if (blockNumber >= markBlockNumber + blockConfirmations
&& state.compareAndSet(State.MARK_BLOCK_CONFIRMATIONS_AWAITING, State.MARKING)) { && state.compareAndSet(State.MARK_BLOCK_CONFIRMATIONS_AWAITING, State.MARKING)) {
@ -87,7 +90,6 @@ public class Pruner {
} }
private void mark(final BlockHeader header) { private void mark(final BlockHeader header) {
markBlockNumber = header.getNumber();
final Hash stateRoot = header.getStateRoot(); final Hash stateRoot = header.getStateRoot();
LOG.debug( LOG.debug(
"Begin marking used nodes for pruning. Block number: {} State root: {}", "Begin marking used nodes for pruning. Block number: {} State root: {}",
@ -117,6 +119,7 @@ public class Pruner {
executorService.execute(action); executorService.execute(action);
} catch (final Throwable t) { } catch (final Throwable t) {
LOG.error("Pruning failed", t); LOG.error("Pruning failed", t);
pruningStrategy.cleanup();
state.set(State.IDLE); state.set(State.IDLE);
} }
} }

@ -170,8 +170,8 @@ public class DefaultSynchronizer<C> implements Synchronizer {
} }
private void startFullSync() { private void startFullSync() {
fullSyncDownloader.start();
maybePruner.ifPresent(Pruner::start); maybePruner.ifPresent(Pruner::start);
fullSyncDownloader.start();
} }
@Override @Override

Loading…
Cancel
Save