diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index b82b895292..5710cf1a21 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -257,12 +257,7 @@ public class WorldStateDownloaderTest { final Responder responder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - while (!result.isDone()) { - for (final RespondingEthPeer peer : peers) { - peer.respond(responder); - } - giveOtherThreadsAGo(); - } + respondUntilDone(peers, responder, result); // Check that all expected account data was downloaded final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); @@ -325,12 +320,7 @@ public class WorldStateDownloaderTest { final Responder responder = RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); - while (!result.isDone()) { - for (final RespondingEthPeer peer : peers) { - peer.respond(responder); - } - giveOtherThreadsAGo(); - } + respondUntilDone(peers, responder, result); // Check that known code was not requested final List requestedHashes = @@ -498,12 +488,7 @@ public class WorldStateDownloaderTest { final Responder responder = RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); - while (!result.isDone()) { - for (final RespondingEthPeer peer : peers) { - peer.respond(responder); - } - giveOtherThreadsAGo(); - } + respondUntilDone(peers, responder, result); // Check that unknown trie nodes were requested final List requestedHashes = @@ -600,12 +585,7 @@ public class WorldStateDownloaderTest { final Responder responder = RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); - while (!result.isDone()) { - for (final RespondingEthPeer peer : peers) { - peer.respond(responder); - } - giveOtherThreadsAGo(); - } + respondUntilDone(peers, responder, result); // World state should be available by the time the result is complete assertThat(localStorage.isWorldStateAvailable(stateRoot)).isTrue(); @@ -734,10 +714,7 @@ public class WorldStateDownloaderTest { RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); CompletableFuture result = downloader.run(header); - while (!result.isDone()) { - peer.respond(responder); - giveOtherThreadsAGo(); - } + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); assertThat(localStorage.isWorldStateAvailable(stateRoot)).isTrue(); // Check that already enqueued trie nodes were requested @@ -920,12 +897,7 @@ public class WorldStateDownloaderTest { final CompletableFuture downloaderFuture) { final Responder responder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - while (!downloaderFuture.isDone()) { - for (final RespondingEthPeer peer : peers) { - peer.respond(responder); - } - giveOtherThreadsAGo(); - } + respondUntilDone(peers, responder, downloaderFuture); } private void respondPartially( @@ -961,12 +933,7 @@ public class WorldStateDownloaderTest { // Downloader should not complete with empty responses assertThat(downloaderFuture).isNotDone(); - while (!downloaderFuture.isDone()) { - for (final RespondingEthPeer peer : peers) { - peer.respond(fullResponder); - } - giveOtherThreadsAGo(); - } + respondUntilDone(peers, fullResponder, downloaderFuture); } private void assertAccountsMatch( @@ -1010,6 +977,23 @@ public class WorldStateDownloaderTest { new NoOpMetricsSystem()); } + private void respondUntilDone( + final List peers, + final Responder responder, + final CompletableFuture result) { + if (peers.size() == 1) { + // Use a blocking approach to waiting for the next message when we can. + peers.get(0).respondWhileOtherThreadsWork(responder, () -> !result.isDone()); + return; + } + while (!result.isDone()) { + for (final RespondingEthPeer peer : peers) { + peer.respond(responder); + } + giveOtherThreadsAGo(); + } + } + private void giveOtherThreadsAGo() { LockSupport.parkNanos(200); }