Separate download state tracking from WorldStateDownloader (#967)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent b81ad01b0f
commit 9f8d14522e
  1. 1
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java
  2. 212
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java
  3. 287
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  4. 237
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java
  5. 16
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java

@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public abstract class NodeDataRequest {
private final RequestType requestType;
private final Hash hash;
private BytesValue data;

@ -0,0 +1,212 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
class WorldDownloadState {
private static final Logger LOG = LogManager.getLogger();
private final TaskQueue<NodeDataRequest> pendingRequests;
private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist;
private final int maxOutstandingRequests;
private final Set<EthTask<?>> outstandingRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicBoolean sendingRequests = new AtomicBoolean(false);
private final CompletableFuture<Void> internalFuture;
private final CompletableFuture<Void> downloadFuture;
private boolean waitingForNewPeer = false;
private BytesValue rootNodeData;
private EthTask<?> persistenceTask;
public WorldDownloadState(
final TaskQueue<NodeDataRequest> pendingRequests,
final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist,
final int maxOutstandingRequests) {
this.pendingRequests = pendingRequests;
this.requestsToPersist = requestsToPersist;
this.maxOutstandingRequests = maxOutstandingRequests;
this.internalFuture = new CompletableFuture<>();
this.downloadFuture = new CompletableFuture<>();
this.internalFuture.whenComplete(this::cleanup);
this.downloadFuture.exceptionally(
error -> {
// Propagate cancellation back to our internal future.
if (error instanceof CancellationException) {
this.internalFuture.cancel(true);
}
return null;
});
}
private synchronized void cleanup(final Void result, final Throwable error) {
// Handle cancellations
if (internalFuture.isCancelled()) {
LOG.info("World state download cancelled");
} else if (error != null) {
if (!(ExceptionUtils.rootCause(error) instanceof StalledDownloadException)) {
LOG.info("World state download failed. ", error);
}
}
if (persistenceTask != null) {
persistenceTask.cancel();
}
for (final EthTask<?> outstandingRequest : outstandingRequests) {
outstandingRequest.cancel();
}
pendingRequests.clear();
requestsToPersist.clear();
if (error != null) {
downloadFuture.completeExceptionally(error);
} else {
downloadFuture.complete(result);
}
}
public void whileAdditionalRequestsCanBeSent(final Runnable action) {
while (shouldRequestNodeData()) {
if (sendingRequests.compareAndSet(false, true)) {
try {
action.run();
} finally {
sendingRequests.set(false);
}
} else {
break;
}
}
}
public synchronized void setWaitingForNewPeer(final boolean waitingForNewPeer) {
this.waitingForNewPeer = waitingForNewPeer;
}
public synchronized void addOutstandingTask(final EthTask<?> task) {
outstandingRequests.add(task);
}
public synchronized void removeOutstandingTask(final EthTask<?> task) {
outstandingRequests.remove(task);
}
public int getOutstandingRequestCount() {
return outstandingRequests.size();
}
private synchronized boolean shouldRequestNodeData() {
return !internalFuture.isDone()
&& outstandingRequests.size() < maxOutstandingRequests
&& !pendingRequests.isEmpty()
&& !waitingForNewPeer;
}
public CompletableFuture<Void> getDownloadFuture() {
return downloadFuture;
}
public synchronized void setPersistenceTask(final EthTask<?> persistenceTask) {
this.persistenceTask = persistenceTask;
}
public synchronized void enqueueRequest(final NodeDataRequest request) {
if (!internalFuture.isDone()) {
pendingRequests.enqueue(request);
}
}
public synchronized void enqueueRequests(final Stream<NodeDataRequest> requests) {
if (!internalFuture.isDone()) {
requests.forEach(pendingRequests::enqueue);
}
}
public synchronized Task<NodeDataRequest> dequeueRequest() {
if (internalFuture.isDone()) {
return null;
}
return pendingRequests.dequeue();
}
public synchronized void setRootNodeData(final BytesValue rootNodeData) {
this.rootNodeData = rootNodeData;
}
public ArrayBlockingQueue<Task<NodeDataRequest>> getRequestsToPersist() {
return requestsToPersist;
}
public void addToPersistenceQueue(final Task<NodeDataRequest> task) {
while (!internalFuture.isDone()) {
try {
if (requestsToPersist.offer(task, 1, TimeUnit.SECONDS)) {
break;
}
} catch (final InterruptedException e) {
task.markFailed();
Thread.currentThread().interrupt();
break;
}
}
}
public int getPersistenceQueueSize() {
return requestsToPersist.size();
}
public synchronized void markAsStalled(final int maxNodeRequestRetries) {
final String message =
"Download stalled due to too many failures to retrieve node data (>"
+ maxNodeRequestRetries
+ " failures)";
final WorldStateDownloaderException e = new StalledDownloadException(message);
internalFuture.completeExceptionally(e);
}
public synchronized boolean checkCompletion(
final WorldStateStorage worldStateStorage, final BlockHeader header) {
if (!internalFuture.isDone() && pendingRequests.allTasksCompleted()) {
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNodeData);
updater.commit();
internalFuture.complete(null);
LOG.info("Finished downloading world state from peers");
return true;
} else {
return false;
}
}
public synchronized boolean isDownloading() {
return !internalFuture.isDone();
}
}

@ -20,7 +20,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.GetNodeDataFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
@ -39,16 +38,16 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
@ -56,64 +55,43 @@ import org.apache.logging.log4j.Logger;
public class WorldStateDownloader {
private static final Logger LOG = LogManager.getLogger();
private final Counter completedRequestsCounter;
private final Counter retriedRequestsCounter;
private final Counter existingNodeCounter;
private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist;
private enum Status {
IDLE,
RUNNING,
CANCELLED,
COMPLETED
}
private final MetricsSystem metricsSystem;
private final AtomicInteger highestRetryCount = new AtomicInteger(0);
private final EthContext ethContext;
private final TaskQueue<NodeDataRequest> pendingRequests;
private final TaskQueue<NodeDataRequest> taskQueue;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final int maxNodeRequestRetries;
private final Set<EthTask<?>> outstandingRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final MetricsSystem metricsSystem;
private final WorldStateStorage worldStateStorage;
private final AtomicBoolean sendingRequests = new AtomicBoolean(false);
private volatile CompletableFuture<Void> future;
private volatile Status status = Status.IDLE;
private volatile BytesValue rootNode;
private volatile PersistNodeDataTask persistenceTask;
private final AtomicInteger highestRetryCount = new AtomicInteger(0);
private final AtomicReference<WorldDownloadState> downloadState = new AtomicReference<>();
public WorldStateDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final TaskQueue<NodeDataRequest> pendingRequests,
final TaskQueue<NodeDataRequest> taskQueue,
final int hashCountPerRequest,
final int maxOutstandingRequests,
final int maxNodeRequestRetries,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.worldStateStorage = worldStateStorage;
this.pendingRequests = pendingRequests;
this.taskQueue = taskQueue;
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.maxNodeRequestRetries = maxNodeRequestRetries;
this.metricsSystem = metricsSystem;
// Room for the requests we expect to do in parallel plus some buffer but not unlimited.
this.requestsToPersist =
new ArrayBlockingQueue<>(hashCountPerRequest * maxOutstandingRequests * 2);
metricsSystem.createLongGauge(
MetricCategory.SYNCHRONIZER,
"world_state_pending_requests_current",
"Number of pending requests for fast sync world state download",
pendingRequests::size);
metricsSystem.createIntegerGauge(
MetricCategory.SYNCHRONIZER,
"world_state_inflight_requests_current",
"Number of requests currently in flight for fast sync world state download",
outstandingRequests::size);
taskQueue::size);
completedRequestsCounter =
metricsSystem.createCounter(
@ -142,7 +120,20 @@ public class WorldStateDownloader {
MetricCategory.SYNCHRONIZER,
"world_state_node_persistence_queue_length_current",
"Current number of node data requests waiting to be persisted",
requestsToPersist::size);
downloadStateValue(WorldDownloadState::getPersistenceQueueSize));
metricsSystem.createIntegerGauge(
MetricCategory.SYNCHRONIZER,
"world_state_inflight_requests_current",
"Number of requests currently in flight for fast sync world state download",
downloadStateValue(WorldDownloadState::getOutstandingRequestCount));
}
private Supplier<Integer> downloadStateValue(final Function<WorldDownloadState, Integer> getter) {
return () -> {
final WorldDownloadState state = this.downloadState.get();
return state != null ? getter.apply(state) : 0;
};
}
public CompletableFuture<Void> run(final BlockHeader header) {
@ -152,81 +143,98 @@ public class WorldStateDownloader {
header.getHash(),
header.getStateRoot());
synchronized (this) {
if (status == Status.RUNNING) {
final WorldDownloadState oldDownloadState = this.downloadState.get();
if (oldDownloadState != null && oldDownloadState.isDownloading()) {
final CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(
new IllegalStateException(
"Cannot run an already running " + this.getClass().getSimpleName()));
return failed;
}
status = Status.RUNNING;
future = createFuture();
highestRetryCount.set(0);
final Hash stateRoot = header.getStateRoot();
if (worldStateStorage.isWorldStateAvailable(stateRoot)) {
// If we're requesting data for an existing world state, we're already done
markDone();
} else {
pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(stateRoot));
ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header));
persistenceTask = new PersistNodeDataTask(header);
ethContext.getScheduler().scheduleServiceTask(persistenceTask);
return CompletableFuture.completedFuture(null);
}
// Room for the requests we expect to do in parallel plus some buffer but not unlimited.
final int persistenceQueueCapacity = hashCountPerRequest * maxNodeRequestRetries * 2;
final WorldDownloadState newDownloadState =
new WorldDownloadState(
taskQueue,
new ArrayBlockingQueue<>(persistenceQueueCapacity),
maxOutstandingRequests);
this.downloadState.set(newDownloadState);
highestRetryCount.set(0);
newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(() -> requestNodeData(header, newDownloadState));
final PersistNodeDataTask persistenceTask = new PersistNodeDataTask(header, newDownloadState);
newDownloadState.setPersistenceTask(persistenceTask);
ethContext.getScheduler().scheduleServiceTask(persistenceTask);
return newDownloadState.getDownloadFuture();
}
return future;
}
public void cancel() {
getFuture().cancel(true);
}
private void requestNodeData(final BlockHeader header) {
while (shouldRequestNodeData()) {
if (sendingRequests.compareAndSet(false, true)) {
final Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());
if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again
sendingRequests.set(false);
waitForNewPeer().whenComplete((r, t) -> requestNodeData(header));
break;
} else {
requestDataFromPeer(header, maybePeer.get());
}
sendingRequests.set(false);
} else {
break;
synchronized (this) {
final WorldDownloadState downloadState = this.downloadState.get();
if (downloadState != null) {
downloadState.getDownloadFuture().cancel(true);
}
}
}
private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) {
private void requestNodeData(final BlockHeader header, final WorldDownloadState downloadState) {
downloadState.whileAdditionalRequestsCanBeSent(
() -> {
final Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());
if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again
downloadState.setWaitingForNewPeer(true);
waitForNewPeer()
.whenComplete(
(r, t) -> {
downloadState.setWaitingForNewPeer(false);
requestNodeData(header, downloadState);
});
} else {
requestDataFromPeer(header, maybePeer.get(), downloadState);
}
});
}
private void requestDataFromPeer(
final BlockHeader header, final EthPeer peer, final WorldDownloadState downloadState) {
// Collect data to be requested
final List<Task<NodeDataRequest>> toRequest = getTasksForNextRequest();
final List<Task<NodeDataRequest>> toRequest = getTasksForNextRequest(downloadState);
if (toRequest.isEmpty()) {
requestNodeData(header);
return;
}
// Request and process node data
sendAndProcessRequests(peer, toRequest, header)
sendAndProcessRequests(peer, toRequest, header, downloadState)
.whenComplete(
(task, error) -> {
if (error != null
&& !(ExceptionUtils.rootCause(error) instanceof RejectedExecutionException)) {
LOG.error("World state data request failed", error);
}
outstandingRequests.remove(task);
requestNodeData(header);
downloadState.removeOutstandingTask(task);
requestNodeData(header, downloadState);
});
}
private List<Task<NodeDataRequest>> getTasksForNextRequest() {
private List<Task<NodeDataRequest>> getTasksForNextRequest(
final WorldDownloadState downloadState) {
final List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
final Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
final Task<NodeDataRequest> pendingRequestTask = downloadState.dequeueRequest();
if (pendingRequestTask == null) {
break;
}
@ -235,7 +243,7 @@ public class WorldStateDownloader {
if (existingData.isPresent()) {
existingNodeCounter.inc();
pendingRequest.setData(existingData.get()).setRequiresPersisting(false);
addToPersistenceQueue(pendingRequestTask);
downloadState.addToPersistenceQueue(pendingRequestTask);
continue;
}
toRequest.add(pendingRequestTask);
@ -243,12 +251,6 @@ public class WorldStateDownloader {
return toRequest;
}
private synchronized boolean shouldRequestNodeData() {
return !future.isDone()
&& outstandingRequests.size() < maxOutstandingRequests
&& !pendingRequests.isEmpty();
}
private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
@ -258,7 +260,8 @@ public class WorldStateDownloader {
private CompletableFuture<AbstractPeerTask<Map<Hash, BytesValue>>> sendAndProcessRequests(
final EthPeer peer,
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader) {
final BlockHeader blockHeader,
final WorldDownloadState downloadState) {
final List<Hash> hashes =
requestTasks.stream()
.map(Task::getData)
@ -267,7 +270,7 @@ public class WorldStateDownloader {
.collect(Collectors.toList());
final AbstractPeerTask<Map<Hash, BytesValue>> ethTask =
GetNodeDataFromPeerTask.forHashes(ethContext, hashes, metricsSystem).assignPeer(peer);
outstandingRequests.add(ethTask);
downloadState.addOutstandingTask(ethTask);
return ethTask
.run()
.thenApply(PeerTaskResult::getResult)
@ -287,14 +290,17 @@ public class WorldStateDownloader {
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() -> storeData(requestTasks, blockHeader, ethTask, data)));
() -> {
storeData(requestTasks, blockHeader, data, downloadState);
return CompletableFuture.completedFuture(ethTask);
}));
}
private CompletableFuture<AbstractPeerTask<Map<Hash, BytesValue>>> storeData(
private void storeData(
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader,
final AbstractPeerTask<Map<Hash, BytesValue>> ethTask,
final Map<Hash, BytesValue> data) {
final Map<Hash, BytesValue> data,
final WorldDownloadState downloadState) {
for (final Task<NodeDataRequest> task : requestTasks) {
final NodeDataRequest request = task.getData();
final BytesValue matchingData = data.get(request.getHash());
@ -302,37 +308,23 @@ public class WorldStateDownloader {
retriedRequestsCounter.inc();
final int requestFailures = request.trackFailure();
updateHighestRetryCount(requestFailures);
task.markFailed();
if (requestFailures > maxNodeRequestRetries) {
handleStalledDownload();
LOG.info("Unavailable node {}", request.getHash());
downloadState.markAsStalled(maxNodeRequestRetries);
}
task.markFailed();
} else {
request.setData(matchingData);
if (isRootState(blockHeader, request)) {
queueChildRequests(request);
rootNode = request.getData();
downloadState.enqueueRequests(request.getChildRequests());
downloadState.setRootNodeData(request.getData());
task.markCompleted();
} else {
addToPersistenceQueue(task);
downloadState.addToPersistenceQueue(task);
}
}
}
requestNodeData(blockHeader);
return CompletableFuture.completedFuture(ethTask);
}
private void addToPersistenceQueue(final Task<NodeDataRequest> task) {
while (!future.isDone()) {
try {
if (requestsToPersist.offer(task, 1, TimeUnit.SECONDS)) {
break;
}
} catch (final InterruptedException e) {
task.markFailed();
Thread.currentThread().interrupt();
break;
}
}
requestNodeData(blockHeader, downloadState);
}
private void updateHighestRetryCount(final int requestFailures) {
@ -343,63 +335,6 @@ public class WorldStateDownloader {
}
}
private synchronized void queueChildRequests(final NodeDataRequest request) {
if (status == Status.RUNNING) {
request.getChildRequests().forEach(pendingRequests::enqueue);
}
}
private synchronized CompletableFuture<Void> getFuture() {
if (future == null) {
future = createFuture();
}
return future;
}
private CompletableFuture<Void> createFuture() {
final CompletableFuture<Void> future = new CompletableFuture<>();
future.whenComplete(
(res, err) -> {
// Handle cancellations
if (future.isCancelled()) {
LOG.info("World state download cancelled");
doCancelDownload();
} else if (err != null) {
if (!(ExceptionUtils.rootCause(err) instanceof StalledDownloadException)) {
LOG.info("World state download failed. ", err);
}
doCancelDownload();
}
});
return future;
}
private synchronized void handleStalledDownload() {
final String message =
"Download stalled due to too many failures to retrieve node data (>"
+ maxNodeRequestRetries
+ " failures)";
final WorldStateDownloaderException e = new StalledDownloadException(message);
future.completeExceptionally(e);
}
private synchronized void doCancelDownload() {
status = Status.CANCELLED;
persistenceTask.cancel();
pendingRequests.clear();
for (final EthTask<?> outstandingRequest : outstandingRequests) {
outstandingRequest.cancel();
}
}
private synchronized void markDone() {
final boolean completed = getFuture().complete(null);
if (completed) {
LOG.info("Finished downloading world state from peers");
status = Status.COMPLETED;
}
}
private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}
@ -408,10 +343,12 @@ public class WorldStateDownloader {
private final List<Task<NodeDataRequest>> batch;
private final BlockHeader header;
private final WorldDownloadState downloadState;
public PersistNodeDataTask(final BlockHeader header) {
public PersistNodeDataTask(final BlockHeader header, final WorldDownloadState downloadState) {
super(metricsSystem);
this.header = header;
this.downloadState = downloadState;
batch = new ArrayList<>();
}
@ -419,6 +356,8 @@ public class WorldStateDownloader {
protected void executeTask() {
while (!isDone()) {
try {
final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist =
downloadState.getRequestsToPersist();
final Task<NodeDataRequest> task = requestsToPersist.poll(1, TimeUnit.SECONDS);
if (task != null) {
batch.clear();
@ -429,20 +368,18 @@ public class WorldStateDownloader {
taskToPersist -> {
final NodeDataRequest request = taskToPersist.getData();
request.persist(storageUpdater);
queueChildRequests(request);
downloadState.enqueueRequests(request.getChildRequests());
taskToPersist.markCompleted();
completedRequestsCounter.inc();
});
storageUpdater.commit();
if (pendingRequests.allTasksCompleted()) {
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNode);
updater.commit();
markDone();
if (downloadState.checkCompletion(worldStateStorage, header)) {
result.get().complete(null);
} else {
ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(() -> requestNodeData(header, downloadState));
}
}
} catch (final InterruptedException ignore) {
@ -452,7 +389,9 @@ public class WorldStateDownloader {
// Assume we failed to persist any of the requests and ensure we have something
// scheduled to kick off another round of requests.
batch.forEach(Task::markFailed);
ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(() -> requestNodeData(header, downloadState));
}
}
}

@ -0,0 +1,237 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static tech.pegasys.pantheon.ethereum.eth.sync.worldstate.NodeDataRequest.createAccountDataRequest;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Test;
public class WorldDownloadStateTest {
private static final BytesValue ROOT_NODE_DATA = BytesValue.of(1, 2, 3, 4);
private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA);
private static final int MAX_OUTSTANDING_REQUESTS = 3;
private final WorldStateStorage worldStateStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
private final BlockHeader header =
new BlockHeaderTestFixture().stateRoot(ROOT_NODE_HASH).buildHeader();
private final InMemoryTaskQueue<NodeDataRequest> pendingRequests = new InMemoryTaskQueue<>();
private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist =
new ArrayBlockingQueue<>(100);
private final WorldDownloadState downloadState =
new WorldDownloadState(pendingRequests, requestsToPersist, MAX_OUTSTANDING_REQUESTS);
private final CompletableFuture<Void> future = downloadState.getDownloadFuture();
@Before
public void setUp() {
downloadState.setRootNodeData(ROOT_NODE_DATA);
assertThat(downloadState.isDownloading()).isTrue();
}
@Test
public void shouldCompleteReturnedFutureWhenNoPendingTasksRemain() {
downloadState.checkCompletion(worldStateStorage, header);
assertThat(future).isCompleted();
assertThat(downloadState.isDownloading()).isFalse();
}
@Test
public void shouldStoreRootNodeBeforeReturnedFutureCompletes() {
final CompletableFuture<Void> postFutureChecks =
future.thenAccept(
result ->
assertThat(worldStateStorage.getAccountStateTrieNode(ROOT_NODE_HASH))
.contains(ROOT_NODE_DATA));
downloadState.checkCompletion(worldStateStorage, header);
assertThat(future).isCompleted();
assertThat(postFutureChecks).isCompleted();
}
@Test
public void shouldNotCompleteWhenThereArePendingTasks() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
downloadState.checkCompletion(worldStateStorage, header);
assertThat(future).isNotDone();
assertThat(worldStateStorage.getAccountStateTrieNode(ROOT_NODE_HASH)).isEmpty();
assertThat(downloadState.isDownloading()).isTrue();
}
@Test
@SuppressWarnings("unchecked")
public void shouldCancelOutstandingTasksWhenFutureIsCancelled() {
final EthTask<?> persistenceTask = mock(EthTask.class);
final EthTask<?> outstandingTask1 = mock(EthTask.class);
final EthTask<?> outstandingTask2 = mock(EthTask.class);
final Task<NodeDataRequest> toPersist1 = mock(Task.class);
final Task<NodeDataRequest> toPersist2 = mock(Task.class);
downloadState.setPersistenceTask(persistenceTask);
downloadState.addOutstandingTask(outstandingTask1);
downloadState.addOutstandingTask(outstandingTask2);
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY));
requestsToPersist.add(toPersist1);
requestsToPersist.add(toPersist2);
future.cancel(true);
verify(persistenceTask).cancel();
verify(outstandingTask1).cancel();
verify(outstandingTask2).cancel();
assertThat(pendingRequests.isEmpty()).isTrue();
assertThat(requestsToPersist).isEmpty();
assertThat(downloadState.isDownloading()).isFalse();
}
@Test
public void shouldNotSendAdditionalRequestsWhenWaitingForANewPeer() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
downloadState.setWaitingForNewPeer(true);
downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled());
}
@Test
public void shouldResumeSendingAdditionalRequestsWhenNoLongerWaitingForPeer() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest =
mockWithAction(() -> downloadState.addOutstandingTask(mock(EthTask.class)));
downloadState.setWaitingForNewPeer(true);
downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled());
downloadState.setWaitingForNewPeer(false);
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(MAX_OUTSTANDING_REQUESTS)).run();
}
@Test
public void shouldStopSendingAdditionalRequestsWhenPendingRequestsIsEmpty() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest = mockWithAction(pendingRequests::dequeue);
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(2)).run();
}
@Test
public void shouldStopSendingAdditionalRequestsWhenMaximumOutstandingRequestCountReached() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest =
mockWithAction(() -> downloadState.addOutstandingTask(mock(EthTask.class)));
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(MAX_OUTSTANDING_REQUESTS)).run();
}
@Test
public void shouldStopSendingAdditionalRequestsWhenFutureIsCancelled() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest = mockWithAction(() -> future.cancel(true));
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(1)).run();
}
@Test
public void shouldStopSendingAdditionalRequestsWhenDownloadIsMarkedAsStalled() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest = mockWithAction(() -> downloadState.markAsStalled(1));
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(1)).run();
}
@Test
public void shouldNotAllowMultipleCallsToSendAdditionalRequestsAtOnce() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest =
mockWithAction(
() -> {
downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled());
downloadState.addOutstandingTask(mock(EthTask.class));
});
downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(MAX_OUTSTANDING_REQUESTS)).run();
}
@Test
public void shouldNotEnqueueRequestsAfterDownloadIsStalled() {
downloadState.checkCompletion(worldStateStorage, header);
downloadState.enqueueRequests(Stream.of(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
downloadState.enqueueRequest(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
assertThat(pendingRequests.isEmpty()).isTrue();
}
@Test // Sanity check for the test structure
public void shouldFailWhenMustNotBeCalledIsCalled() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
assertThatThrownBy(() -> downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled()))
.hasMessage("Unexpected invocation");
}
private Runnable mustNotBeCalled() {
return () -> fail("Unexpected invocation");
}
private Runnable mockWithAction(final Runnable action) {
final Runnable runnable = mock(Runnable.class);
doAnswer(
invocation -> {
action.run();
return null;
})
.when(runnable)
.run();
return runnable;
}
}

