mirror of https://github.com/hyperledger/besu
fastsync refactor (#3179)
Light refactor of the fastsync code to facilitate the implementation of the snapsync Signed-off-by: Karim TAAM <karim.t2am@gmail.com>pull/3215/head
parent
a974cec2e6
commit
110052c9f3
@ -0,0 +1,65 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockHeader; |
||||
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; |
||||
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; |
||||
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater; |
||||
import org.hyperledger.besu.services.tasks.CachingTaskCollection; |
||||
|
||||
import java.time.Clock; |
||||
import java.util.Optional; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public class FastWorldDownloadState extends WorldDownloadState<NodeDataRequest> { |
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
|
||||
public FastWorldDownloadState( |
||||
final CachingTaskCollection<NodeDataRequest> pendingRequests, |
||||
final int maxRequestsWithoutProgress, |
||||
final long minMillisBeforeStalling, |
||||
final Clock clock) { |
||||
super(pendingRequests, maxRequestsWithoutProgress, minMillisBeforeStalling, clock); |
||||
} |
||||
|
||||
@Override |
||||
public synchronized boolean checkCompletion( |
||||
final WorldStateStorage worldStateStorage, final BlockHeader header) { |
||||
if (!internalFuture.isDone() && pendingRequests.allTasksCompleted()) { |
||||
if (rootNodeData == null) { |
||||
enqueueRequest( |
||||
NodeDataRequest.createAccountDataRequest( |
||||
header.getStateRoot(), Optional.of(Bytes.EMPTY))); |
||||
return false; |
||||
} |
||||
final Updater updater = worldStateStorage.updater(); |
||||
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData); |
||||
updater.commit(); |
||||
|
||||
internalFuture.complete(null); |
||||
// THere are no more inputs to process so make sure we wake up any threads waiting to dequeue
|
||||
// so they can give up waiting.
|
||||
notifyAll(); |
||||
LOG.info("Finished downloading world state from peers"); |
||||
return true; |
||||
} else { |
||||
return false; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,210 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockHeader; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; |
||||
import org.hyperledger.besu.ethereum.eth.sync.worldstate.TaskQueueIterator; |
||||
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadProcess; |
||||
import org.hyperledger.besu.metrics.BesuMetricCategory; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
import org.hyperledger.besu.plugin.services.metrics.Counter; |
||||
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; |
||||
import org.hyperledger.besu.services.pipeline.Pipe; |
||||
import org.hyperledger.besu.services.pipeline.Pipeline; |
||||
import org.hyperledger.besu.services.pipeline.PipelineBuilder; |
||||
import org.hyperledger.besu.services.pipeline.WritePipe; |
||||
import org.hyperledger.besu.services.tasks.Task; |
||||
import org.hyperledger.besu.util.ExceptionUtils; |
||||
|
||||
import java.util.concurrent.CancellationException; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class FastWorldStateDownloadProcess implements WorldStateDownloadProcess { |
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private final Pipeline<Task<NodeDataRequest>> fetchDataPipeline; |
||||
private final Pipeline<Task<NodeDataRequest>> completionPipeline; |
||||
private final WritePipe<Task<NodeDataRequest>> requestsToComplete; |
||||
|
||||
private FastWorldStateDownloadProcess( |
||||
final Pipeline<Task<NodeDataRequest>> fetchDataPipeline, |
||||
final Pipeline<Task<NodeDataRequest>> completionPipeline, |
||||
final WritePipe<Task<NodeDataRequest>> requestsToComplete) { |
||||
this.fetchDataPipeline = fetchDataPipeline; |
||||
this.completionPipeline = completionPipeline; |
||||
this.requestsToComplete = requestsToComplete; |
||||
} |
||||
|
||||
public static Builder builder() { |
||||
return new Builder(); |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<Void> start(final EthScheduler ethScheduler) { |
||||
final CompletableFuture<Void> fetchDataFuture = ethScheduler.startPipeline(fetchDataPipeline); |
||||
final CompletableFuture<Void> completionFuture = ethScheduler.startPipeline(completionPipeline); |
||||
|
||||
fetchDataFuture.whenComplete( |
||||
(result, error) -> { |
||||
if (error != null) { |
||||
if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { |
||||
LOG.error("Pipeline failed", error); |
||||
} |
||||
completionPipeline.abort(); |
||||
} else { |
||||
// No more data to fetch, so propagate the pipe closure onto the completion pipe.
|
||||
requestsToComplete.close(); |
||||
} |
||||
}); |
||||
|
||||
completionFuture.exceptionally( |
||||
error -> { |
||||
if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { |
||||
LOG.error("Pipeline failed", error); |
||||
} |
||||
fetchDataPipeline.abort(); |
||||
return null; |
||||
}); |
||||
return completionFuture; |
||||
} |
||||
|
||||
@Override |
||||
public void abort() { |
||||
fetchDataPipeline.abort(); |
||||
completionPipeline.abort(); |
||||
} |
||||
|
||||
public static class Builder { |
||||
|
||||
private int hashCountPerRequest; |
||||
private int maxOutstandingRequests; |
||||
private LoadLocalDataStep loadLocalDataStep; |
||||
private FastWorldDownloadState downloadState; |
||||
private MetricsSystem metricsSystem; |
||||
private RequestDataStep requestDataStep; |
||||
private BlockHeader pivotBlockHeader; |
||||
private PersistDataStep persistDataStep; |
||||
private CompleteTaskStep completeTaskStep; |
||||
|
||||
public Builder hashCountPerRequest(final int hashCountPerRequest) { |
||||
this.hashCountPerRequest = hashCountPerRequest; |
||||
return this; |
||||
} |
||||
|
||||
public Builder maxOutstandingRequests(final int maxOutstandingRequests) { |
||||
this.maxOutstandingRequests = maxOutstandingRequests; |
||||
return this; |
||||
} |
||||
|
||||
public Builder loadLocalDataStep(final LoadLocalDataStep loadLocalDataStep) { |
||||
this.loadLocalDataStep = loadLocalDataStep; |
||||
return this; |
||||
} |
||||
|
||||
public Builder requestDataStep(final RequestDataStep requestDataStep) { |
||||
this.requestDataStep = requestDataStep; |
||||
return this; |
||||
} |
||||
|
||||
public Builder persistDataStep(final PersistDataStep persistDataStep) { |
||||
this.persistDataStep = persistDataStep; |
||||
return this; |
||||
} |
||||
|
||||
public Builder completeTaskStep(final CompleteTaskStep completeTaskStep) { |
||||
this.completeTaskStep = completeTaskStep; |
||||
return this; |
||||
} |
||||
|
||||
public Builder downloadState(final FastWorldDownloadState downloadState) { |
||||
this.downloadState = downloadState; |
||||
return this; |
||||
} |
||||
|
||||
public Builder pivotBlockHeader(final BlockHeader pivotBlockHeader) { |
||||
this.pivotBlockHeader = pivotBlockHeader; |
||||
return this; |
||||
} |
||||
|
||||
public Builder metricsSystem(final MetricsSystem metricsSystem) { |
||||
this.metricsSystem = metricsSystem; |
||||
return this; |
||||
} |
||||
|
||||
public FastWorldStateDownloadProcess build() { |
||||
checkNotNull(loadLocalDataStep); |
||||
checkNotNull(requestDataStep); |
||||
checkNotNull(persistDataStep); |
||||
checkNotNull(completeTaskStep); |
||||
checkNotNull(downloadState); |
||||
checkNotNull(pivotBlockHeader); |
||||
checkNotNull(metricsSystem); |
||||
|
||||
// Room for the requests we expect to do in parallel plus some buffer but not unlimited.
|
||||
final int bufferCapacity = hashCountPerRequest * 2; |
||||
final LabelledMetric<Counter> outputCounter = |
||||
metricsSystem.createLabelledCounter( |
||||
BesuMetricCategory.SYNCHRONIZER, |
||||
"world_state_pipeline_processed_total", |
||||
"Number of entries processed by each world state download pipeline stage", |
||||
"step", |
||||
"action"); |
||||
|
||||
final Pipeline<Task<NodeDataRequest>> completionPipeline = |
||||
PipelineBuilder.<Task<NodeDataRequest>>createPipeline( |
||||
"requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request") |
||||
.andFinishWith( |
||||
"requestCompleteTask", |
||||
task -> |
||||
completeTaskStep.markAsCompleteOrFailed( |
||||
pivotBlockHeader, downloadState, task)); |
||||
|
||||
final Pipe<Task<NodeDataRequest>> requestsToComplete = completionPipeline.getInputPipe(); |
||||
final Pipeline<Task<NodeDataRequest>> fetchDataPipeline = |
||||
createPipelineFrom( |
||||
"requestDequeued", |
||||
new TaskQueueIterator<>(downloadState), |
||||
bufferCapacity, |
||||
outputCounter, |
||||
true, |
||||
"world_state_download") |
||||
.thenFlatMapInParallel( |
||||
"requestLoadLocalData", |
||||
task -> loadLocalDataStep.loadLocalData(task, requestsToComplete), |
||||
3, |
||||
bufferCapacity) |
||||
.inBatches(hashCountPerRequest) |
||||
.thenProcessAsync( |
||||
"batchDownloadData", |
||||
requestTasks -> |
||||
requestDataStep.requestData(requestTasks, pivotBlockHeader, downloadState), |
||||
maxOutstandingRequests) |
||||
.thenProcess( |
||||
"batchPersistData", |
||||
tasks -> persistDataStep.persist(tasks, pivotBlockHeader, downloadState)) |
||||
.andFinishWith( |
||||
"batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put)); |
||||
|
||||
return new FastWorldStateDownloadProcess( |
||||
fetchDataPipeline, completionPipeline, requestsToComplete); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,177 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; |
||||
|
||||
import org.hyperledger.besu.datatypes.Hash; |
||||
import org.hyperledger.besu.ethereum.core.BlockHeader; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; |
||||
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; |
||||
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; |
||||
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; |
||||
import org.hyperledger.besu.metrics.BesuMetricCategory; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
import org.hyperledger.besu.services.tasks.CachingTaskCollection; |
||||
|
||||
import java.time.Clock; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
import java.util.function.Function; |
||||
import java.util.function.IntSupplier; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public class FastWorldStateDownloader implements WorldStateDownloader { |
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
|
||||
private final long minMillisBeforeStalling; |
||||
private final Clock clock; |
||||
private final MetricsSystem metricsSystem; |
||||
|
||||
private final EthContext ethContext; |
||||
private final CachingTaskCollection<NodeDataRequest> taskCollection; |
||||
private final int hashCountPerRequest; |
||||
private final int maxOutstandingRequests; |
||||
private final int maxNodeRequestsWithoutProgress; |
||||
private final WorldStateStorage worldStateStorage; |
||||
|
||||
private final AtomicReference<FastWorldDownloadState> downloadState = new AtomicReference<>(); |
||||
|
||||
private Optional<CompleteTaskStep> maybeCompleteTask = Optional.empty(); |
||||
|
||||
public FastWorldStateDownloader( |
||||
final EthContext ethContext, |
||||
final WorldStateStorage worldStateStorage, |
||||
final CachingTaskCollection<NodeDataRequest> taskCollection, |
||||
final int hashCountPerRequest, |
||||
final int maxOutstandingRequests, |
||||
final int maxNodeRequestsWithoutProgress, |
||||
final long minMillisBeforeStalling, |
||||
final Clock clock, |
||||
final MetricsSystem metricsSystem) { |
||||
this.ethContext = ethContext; |
||||
this.worldStateStorage = worldStateStorage; |
||||
this.taskCollection = taskCollection; |
||||
this.hashCountPerRequest = hashCountPerRequest; |
||||
this.maxOutstandingRequests = maxOutstandingRequests; |
||||
this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress; |
||||
this.minMillisBeforeStalling = minMillisBeforeStalling; |
||||
this.clock = clock; |
||||
this.metricsSystem = metricsSystem; |
||||
|
||||
metricsSystem.createIntegerGauge( |
||||
BesuMetricCategory.SYNCHRONIZER, |
||||
"world_state_node_requests_since_last_progress_current", |
||||
"Number of world state requests made since the last time new data was returned", |
||||
downloadStateValue(FastWorldDownloadState::getRequestsSinceLastProgress)); |
||||
|
||||
metricsSystem.createIntegerGauge( |
||||
BesuMetricCategory.SYNCHRONIZER, |
||||
"world_state_inflight_requests_current", |
||||
"Number of in progress requests for world state data", |
||||
downloadStateValue(FastWorldDownloadState::getOutstandingTaskCount)); |
||||
} |
||||
|
||||
private IntSupplier downloadStateValue(final Function<FastWorldDownloadState, Integer> getter) { |
||||
return () -> { |
||||
final FastWorldDownloadState state = this.downloadState.get(); |
||||
return state != null ? getter.apply(state) : 0; |
||||
}; |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<Void> run( |
||||
final FastSyncActions fastSyncActions, final FastSyncState fastSyncState) { |
||||
synchronized (this) { |
||||
final FastWorldDownloadState oldDownloadState = this.downloadState.get(); |
||||
if (oldDownloadState != null && oldDownloadState.isDownloading()) { |
||||
final CompletableFuture<Void> failed = new CompletableFuture<>(); |
||||
failed.completeExceptionally( |
||||
new IllegalStateException( |
||||
"Cannot run an already running " + this.getClass().getSimpleName())); |
||||
return failed; |
||||
} |
||||
|
||||
final BlockHeader header = fastSyncState.getPivotBlockHeader().get(); |
||||
final Hash stateRoot = header.getStateRoot(); |
||||
if (worldStateStorage.isWorldStateAvailable(stateRoot, header.getHash())) { |
||||
LOG.info( |
||||
"World state already available for block {} ({}). State root {}", |
||||
header.getNumber(), |
||||
header.getHash(), |
||||
stateRoot); |
||||
return CompletableFuture.completedFuture(null); |
||||
} |
||||
LOG.info( |
||||
"Begin downloading world state from peers for block {} ({}). State root {}", |
||||
header.getNumber(), |
||||
header.getHash(), |
||||
stateRoot); |
||||
|
||||
final FastWorldDownloadState newDownloadState = |
||||
new FastWorldDownloadState( |
||||
taskCollection, maxNodeRequestsWithoutProgress, minMillisBeforeStalling, clock); |
||||
this.downloadState.set(newDownloadState); |
||||
|
||||
if (!newDownloadState.downloadWasResumed()) { |
||||
// Only queue the root node if we're starting a new download from scratch
|
||||
newDownloadState.enqueueRequest( |
||||
NodeDataRequest.createAccountDataRequest(stateRoot, Optional.of(Bytes.EMPTY))); |
||||
} |
||||
|
||||
maybeCompleteTask = |
||||
Optional.of(new CompleteTaskStep(worldStateStorage, metricsSystem, taskCollection::size)); |
||||
final FastWorldStateDownloadProcess downloadProcess = |
||||
FastWorldStateDownloadProcess.builder() |
||||
.hashCountPerRequest(hashCountPerRequest) |
||||
.maxOutstandingRequests(maxOutstandingRequests) |
||||
.loadLocalDataStep(new LoadLocalDataStep(worldStateStorage, metricsSystem)) |
||||
.requestDataStep(new RequestDataStep(ethContext, metricsSystem)) |
||||
.persistDataStep(new PersistDataStep(worldStateStorage)) |
||||
.completeTaskStep(maybeCompleteTask.get()) |
||||
.downloadState(newDownloadState) |
||||
.pivotBlockHeader(header) |
||||
.metricsSystem(metricsSystem) |
||||
.build(); |
||||
|
||||
newDownloadState.setWorldStateDownloadProcess(downloadProcess); |
||||
|
||||
return newDownloadState.startDownload(downloadProcess, ethContext.getScheduler()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
synchronized (this) { |
||||
final FastWorldDownloadState downloadState = this.downloadState.get(); |
||||
if (downloadState != null) { |
||||
downloadState.getDownloadFuture().cancel(true); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public Optional<Long> getPulledStates() { |
||||
return maybeCompleteTask.map(CompleteTaskStep::getCompletedRequests); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<Long> getKnownStates() { |
||||
return maybeCompleteTask.map(task -> task.getCompletedRequests() + task.getPendingRequests()); |
||||
} |
||||
} |
Loading…
Reference in new issue