[NC-2195] Commit world state continuously (#809)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent 9f7e74a459
commit c4f0cb4d52
  1. 24
      ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java
  2. 8
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  3. 88
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java

@ -15,6 +15,7 @@ package tech.pegasys.pantheon.ethereum.storage.keyvalue;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage.Updater;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -151,6 +152,29 @@ public class KeyValueStorageWorldStateStorageTest {
assertThat(storage.getNodeData(Hash.hash(bytes))).contains(bytes); assertThat(storage.getNodeData(Hash.hash(bytes))).contains(bytes);
} }
@Test
public void reconcilesNonConflictingUpdaters() {
BytesValue bytesA = BytesValue.fromHexString("0x12");
BytesValue bytesB = BytesValue.fromHexString("0x1234");
BytesValue bytesC = BytesValue.fromHexString("0x123456");
KeyValueStorageWorldStateStorage storage = emptyStorage();
Updater updaterA = storage.updater();
Updater updaterB = storage.updater();
updaterA.putCode(bytesA);
updaterB.putCode(bytesA);
updaterB.putCode(bytesB);
updaterA.putCode(bytesC);
updaterA.commit();
updaterB.commit();
assertThat(storage.getCode(Hash.hash(bytesA))).contains(bytesA);
assertThat(storage.getCode(Hash.hash(bytesB))).contains(bytesB);
assertThat(storage.getCode(Hash.hash(bytesC))).contains(bytesC);
}
private KeyValueStorageWorldStateStorage emptyStorage() { private KeyValueStorageWorldStateStorage emptyStorage() {
return new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); return new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
} }