@ -27,6 +27,7 @@ import tech.pegasys.pantheon.ethereum.core.Account;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.MutableWorldState;
import tech.pegasys.pantheon.ethereum.core.WorldState;
@ -640,8 +641,12 @@ public class WorldStateDownloaderTest {
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber());
// Start downloader
final CompletableFuture<?> result = downloader.run(header);
// Start downloader (with a state root that's not available anywhere
final CompletableFuture<?> result =
downloader.run(
new BlockHeaderTestFixture()
.stateRoot(Hash.hash(BytesValue.of(1, 2, 3, 4)))
.buildHeader());
// A second run should return an error without impacting the first result
final CompletableFuture<?> secondResult = downloader.run(header);
assertThat(secondResult).isCompletedExceptionally();
@ -652,6 +657,13 @@ public class WorldStateDownloaderTest {
assertThat(result).isCompletedExceptionally();
assertThatThrownBy(result::get).hasCauseInstanceOf(StalledDownloadException.class);
// Finally, check that when we restart the download with state that is available it works
final CompletableFuture<Void> retryResult = downloader.run(header);
final Responder responder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
peer.respondWhileOtherThreadsWork(responder, () -> !retryResult.isDone());
assertThat(retryResult).isCompleted();
}
/**

Loading…
Cancel
Save