Update WorldStateDownloader run() interface to accept header (#677)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent ddefecbd7e
commit 38eb6f2f53
  1. 30
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  2. 15
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java

@ -46,8 +46,6 @@ public class WorldStateDownloader {
} }
private final EthContext ethContext; private final EthContext ethContext;
// The target header for which we want to retrieve world state
private final BlockHeader header;
private final BigQueue<NodeDataRequest> pendingRequests; private final BigQueue<NodeDataRequest> pendingRequests;
private final WorldStateStorage.Updater worldStateStorageUpdater; private final WorldStateStorage.Updater worldStateStorageUpdater;
private final int hashCountPerRequest; private final int hashCountPerRequest;
@ -62,30 +60,20 @@ public class WorldStateDownloader {
public WorldStateDownloader( public WorldStateDownloader(
final EthContext ethContext, final EthContext ethContext,
final WorldStateStorage worldStateStorage, final WorldStateStorage worldStateStorage,
final BlockHeader header,
final BigQueue<NodeDataRequest> pendingRequests, final BigQueue<NodeDataRequest> pendingRequests,
final int hashCountPerRequest, final int hashCountPerRequest,
final int maxOutstandingRequests, final int maxOutstandingRequests,
final LabelledMetric<OperationTimer> ethTasksTimer) { final LabelledMetric<OperationTimer> ethTasksTimer) {
this.ethContext = ethContext; this.ethContext = ethContext;
this.worldStateStorage = worldStateStorage; this.worldStateStorage = worldStateStorage;
this.header = header;
this.pendingRequests = pendingRequests; this.pendingRequests = pendingRequests;
this.hashCountPerRequest = hashCountPerRequest; this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests; this.maxOutstandingRequests = maxOutstandingRequests;
this.ethTasksTimer = ethTasksTimer; this.ethTasksTimer = ethTasksTimer;
this.worldStateStorageUpdater = worldStateStorage.updater(); this.worldStateStorageUpdater = worldStateStorage.updater();
Hash stateRoot = header.getStateRoot();
if (stateRoot.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
// If we're requesting data for an empty world state, we're already done
markDone();
} else {
pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(header.getStateRoot()));
}
} }
public CompletableFuture<Void> run() { public CompletableFuture<Void> run(final BlockHeader header) {
synchronized (this) { synchronized (this) {
if (status == Status.DONE || status == Status.RUNNING) { if (status == Status.DONE || status == Status.RUNNING) {
return future; return future;
@ -94,18 +82,26 @@ public class WorldStateDownloader {
future = new CompletableFuture<>(); future = new CompletableFuture<>();
} }
requestNodeData(); Hash stateRoot = header.getStateRoot();
if (stateRoot.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
// If we're requesting data for an empty world state, we're already done
markDone();
} else {
pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(stateRoot));
requestNodeData(header);
}
return future; return future;
} }
private void requestNodeData() { private void requestNodeData(final BlockHeader header) {
if (sendingRequests.compareAndSet(false, true)) { if (sendingRequests.compareAndSet(false, true)) {
while (shouldRequestNodeData()) { while (shouldRequestNodeData()) {
Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber()); Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());
if (!maybePeer.isPresent()) { if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again // If no peer is available, wait and try again
waitForNewPeer().whenComplete((r, t) -> requestNodeData()); waitForNewPeer().whenComplete((r, t) -> requestNodeData(header));
break; break;
} else { } else {
EthPeer peer = maybePeer.get(); EthPeer peer = maybePeer.get();
@ -131,7 +127,7 @@ public class WorldStateDownloader {
markDone(); markDone();
} else { } else {
// Send out additional requests // Send out additional requests
requestNodeData(); requestNodeData(header);
} }
}); });
} }

@ -85,7 +85,7 @@ public class WorldStateDownloaderTest {
public void downloadEmptyWorldState() { public void downloadEmptyWorldState() {
BlockDataGenerator dataGen = new BlockDataGenerator(1); BlockDataGenerator dataGen = new BlockDataGenerator(1);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
BlockHeader header = final BlockHeader header =
dataGen dataGen
.block(BlockOptions.create().setStateRoot(EMPTY_TRIE_ROOT).setBlockNumber(10)) .block(BlockOptions.create().setStateRoot(EMPTY_TRIE_ROOT).setBlockNumber(10))
.getHeader(); .getHeader();
@ -104,13 +104,12 @@ public class WorldStateDownloaderTest {
new WorldStateDownloader( new WorldStateDownloader(
ethProtocolManager.ethContext(), ethProtocolManager.ethContext(),
localStorage, localStorage,
header,
queue, queue,
10, 10,
10, 10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER); NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
CompletableFuture<Void> future = downloader.run(); CompletableFuture<Void> future = downloader.run(header);
assertThat(future).isDone(); assertThat(future).isDone();
// Peers should not have been queried // Peers should not have been queried
@ -135,7 +134,7 @@ public class WorldStateDownloaderTest {
final List<Account> accounts = dataGen.createRandomAccounts(remoteWorldState, 20); final List<Account> accounts = dataGen.createRandomAccounts(remoteWorldState, 20);
final Hash stateRoot = remoteWorldState.rootHash(); final Hash stateRoot = remoteWorldState.rootHash();
assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check
BlockHeader header = final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
// Create some peers // Create some peers
@ -152,13 +151,12 @@ public class WorldStateDownloaderTest {
new WorldStateDownloader( new WorldStateDownloader(
ethProtocolManager.ethContext(), ethProtocolManager.ethContext(),
localStorage, localStorage,
header,
queue, queue,
10, 10,
10, 10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER); NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
CompletableFuture<Void> result = downloader.run(); CompletableFuture<Void> result = downloader.run(header);
// Respond to node data requests // Respond to node data requests
Responder responder = Responder responder =
@ -195,7 +193,7 @@ public class WorldStateDownloaderTest {
final List<Account> accounts = dataGen.createRandomAccounts(remoteWorldState, accountCount); final List<Account> accounts = dataGen.createRandomAccounts(remoteWorldState, accountCount);
final Hash stateRoot = remoteWorldState.rootHash(); final Hash stateRoot = remoteWorldState.rootHash();
assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check
BlockHeader header = final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
// Generate more data that should not be downloaded // Generate more data that should not be downloaded
@ -215,7 +213,6 @@ public class WorldStateDownloaderTest {
new WorldStateDownloader( new WorldStateDownloader(
ethProtocolManager.ethContext(), ethProtocolManager.ethContext(),
localStorage, localStorage,
header,
queue, queue,
hashesPerRequest, hashesPerRequest,
maxOutstandingRequests, maxOutstandingRequests,
@ -237,7 +234,7 @@ public class WorldStateDownloaderTest {
.collect(Collectors.toList()); .collect(Collectors.toList());
// Start downloader // Start downloader
CompletableFuture<?> result = downloader.run(); CompletableFuture<?> result = downloader.run(header);
// Respond to node data requests // Respond to node data requests
Responder responder = Responder responder =

Loading…
Cancel
Save