@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetNodeDataFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue; import tech.pegasys.pantheon.services.queue.BigQueue;
@ -51,7 +52,6 @@ public class WorldStateDownloader {
private final EthContext ethContext; private final EthContext ethContext;
private final BigQueue<NodeDataRequest> pendingRequests; private final BigQueue<NodeDataRequest> pendingRequests;
private final WorldStateStorage.Updater worldStateStorageUpdater;
private final int hashCountPerRequest; private final int hashCountPerRequest;
private final int maxOutstandingRequests; private final int maxOutstandingRequests;
private final AtomicInteger outstandingRequests = new AtomicInteger(0); private final AtomicInteger outstandingRequests = new AtomicInteger(0);
@ -74,7 +74,6 @@ public class WorldStateDownloader {
this.hashCountPerRequest = hashCountPerRequest; this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests; this.maxOutstandingRequests = maxOutstandingRequests;
this.ethTasksTimer = ethTasksTimer; this.ethTasksTimer = ethTasksTimer;
this.worldStateStorageUpdater = worldStateStorage.updater();
} }
public CompletableFuture<Void> run(final BlockHeader header) { public CompletableFuture<Void> run(final BlockHeader header) {
@ -135,7 +134,6 @@ public class WorldStateDownloader {
(res, error) -> { (res, error) -> {
if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) { if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) {
// We're done // We're done
worldStateStorageUpdater.commit();
markDone(); markDone();
} else { } else {
// Send out additional requests // Send out additional requests
@ -182,6 +180,7 @@ public class WorldStateDownloader {
.whenComplete( .whenComplete(
(data, err) -> { (data, err) -> {
boolean requestFailed = err != null; boolean requestFailed = err != null;
Updater storageUpdater = worldStateStorage.updater();
for (NodeDataRequest request : requests) { for (NodeDataRequest request : requests) {
BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) { if (matchingData == null) {
@ -189,7 +188,7 @@ public class WorldStateDownloader {
} else { } else {
// Persist request data // Persist request data
request.setData(matchingData); request.setData(matchingData);
request.persist(worldStateStorageUpdater); request.persist(storageUpdater);
// Queue child requests // Queue child requests
request request
@ -198,6 +197,7 @@ public class WorldStateDownloader {
.forEach(pendingRequests::enqueue); .forEach(pendingRequests::enqueue);
} }
} }
storageUpdater.commit();
}); });
} }

@ -30,6 +30,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage; import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage; import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
@ -189,6 +190,11 @@ public class WorldStateDownloaderTest {
assertAccountsMatch(localWorldState, accounts); assertAccountsMatch(localWorldState, accounts);
} }
@Test
public void handlesPartialResponsesFromNetwork() {
downloadAvailableWorldStateFromPeers(5, 100, 10, 10, this::respondPartially);
}
@Test @Test
public void doesNotRequestKnownCodeFromNetwork() { public void doesNotRequestKnownCodeFromNetwork() {
BlockDataGenerator dataGen = new BlockDataGenerator(1); BlockDataGenerator dataGen = new BlockDataGenerator(1);
@ -479,6 +485,16 @@ public class WorldStateDownloaderTest {
final int accountCount, final int accountCount,
final int hashesPerRequest, final int hashesPerRequest,
final int maxOutstandingRequests) { final int maxOutstandingRequests) {
downloadAvailableWorldStateFromPeers(
peerCount, accountCount, hashesPerRequest, maxOutstandingRequests, this::respondFully);
}
private void downloadAvailableWorldStateFromPeers(
final int peerCount,
final int accountCount,
final int hashesPerRequest,
final int maxOutstandingRequests,
final NetworkResponder networkResponder) {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final int trailingPeerCount = 5; final int trailingPeerCount = 5;
BlockDataGenerator dataGen = new BlockDataGenerator(1); BlockDataGenerator dataGen = new BlockDataGenerator(1);
@ -537,12 +553,15 @@ public class WorldStateDownloaderTest {
CompletableFuture<?> result = downloader.run(header); CompletableFuture<?> result = downloader.run(header);
// Respond to node data requests // Respond to node data requests
Responder responder = // Send one round of full responses, so that we can get multiple requests queued up
Responder fullResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!result.isDone()) { for (RespondingEthPeer peer : usefulPeers) {
for (RespondingEthPeer peer : usefulPeers) { peer.respond(fullResponder);
peer.respond(responder); }
} // Respond to remaining queued requests in custom way
if (!result.isDone()) {
networkResponder.respond(usefulPeers, remoteWorldStateArchive, result);
} }
// Check that trailing peers were not queried for data // Check that trailing peers were not queried for data
@ -562,6 +581,57 @@ public class WorldStateDownloaderTest {
} }
} }
private void respondFully(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture) {
Responder responder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!downloaderFuture.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(responder);
}
}
}
private void respondPartially(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture) {
Responder fullResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
Responder partialResponder =
RespondingEthPeer.partialResponder(
mock(Blockchain.class), remoteWorldStateArchive, MainnetProtocolSchedule.create(), .5f);
Responder emptyResponder = RespondingEthPeer.emptyResponder();
// Send a few partial responses
for (int i = 0; i < 5; i++) {
for (RespondingEthPeer peer : peers) {
peer.respond(partialResponder);
}
}
// Downloader should not complete with partial responses
assertThat(downloaderFuture).isNotDone();
// Send a few empty responses
for (int i = 0; i < 3; i++) {
for (RespondingEthPeer peer : peers) {
peer.respond(emptyResponder);
}
}
// Downloader should not complete with empty responses
assertThat(downloaderFuture).isNotDone();
while (!downloaderFuture.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(fullResponder);
}
}
}
private void assertAccountsMatch( private void assertAccountsMatch(
final WorldState worldState, final List<Account> expectedAccounts) { final WorldState worldState, final List<Account> expectedAccounts) {
for (Account expectedAccount : expectedAccounts) { for (Account expectedAccount : expectedAccounts) {
@ -577,4 +647,12 @@ public class WorldStateDownloaderTest {
assertThat(actualStorage).isEqualTo(expectedStorage); assertThat(actualStorage).isEqualTo(expectedStorage);
} }
} }
@FunctionalInterface
private interface NetworkResponder {
void respond(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture);
}
} }

Loading…
Cancel
Save