From c4a0d69af97b428167c8522024f78101fd7e891c Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Thu, 21 Feb 2019 05:50:04 +1000 Subject: [PATCH] Fix potential stall in world state download. (#922) There's an issue with the world state downloader where the download process could stall. The sequence goes like this: 1. Inside requestNodeData thread A takes out the sendingRequests lock 2. Thread A checks shouldRequestNodeData which returns true 3. Thread A sends a request for data 4. Thread A checks shouldRequestNodeData which returns false so it exits the while loop 5. Thread B receives the response to the (only) outstanding request 6. Thread B enters requestNodeData but fails to get the sendingRequests lock so exits the method 7. Thread A releases the sendingRequests lock and exits the method There are now no threads checking if they should send new requests and no outstanding requests to trigger a check in the future so the download is stuck and will never make any more progress. The fix is to switch the order of taking out the sendingRequests lock and checking shouldRequestNodeData so we release the sendingRequests lock before we go back round the loop to check shouldRequestNodeData. 
Signed-off-by: Adrian Sutton --- .../sync/worldstate/WorldStateDownloader.java | 106 ++++++++++-------- 1 file changed, 57 insertions(+), 49 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index be59863aa1..e7786c3140 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -160,67 +160,75 @@ public class WorldStateDownloader { } private void requestNodeData(final BlockHeader header) { - if (sendingRequests.compareAndSet(false, true)) { - while (shouldRequestNodeData()) { + while (shouldRequestNodeData()) { + if (sendingRequests.compareAndSet(false, true)) { final Optional maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber()); - if (!maybePeer.isPresent()) { // If no peer is available, wait and try again + sendingRequests.set(false); waitForNewPeer().whenComplete((r, t) -> requestNodeData(header)); break; } else { - final EthPeer peer = maybePeer.get(); + requestDataFromPeer(header, maybePeer.get()); + } + sendingRequests.set(false); + } else { + break; + } + } + } - // Collect data to be requested - final List> toRequest = new ArrayList<>(); - while (toRequest.size() < hashCountPerRequest) { - final Task pendingRequestTask = pendingRequests.dequeue(); - if (pendingRequestTask == null) { - break; - } - final NodeDataRequest pendingRequest = pendingRequestTask.getData(); - final Optional existingData = - pendingRequest.getExistingData(worldStateStorage); - if (existingData.isPresent()) { - pendingRequest.setData(existingData.get()); - queueChildRequests(pendingRequest); - completedRequestsCounter.inc(); - pendingRequestTask.markCompleted(); - continue; - } - toRequest.add(pendingRequestTask); - } + 
private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) { + // Collect data to be requested + final List> toRequest = getTasksForNextRequest(); - // Request and process node data - sendAndProcessRequests(peer, toRequest, header) - .whenComplete( - (task, error) -> { - final boolean done; - synchronized (this) { - outstandingRequests.remove(task); - done = - status == Status.RUNNING - && outstandingRequests.size() == 0 - && pendingRequests.allTasksCompleted(); - } - if (done) { - // We're done - final Updater updater = worldStateStorage.updater(); - updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); - updater.commit(); - markDone(); - } else { - // Send out additional requests - requestNodeData(header); - } - }); - } + // Request and process node data + sendAndProcessRequests(peer, toRequest, header) + .whenComplete( + (task, error) -> { + final boolean done; + synchronized (this) { + outstandingRequests.remove(task); + done = + status == Status.RUNNING + && outstandingRequests.size() == 0 + && pendingRequests.allTasksCompleted(); + } + if (done) { + // We're done + final Updater updater = worldStateStorage.updater(); + updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); + updater.commit(); + markDone(); + } else { + // Send out additional requests + requestNodeData(header); + } + }); + } + + private List> getTasksForNextRequest() { + final List> toRequest = new ArrayList<>(); + while (toRequest.size() < hashCountPerRequest) { + final Task pendingRequestTask = pendingRequests.dequeue(); + if (pendingRequestTask == null) { + break; + } + final NodeDataRequest pendingRequest = pendingRequestTask.getData(); + final Optional existingData = pendingRequest.getExistingData(worldStateStorage); + if (existingData.isPresent()) { + pendingRequest.setData(existingData.get()); + queueChildRequests(pendingRequest); + completedRequestsCounter.inc(); + pendingRequestTask.markCompleted(); + continue; } - sendingRequests.set(false); 
+ toRequest.add(pendingRequestTask); } + return toRequest; } - private boolean shouldRequestNodeData() { + private synchronized boolean shouldRequestNodeData() { return !future.isDone() && outstandingRequests.size() < maxOutstandingRequests && !pendingRequests.isEmpty();