Fix potential stall in world state download. (#922)

There's an issue with the world state downloader where the download process could stall. The sequence goes like:

1. Inside requestDataFromPeer thread A takes out the sendingRequests lock
2. Thread A checks shouldRequestNodeData which returns true
3. Thread A sends a request for data
4. Thread A checks shouldRequestNodeData which returns false so it exits the while loop
5. Thread B receives the response to the (only) outstanding request
6. Thread B enters shouldRequestNodeData but fails to get the sendingRequests lock so exits the method
7. Thread A releases the sendingRequests lock and exits the methods

There are now no threads checking if they should send new requests and no outstanding requests to trigger a check in the future so the download is stuck and will never make anymore progress.

The fix is to switch the order of taking out the sendingRequests lock and checking shouldRequestNodeData so we release the sendingRequests lock before we go back round the loop to check shouldRequestNodeData.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 4404cac632
commit c4a0d69af9
  1. 106
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java

@ -160,67 +160,75 @@ public class WorldStateDownloader {
}
private void requestNodeData(final BlockHeader header) {
if (sendingRequests.compareAndSet(false, true)) {
while (shouldRequestNodeData()) {
while (shouldRequestNodeData()) {
if (sendingRequests.compareAndSet(false, true)) {
final Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());
if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again
sendingRequests.set(false);
waitForNewPeer().whenComplete((r, t) -> requestNodeData(header));
break;
} else {
final EthPeer peer = maybePeer.get();
requestDataFromPeer(header, maybePeer.get());
}
sendingRequests.set(false);
} else {
break;
}
}
}
// Collect data to be requested
final List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
final Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
final NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
completedRequestsCounter.inc();
pendingRequestTask.markCompleted();
continue;
}
toRequest.add(pendingRequestTask);
}
private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) {
// Collect data to be requested
final List<Task<NodeDataRequest>> toRequest = getTasksForNextRequest();
// Request and process node data
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(task, error) -> {
final boolean done;
synchronized (this) {
outstandingRequests.remove(task);
done =
status == Status.RUNNING
&& outstandingRequests.size() == 0
&& pendingRequests.allTasksCompleted();
}
if (done) {
// We're done
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNode);
updater.commit();
markDone();
} else {
// Send out additional requests
requestNodeData(header);
}
});
}
// Request and process node data
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(task, error) -> {
final boolean done;
synchronized (this) {
outstandingRequests.remove(task);
done =
status == Status.RUNNING
&& outstandingRequests.size() == 0
&& pendingRequests.allTasksCompleted();
}
if (done) {
// We're done
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNode);
updater.commit();
markDone();
} else {
// Send out additional requests
requestNodeData(header);
}
});
}
private List<Task<NodeDataRequest>> getTasksForNextRequest() {
final List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
final Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
final NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData = pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
completedRequestsCounter.inc();
pendingRequestTask.markCompleted();
continue;
}
sendingRequests.set(false);
toRequest.add(pendingRequestTask);
}
return toRequest;
}
private boolean shouldRequestNodeData() {
private synchronized boolean shouldRequestNodeData() {
return !future.isDone()
&& outstandingRequests.size() < maxOutstandingRequests
&& !pendingRequests.isEmpty();

Loading…
Cancel
Save