Fix heal stall issue (#3761)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/3768/head
matkt 3 years ago committed by GitHub
parent 77b308a37c
commit 7a48d9ff2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManager.java
  2. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java
  3. 48
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java
  4. 31
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java
  5. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java
  6. 24
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManagerTest.java
  7. 19
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStepTest.java
  8. 14
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java

@ -18,57 +18,52 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DynamicPivotBlockManager<REQUEST extends TasksPriorityProvider> {
public class DynamicPivotBlockManager {
public static final BiConsumer<BlockHeader, Boolean> doNothingOnPivotChange = (___, __) -> {};
private static final Logger LOG = LoggerFactory.getLogger(DynamicPivotBlockManager.class);
private final AtomicBoolean isSearchingPivotBlock = new AtomicBoolean(false);
private final AtomicBoolean isUpdatingPivotBlock = new AtomicBoolean(false);
private final WorldDownloadState<REQUEST> worldDownloadState;
private final FastSyncActions syncActions;
private final FastSyncState syncState;
private final int pivotBlockWindowValidity;
private final int pivotBlockDistanceBeforeCaching;
private Optional<BlockHeader> lastBlockFound;
private Optional<BlockHeader> lastPivotBlockFound;
public DynamicPivotBlockManager(
final WorldDownloadState<REQUEST> worldDownloadState,
final FastSyncActions fastSyncActions,
final SnapSyncState fastSyncState,
final int pivotBlockWindowValidity,
final int pivotBlockDistanceBeforeCaching) {
this.worldDownloadState = worldDownloadState;
this.syncActions = fastSyncActions;
this.syncState = fastSyncState;
this.pivotBlockWindowValidity = pivotBlockWindowValidity;
this.pivotBlockDistanceBeforeCaching = pivotBlockDistanceBeforeCaching;
this.lastBlockFound = Optional.empty();
this.lastPivotBlockFound = Optional.empty();
}
public void check(final Consumer<BlockHeader> onNewPivotBlock) {
public void check(final BiConsumer<BlockHeader, Boolean> onNewPivotBlock) {
syncState
.getPivotBlockNumber()
.ifPresent(
blockNumber -> {
final long currentPivotBlockNumber = syncState.getPivotBlockNumber().orElseThrow();
currentPivotBlockNumber -> {
final long distanceNextPivotBlock =
syncActions.getSyncState().bestChainHeight()
- lastBlockFound
- lastPivotBlockFound
.map(ProcessableBlockHeader::getNumber)
.orElse(currentPivotBlockNumber);
if (distanceNextPivotBlock > pivotBlockDistanceBeforeCaching
@ -77,7 +72,7 @@ public class DynamicPivotBlockManager<REQUEST extends TasksPriorityProvider> {
.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)
.thenCompose(syncActions::selectPivotBlock)
.thenCompose(syncActions::downloadPivotBlockHeader)
.thenAccept(fss -> lastBlockFound = fss.getPivotBlockHeader())
.thenAccept(fss -> lastPivotBlockFound = fss.getPivotBlockHeader())
.orTimeout(5, TimeUnit.MINUTES)
.whenComplete((unused, throwable) -> isSearchingPivotBlock.set(false));
}
@ -92,16 +87,15 @@ public class DynamicPivotBlockManager<REQUEST extends TasksPriorityProvider> {
});
}
private void switchToNewPivotBlock(final Consumer<BlockHeader> onNewPivotBlock) {
lastBlockFound.ifPresent(
public void switchToNewPivotBlock(final BiConsumer<BlockHeader, Boolean> onSwitchDone) {
lastPivotBlockFound.ifPresentOrElse(
blockHeader -> {
LOG.info(
"Select new pivot block {} {}", blockHeader.getNumber(), blockHeader.getStateRoot());
syncState.setCurrentHeader(blockHeader);
onNewPivotBlock.accept(blockHeader);
worldDownloadState.requestComplete(true);
worldDownloadState.notifyTaskAvailable();
});
lastBlockFound = Optional.empty();
onSwitchDone.accept(blockHeader, true);
},
() -> onSwitchDone.accept(syncState.getPivotBlockHeader().orElseThrow(), false));
lastPivotBlockFound = Optional.empty();
}
}

@ -60,7 +60,6 @@ public class LoadLocalDataStep {
existingNodeCounter.inc();
request.setData(existingData.get());
request.setRequiresPersisting(false);
request.setRootHash(snapSyncState.getPivotBlockHeader().get().getStateRoot());
final WorldStateStorage.Updater updater = worldStateStorage.updater();
request.persist(worldStateStorage, updater, downloadState, snapSyncState);
updater.commit();

@ -61,6 +61,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
new InMemoryTasksPriorityQueues<>();
public final HashSet<Bytes> inconsistentAccounts = new HashSet<>();
private DynamicPivotBlockManager dynamicPivotBlockManager;
private final SnapSyncState snapSyncState;
// metrics around the snapsync
@ -145,15 +146,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
&& pendingBigStorageRequests.allTasksCompleted()
&& pendingTrieNodeRequests.allTasksCompleted()) {
if (!snapSyncState.isHealInProgress()) {
LOG.info(
"Starting world state heal process from peers (generated nodes {})",
generatedNodes.get());
snapSyncState.setHealStatus(true);
enqueueRequest(
createAccountTrieNodeDataRequest(
snapSyncState.getPivotBlockHeader().orElseThrow().getStateRoot(),
Bytes.EMPTY,
inconsistentAccounts));
LOG.info("Starting world state heal process from peers");
startHeal();
} else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
@ -180,6 +174,30 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
pendingTrieNodeRequests.clear();
}
/**
 * Switches the download into healing mode and enqueues the root account trie node request that
 * the heal starts from.
 *
 * <p>The heal flag is raised first, then the pivot block manager is asked to switch pivot. The
 * manager hands back the header to heal against together with a flag telling whether the pivot
 * actually changed; the flag is not needed here because the root request is enqueued in both
 * cases.
 */
public synchronized void startHeal() {
  snapSyncState.setHealStatus(true);
  // Prefer a fresher pivot block, if one has been found, before the heal begins.
  dynamicPivotBlockManager.switchToNewPivotBlock(
      (pivotHeader, pivotChanged) ->
          enqueueRequest(
              createAccountTrieNodeDataRequest(
                  pivotHeader.getStateRoot(), Bytes.EMPTY, inconsistentAccounts)));
}
/**
 * Restarts the heal from the state root of the current pivot block header.
 *
 * <p>Clears the flat database and the pending trie node and code request queues, then enqueues a
 * new root account trie node request built from the pivot block's state root.
 *
 * @throws java.util.NoSuchElementException if the snap sync state holds no pivot block header
 */
public synchronized void reloadHeal() {
// Drop everything collected against the previous pivot before re-seeding the heal.
worldStateStorage.clearFlatDatabase();
pendingTrieNodeRequests.clearInternalQueues();
pendingCodeRequests.clearInternalQueue();
enqueueRequest(
createAccountTrieNodeDataRequest(
snapSyncState.getPivotBlockHeader().orElseThrow().getStateRoot(),
Bytes.EMPTY,
inconsistentAccounts));
// NOTE(review): presumably these two calls release/wake workers waiting on the task queues so
// the freshly enqueued root request gets picked up — confirm against WorldDownloadState.
requestComplete(true);
notifyTaskAvailable();
}
@Override
public synchronized void enqueueRequest(final SnapDataRequest request) {
if (!internalFuture.isDone()) {
@ -266,14 +284,6 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
List.of(pendingTrieNodeRequests));
}
/**
 * Clears the flat database and the pending trie node and code request queues, turns the heal
 * status off, and re-runs the completion check against the current pivot block header when one
 * is present.
 */
public void clearTrieNodes() {
worldStateStorage.clearFlatDatabase();
pendingTrieNodeRequests.clearInternalQueues();
pendingCodeRequests.clearInternalQueue();
snapSyncState.setHealStatus(false);
// No-op when no pivot block header is available.
snapSyncState.getPivotBlockHeader().ifPresent(this::checkCompletion);
}
/**
 * Returns the counter held in {@code generatedNodes}.
 *
 * @return the generated-nodes counter
 */
public RunnableCounter getGeneratedNodes() {
return generatedNodes;
}
@ -289,4 +299,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
/** Logs, at info level, the current value of the {@code healedNodes} counter. */
private void displayHealProgress() {
LOG.info("Healed {} world sync nodes", healedNodes.get());
}
/**
 * Sets the pivot block manager used by this download state.
 *
 * <p>{@code startHeal()} dereferences this field, so it has to be set before healing can start.
 *
 * @param dynamicPivotBlockManager the pivot block manager to use
 */
public void setDynamicPivotBlockManager(final DynamicPivotBlockManager dynamicPivotBlockManager) {
this.dynamicPivotBlockManager = dynamicPivotBlockManager;
}
}

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.DynamicPivotBlockManager.doNothingOnPivotChange;
import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@ -138,15 +139,14 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
private SnapSyncState snapSyncState;
private PersistDataStep persistDataStep;
private CompleteTaskStep completeTaskStep;
private DynamicPivotBlockManager<SnapDataRequest> pivotBlockManager;
private DynamicPivotBlockManager pivotBlockManager;
/**
 * Stores the snap sync configuration on this builder.
 *
 * @param snapSyncConfiguration the configuration to store
 * @return this builder, for call chaining
 */
public Builder configuration(final SnapSyncConfiguration snapSyncConfiguration) {
this.snapSyncConfiguration = snapSyncConfiguration;
return this;
}
public Builder pivotBlockManager(
final DynamicPivotBlockManager<SnapDataRequest> pivotBlockManager) {
public Builder pivotBlockManager(final DynamicPivotBlockManager pivotBlockManager) {
this.pivotBlockManager = pivotBlockManager;
return this;
}
@ -231,7 +231,7 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
.thenProcess(
"checkNewPivotBlock",
tasks -> {
pivotBlockManager.check(blockHeader -> {});
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsync(
@ -254,7 +254,7 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
.thenProcess(
"checkNewPivotBlock",
tasks -> {
pivotBlockManager.check(blockHeader -> {});
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered(
@ -280,7 +280,7 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
.thenProcess(
"checkNewPivotBlock",
tasks -> {
pivotBlockManager.check(blockHeader -> {});
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered(
@ -320,9 +320,8 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
"checkNewPivotBlock",
tasks -> {
pivotBlockManager.check(
blockHeader -> {
if (snapSyncState.isHealInProgress()) downloadState.clearTrieNodes();
});
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsyncOrdered(
@ -357,9 +356,8 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
"checkNewPivotBlock",
tasks -> {
pivotBlockManager.check(
blockHeader -> {
if (snapSyncState.isHealInProgress()) downloadState.clearTrieNodes();
});
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsync(
@ -385,4 +383,13 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
requestsToComplete);
}
}
/**
 * Restarts the heal when the pivot block changed while a heal was already running.
 *
 * @param snapSyncState state queried for whether a heal is in progress
 * @param downloadState download state whose heal is reloaded
 * @param newBlockFound {@code true} when the pivot check switched to a new pivot block
 */
private static void reloadHealWhenNeeded(
    final SnapSyncState snapSyncState,
    final SnapWorldDownloadState downloadState,
    final boolean newBlockFound) {
  // Nothing to do unless a heal is running and the pivot actually moved.
  if (!snapSyncState.isHealInProgress() || !newBlockFound) {
    return;
  }
  downloadState.reloadHeal();
}
}

@ -138,17 +138,18 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
maybeCompleteTask = Optional.of(new CompleteTaskStep(snapSyncState, metricsSystem));
final DynamicPivotBlockManager dynamicPivotBlockManager =
new DynamicPivotBlockManager(
fastSyncActions,
snapSyncState,
snapSyncConfiguration.getPivotBlockWindowValidity(),
snapSyncConfiguration.getPivotBlockDistanceBeforeCaching());
downloadProcess =
SnapWorldStateDownloadProcess.builder()
.configuration(snapSyncConfiguration)
.maxOutstandingRequests(maxOutstandingRequests)
.pivotBlockManager(
new DynamicPivotBlockManager<>(
newDownloadState,
fastSyncActions,
snapSyncState,
snapSyncConfiguration.getPivotBlockWindowValidity(),
snapSyncConfiguration.getPivotBlockDistanceBeforeCaching()))
.pivotBlockManager(dynamicPivotBlockManager)
.loadLocalDataStep(
new LoadLocalDataStep(
worldStateStorage, newDownloadState, metricsSystem, snapSyncState))
@ -167,6 +168,8 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
.metricsSystem(metricsSystem)
.build();
newDownloadState.setDynamicPivotBlockManager(dynamicPivotBlockManager);
return newDownloadState.startDownload(downloadProcess, ethContext.getScheduler());
}
}

@ -16,7 +16,6 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -27,7 +26,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import java.util.OptionalLong;
@ -39,13 +37,11 @@ import org.junit.Test;
public class DynamicPivotBlockManagerTest {
private final SnapSyncState snapSyncState = mock(SnapSyncState.class);
private final SnapWorldDownloadState downloadState = mock(SnapWorldDownloadState.class);
private final FastSyncActions fastSyncActions = mock(FastSyncActions.class);
private final SyncState syncState = mock(SyncState.class);
private final DynamicPivotBlockManager<SnapDataRequest> dynamicPivotBlockManager =
new DynamicPivotBlockManager<>(
downloadState,
private final DynamicPivotBlockManager dynamicPivotBlockManager =
new DynamicPivotBlockManager(
fastSyncActions,
snapSyncState,
SnapSyncConfiguration.DEFAULT_PIVOT_BLOCK_WINDOW_VALIDITY,
@ -63,9 +59,7 @@ public class DynamicPivotBlockManagerTest {
when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(999));
dynamicPivotBlockManager.check(
blockHeader -> {
fail("new pivot block not expected");
});
(blockHeader, newBlockFound) -> assertThat(newBlockFound).isFalse());
verify(fastSyncActions, never()).waitForSuitablePeers(any());
}
@ -87,9 +81,7 @@ public class DynamicPivotBlockManagerTest {
when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(939));
dynamicPivotBlockManager.check(
blockHeader -> {
fail("new pivot block not expected");
});
(blockHeader, newBlockFound) -> assertThat(newBlockFound).isFalse());
verify(fastSyncActions).waitForSuitablePeers(any());
}
@ -111,15 +103,17 @@ public class DynamicPivotBlockManagerTest {
when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(939));
dynamicPivotBlockManager.check(
blockHeader -> {
fail("new pivot block not expected");
(blockHeader, newBlockFound) -> {
assertThat(blockHeader.getNumber()).isEqualTo(939);
assertThat(newBlockFound).isFalse();
});
when(syncState.bestChainHeight()).thenReturn(1066L);
dynamicPivotBlockManager.check(
blockHeader -> {
(blockHeader, newBlockFound) -> {
assertThat(blockHeader.getNumber()).isEqualTo(pivotBlockHeader.getNumber());
assertThat(newBlockFound).isTrue();
});
verify(snapSyncState).setCurrentHeader(pivotBlockHeader);

@ -73,6 +73,7 @@ public class LoadLocalDataStepTest {
@Test
public void shouldReturnStreamWithUnchangedTaskWhenDataNotPresent() {
final Stream<Task<SnapDataRequest>> output =
loadLocalDataStep.loadLocalDataTrieNode(task, completedTasks);
@ -80,6 +81,24 @@ public class LoadLocalDataStepTest {
assertThat(output).containsExactly(task);
}
/**
 * Checks that loading locally available trie node data leaves the request's root hash untouched,
 * even when the snap sync state reports a pivot block header with a different state root.
 */
@Test
public void shouldReturnStreamWithSameRootHashTaskWhenDataArePresent() {
// The request starts out bound to the original header's state root.
task.getData().setRootHash(blockHeader.getStateRoot());
when(worldStateStorage.getAccountStateTrieNode(any(), any())).thenReturn(Optional.of(DATA));
when(worldStateStorage.updater()).thenReturn(mock(WorldStateStorage.Updater.class));
// The pivot now points at a header whose state root differs from the request's root hash.
final BlockHeader newBlockHeader =
new BlockHeaderTestFixture().stateRoot(Hash.EMPTY).buildHeader();
when(snapSyncState.getPivotBlockHeader()).thenReturn(Optional.of(newBlockHeader));
loadLocalDataStep.loadLocalDataTrieNode(task, completedTasks);
// The task is completed from local data and still carries its original root hash.
assertThat(completedTasks.poll()).isEqualTo(task);
assertThat(task.getData().getRootHash()).isEqualTo(blockHeader.getStateRoot());
}
@Test
public void shouldReturnEmptyStreamAndSendTaskToCompletedPipeWhenDataIsPresent() {
when(worldStateStorage.getAccountStateTrieNode(any(), any())).thenReturn(Optional.of(DATA));

@ -15,6 +15,8 @@
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -40,6 +42,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
@ -48,6 +51,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@SuppressWarnings("unchecked")
@RunWith(Parameterized.class)
public class SnapWorldDownloadStateTest {
@ -99,6 +103,16 @@ public class SnapWorldDownloadStateTest {
MIN_MILLIS_BEFORE_STALLING,
metricsSystem,
clock);
final DynamicPivotBlockManager dynamicPivotBlockManager = mock(DynamicPivotBlockManager.class);
doAnswer(
invocation -> {
BiConsumer<BlockHeader, Boolean> callback = invocation.getArgument(0);
callback.accept(header, false);
return null;
})
.when(dynamicPivotBlockManager)
.switchToNewPivotBlock(any());
downloadState.setDynamicPivotBlockManager(dynamicPivotBlockManager);
downloadState.setRootNodeData(ROOT_NODE_DATA);
future = downloadState.getDownloadFuture();
assertThat(downloadState.isDownloading()).isTrue();

Loading…
Cancel
Save