[NC-1344] Create a simple WorldStateDownloader (#657)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent fcf619ce54
commit 055785b7e3
  1. 54
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorage.java
  2. 70
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldState.java
  3. 95
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/StateTrieAccountValue.java
  4. 6
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateArchive.java
  5. 20
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateStorage.java
  6. 29
      ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/BlockDataGenerator.java
  7. 157
      ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java
  8. 22
      ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldStateTest.java
  9. 2
      ethereum/eth/build.gradle
  10. 61
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java
  11. 39
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java
  12. 69
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java
  13. 46
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java
  14. 71
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java
  15. 201
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  16. 16
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java
  17. 10
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  18. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java
  19. 283
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java
  20. 14
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/BranchNode.java
  21. 16
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/ExtensionNode.java
  22. 13
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/LeafNode.java
  23. 4
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java
  24. 6
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerkleTrieException.java
  25. 16
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java
  26. 17
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/NullNode.java
  27. 6
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java
  28. 28
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java
  29. 27
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNodeFactory.java
  30. 36
      ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java
  31. 38
      services/queue/build.gradle
  32. 33
      services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java
  33. 40
      services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java
  34. 16
      services/queue/src/main/resources/log4j2.xml
  35. 1
      settings.gradle

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.storage.keyvalue;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;
import tech.pegasys.pantheon.util.bytes.Bytes32;
@ -29,23 +30,41 @@ public class KeyValueStorageWorldStateStorage implements WorldStateStorage {
}
@Override
public Optional<BytesValue> getCode(final Hash codeHash) {
return keyValueStorage.get(codeHash);
public Optional<BytesValue> getCode(final Bytes32 codeHash) {
if (codeHash.equals(Hash.EMPTY)) {
return Optional.of(BytesValue.EMPTY);
} else {
return keyValueStorage.get(codeHash);
}
}
@Override
public Optional<BytesValue> getAccountStateTrieNode(final Bytes32 nodeHash) {
return keyValueStorage.get(nodeHash);
return getTrieNode(nodeHash);
}
@Override
public Optional<BytesValue> getAccountStorageTrieNode(final Bytes32 nodeHash) {
return keyValueStorage.get(nodeHash);
return getTrieNode(nodeHash);
}
private Optional<BytesValue> getTrieNode(final Bytes32 nodeHash) {
if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE);
} else {
return keyValueStorage.get(nodeHash);
}
}
@Override
public Optional<BytesValue> getNodeData(final Hash hash) {
return keyValueStorage.get(hash);
public Optional<BytesValue> getNodeData(final Bytes32 hash) {
if (hash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE);
} else if (hash.equals(Hash.EMPTY)) {
return Optional.of(BytesValue.EMPTY);
} else {
return keyValueStorage.get(hash);
}
}
@Override
@ -62,18 +81,33 @@ public class KeyValueStorageWorldStateStorage implements WorldStateStorage {
}
@Override
public void putCode(final BytesValue code) {
transaction.put(Hash.hash(code), code);
public Updater putCode(final Bytes32 codeHash, final BytesValue code) {
if (code.size() == 0) {
// Don't save empty values
return this;
}
transaction.put(codeHash, code);
return this;
}
@Override
public void putAccountStateTrieNode(final Bytes32 nodeHash, final BytesValue node) {
public Updater putAccountStateTrieNode(final Bytes32 nodeHash, final BytesValue node) {
if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
// Don't save empty nodes
return this;
}
transaction.put(nodeHash, node);
return this;
}
@Override
public void putAccountStorageTrieNode(final Bytes32 nodeHash, final BytesValue node) {
public Updater putAccountStorageTrieNode(final Bytes32 nodeHash, final BytesValue node) {
if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
// Don't save empty nodes
return this;
}
transaction.put(nodeHash, node);
return this;
}
@Override

@ -49,7 +49,7 @@ public class DefaultMutableWorldState implements MutableWorldState {
private final WorldStateStorage worldStateStorage;
public DefaultMutableWorldState(final WorldStateStorage storage) {
this(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, storage);
this(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, storage);
}
public DefaultMutableWorldState(
@ -103,31 +103,15 @@ public class DefaultMutableWorldState implements MutableWorldState {
private AccountState deserializeAccount(
final Address address, final Hash addressHash, final BytesValue encoded) throws RLPException {
final RLPInput in = RLP.input(encoded);
in.enterList();
final long nonce = in.readLongScalar();
final Wei balance = in.readUInt256Scalar(Wei::wrap);
final Hash storageRoot = Hash.wrap(in.readBytes32());
final Hash codeHash = Hash.wrap(in.readBytes32());
in.leaveList();
return new AccountState(address, addressHash, nonce, balance, storageRoot, codeHash);
StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(in);
return new AccountState(address, addressHash, accountValue);
}
private static BytesValue serializeAccount(
final long nonce, final Wei balance, final Hash codeHash, final Hash storageRoot) {
return RLP.encode(
out -> {
out.startList();
out.writeLongScalar(nonce);
out.writeUInt256Scalar(balance);
out.writeBytesValue(storageRoot);
out.writeBytesValue(codeHash);
out.endList();
});
final long nonce, final Wei balance, final Hash storageRoot, final Hash codeHash) {
StateTrieAccountValue accountValue =
new StateTrieAccountValue(nonce, balance, storageRoot, codeHash);
return RLP.encode(accountValue::writeTo);
}
@Override
@ -187,28 +171,17 @@ public class DefaultMutableWorldState implements MutableWorldState {
private final Address address;
private final Hash addressHash;
private final long nonce;
private final Wei balance;
private final Hash storageRoot;
private final Hash codeHash;
final StateTrieAccountValue accountValue;
// Lazily initialized since we don't always access storage.
private volatile MerklePatriciaTrie<Bytes32, BytesValue> storageTrie;
private AccountState(
final Address address,
final Hash addressHash,
final long nonce,
final Wei balance,
final Hash storageRoot,
final Hash codeHash) {
final Address address, final Hash addressHash, final StateTrieAccountValue accountValue) {
this.address = address;
this.addressHash = addressHash;
this.nonce = nonce;
this.balance = balance;
this.storageRoot = storageRoot;
this.codeHash = codeHash;
this.accountValue = accountValue;
}
private MerklePatriciaTrie<Bytes32, BytesValue> storageTrie() {
@ -217,7 +190,7 @@ public class DefaultMutableWorldState implements MutableWorldState {
storageTrie = updatedTrie;
}
if (storageTrie == null) {
storageTrie = newAccountStorageTrie(storageRoot);
storageTrie = newAccountStorageTrie(getStorageRoot());
}
return storageTrie;
}
@ -234,12 +207,16 @@ public class DefaultMutableWorldState implements MutableWorldState {
@Override
public long getNonce() {
return nonce;
return accountValue.getNonce();
}
@Override
public Wei getBalance() {
return balance;
return accountValue.getBalance();
}
Hash getStorageRoot() {
return accountValue.getStorageRoot();
}
@Override
@ -249,6 +226,7 @@ public class DefaultMutableWorldState implements MutableWorldState {
return updatedCode;
}
// No code is common, save the KV-store lookup.
Hash codeHash = getCodeHash();
if (codeHash.equals(Hash.EMPTY)) {
return BytesValue.EMPTY;
}
@ -262,7 +240,7 @@ public class DefaultMutableWorldState implements MutableWorldState {
@Override
public Hash getCodeHash() {
return codeHash;
return accountValue.getCodeHash();
}
@Override
@ -303,8 +281,8 @@ public class DefaultMutableWorldState implements MutableWorldState {
builder.append("address=").append(getAddress()).append(", ");
builder.append("nonce=").append(getNonce()).append(", ");
builder.append("balance=").append(getBalance()).append(", ");
builder.append("storageRoot=").append(storageRoot).append(", ");
builder.append("codeHash=").append(codeHash);
builder.append("storageRoot=").append(getStorageRoot()).append(", ");
builder.append("codeHash=").append(getCodeHash());
return builder.append("}").toString();
}
}
@ -353,14 +331,14 @@ public class DefaultMutableWorldState implements MutableWorldState {
final AccountState origin = updated.getWrappedAccount();
// Save the code in key-value storage ...
Hash codeHash = origin == null ? Hash.EMPTY : origin.codeHash;
Hash codeHash = origin == null ? Hash.EMPTY : origin.getCodeHash();
if (updated.codeWasUpdated()) {
codeHash = Hash.hash(updated.getCode());
wrapped.updatedAccountCode.put(updated.getAddress(), updated.getCode());
}
// ...and storage in the account trie first.
final boolean freshState = origin == null || updated.getStorageWasCleared();
Hash storageRoot = freshState ? Hash.EMPTY_TRIE_HASH : origin.storageRoot;
Hash storageRoot = freshState ? Hash.EMPTY_TRIE_HASH : origin.getStorageRoot();
if (freshState) {
wrapped.updatedStorageTries.remove(updated.getAddress());
}
@ -386,7 +364,7 @@ public class DefaultMutableWorldState implements MutableWorldState {
// Lastly, save the new account.
final BytesValue account =
serializeAccount(updated.getNonce(), updated.getBalance(), codeHash, storageRoot);
serializeAccount(updated.getNonce(), updated.getBalance(), storageRoot, codeHash);
wrapped.accountStateTrie.put(updated.getAddressHash(), account);
}

@ -0,0 +1,95 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.worldstate;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
/** Represents the raw values associated with an account in the world state trie. */
public class StateTrieAccountValue {
private final long nonce;
private final Wei balance;
private final Hash storageRoot;
private final Hash codeHash;
public StateTrieAccountValue(
final long nonce, final Wei balance, final Hash storageRoot, final Hash codeHash) {
this.nonce = nonce;
this.balance = balance;
this.storageRoot = storageRoot;
this.codeHash = codeHash;
}
/**
* The account nonce, that is the number of transactions sent from that account.
*
* @return the account nonce.
*/
public long getNonce() {
return nonce;
}
/**
* The available balance of that account.
*
* @return the balance, in Wei, of the account.
*/
public Wei getBalance() {
return balance;
}
/**
* The hash of the root of the storage trie associated with this account.
*
* @return the hash of the root node of the storage trie.
*/
public Hash getStorageRoot() {
return storageRoot;
}
/**
* The hash of the EVM bytecode associated with this account.
*
* @return the hash of the account code (which may be {@link Hash#EMPTY}.
*/
public Hash getCodeHash() {
return codeHash;
}
public void writeTo(final RLPOutput out) {
out.startList();
out.writeLongScalar(nonce);
out.writeUInt256Scalar(balance);
out.writeBytesValue(storageRoot);
out.writeBytesValue(codeHash);
out.endList();
}
public static StateTrieAccountValue readFrom(final RLPInput in) {
in.enterList();
final long nonce = in.readLongScalar();
final Wei balance = in.readUInt256Scalar(Wei::wrap);
final Hash storageRoot = Hash.wrap(in.readBytes32());
final Hash codeHash = Hash.wrap(in.readBytes32());
in.leaveList();
return new StateTrieAccountValue(nonce, balance, storageRoot, codeHash);
}
}

@ -22,7 +22,7 @@ import java.util.Optional;
public class WorldStateArchive {
private final WorldStateStorage storage;
private static final Hash EMPTY_ROOT_HASH = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH);
private static final Hash EMPTY_ROOT_HASH = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH);
public WorldStateArchive(final WorldStateStorage storage) {
this.storage = storage;
@ -47,4 +47,8 @@ public class WorldStateArchive {
public Optional<BytesValue> getNodeData(final Hash hash) {
return storage.getNodeData(hash);
}
public WorldStateStorage getStorage() {
return storage;
}
}

@ -20,23 +20,33 @@ import java.util.Optional;
public interface WorldStateStorage {
Optional<BytesValue> getCode(Hash codeHash);
Optional<BytesValue> getCode(Bytes32 codeHash);
Optional<BytesValue> getAccountStateTrieNode(Bytes32 nodeHash);
Optional<BytesValue> getAccountStorageTrieNode(Bytes32 nodeHash);
Optional<BytesValue> getNodeData(Hash hash);
Optional<BytesValue> getNodeData(Bytes32 hash);
default boolean contains(final Bytes32 hash) {
return getNodeData(hash).isPresent();
}
Updater updater();
interface Updater {
void putCode(BytesValue code);
Updater putCode(Bytes32 nodeHash, BytesValue code);
default Updater putCode(final BytesValue code) {
// Skip the hash calculation for empty code
Hash codeHash = code.size() == 0 ? Hash.EMPTY : Hash.hash(code);
return putCode(codeHash, code);
}
void putAccountStateTrieNode(Bytes32 nodeHash, BytesValue node);
Updater putAccountStateTrieNode(Bytes32 nodeHash, BytesValue node);
void putAccountStorageTrieNode(Bytes32 nodeHash, BytesValue node);
Updater putAccountStorageTrieNode(Bytes32 nodeHash, BytesValue node);
void commit();

@ -58,7 +58,7 @@ public class BlockDataGenerator {
final List<Block> seq = new ArrayList<>(count);
final MutableWorldState worldState =
worldStateArchive.getMutable(Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH));
worldStateArchive.getMutable(Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH));
long nextBlockNumber = nextBlock;
Hash parentHash = parent;
@ -95,6 +95,33 @@ public class BlockDataGenerator {
return seq;
}
public List<Account> createRandomAccounts(final MutableWorldState worldState, final int count) {
WorldUpdater updater = worldState.updater();
List<Account> accounts = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
MutableAccount account = updater.getOrCreate(address());
// Make some accounts contract accounts
if (random.nextFloat() < .5) {
// Subset of random accounts are contract accounts
account.setCode(bytesValue(5, 50));
if (random.nextFloat() < .75) {
// Add some storage for most contract accounts
int storageValues = random.nextInt(20) + 10;
for (int j = 0; j < storageValues; j++) {
account.setStorageValue(uint256(), uint256());
}
}
}
account.setNonce(random.nextInt(10));
account.setBalance(Wei.of(positiveLong()));
accounts.add(account);
}
updater.commit();
worldState.persist();
return accounts;
}
public List<Block> blockSequence(final int count) {
final WorldStateArchive worldState = createInMemoryWorldStateArchive();
return blockSequence(count, worldState, Collections.emptyList(), Collections.emptyList());

@ -0,0 +1,157 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.storage.keyvalue;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import org.junit.Test;
public class KeyValueStorageWorldStateStorageTest {
@Test
public void getCode_returnsEmpty() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
assertThat(storage.getCode(Hash.EMPTY)).contains(BytesValue.EMPTY);
}
@Test
public void getAccountStateTrieNode_returnsEmptyNode() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
assertThat(storage.getAccountStateTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
}
@Test
public void getAccountStorageTrieNode_returnsEmptyNode() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
assertThat(storage.getAccountStorageTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
}
@Test
public void getNodeData_returnsEmptyValue() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
assertThat(storage.getNodeData(Hash.EMPTY)).contains(BytesValue.EMPTY);
}
@Test
public void getNodeData_returnsEmptyNode() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
assertThat(storage.getNodeData(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
}
@Test
public void getCode_saveAndGetSpecialValues() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage
.updater()
.putCode(MerklePatriciaTrie.EMPTY_TRIE_NODE)
.putCode(BytesValue.EMPTY)
.commit();
assertThat(storage.getCode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
assertThat(storage.getCode(Hash.EMPTY)).contains(BytesValue.EMPTY);
}
@Test
public void getCode_saveAndGetRegularValue() {
BytesValue bytes = BytesValue.fromHexString("0x123456");
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage.updater().putCode(bytes).commit();
assertThat(storage.getCode(Hash.hash(bytes))).contains(bytes);
}
@Test
public void getAccountStateTrieNode_saveAndGetSpecialValues() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage
.updater()
.putAccountStateTrieNode(
Hash.hash(MerklePatriciaTrie.EMPTY_TRIE_NODE), MerklePatriciaTrie.EMPTY_TRIE_NODE)
.putAccountStateTrieNode(Hash.hash(BytesValue.EMPTY), BytesValue.EMPTY)
.commit();
assertThat(storage.getAccountStateTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
assertThat(storage.getAccountStateTrieNode(Hash.EMPTY)).contains(BytesValue.EMPTY);
}
@Test
public void getAccountStateTrieNode_saveAndGetRegularValue() {
BytesValue bytes = BytesValue.fromHexString("0x123456");
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage.updater().putAccountStateTrieNode(Hash.hash(bytes), bytes).commit();
assertThat(storage.getAccountStateTrieNode(Hash.hash(bytes))).contains(bytes);
}
@Test
public void getAccountStorageTrieNode_saveAndGetSpecialValues() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage
.updater()
.putAccountStorageTrieNode(
Hash.hash(MerklePatriciaTrie.EMPTY_TRIE_NODE), MerklePatriciaTrie.EMPTY_TRIE_NODE)
.putAccountStorageTrieNode(Hash.hash(BytesValue.EMPTY), BytesValue.EMPTY)
.commit();
assertThat(storage.getAccountStorageTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
assertThat(storage.getAccountStorageTrieNode(Hash.EMPTY)).contains(BytesValue.EMPTY);
}
@Test
public void getAccountStorageTrieNode_saveAndGetRegularValue() {
BytesValue bytes = BytesValue.fromHexString("0x123456");
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage.updater().putAccountStorageTrieNode(Hash.hash(bytes), bytes).commit();
assertThat(storage.getAccountStateTrieNode(Hash.hash(bytes))).contains(bytes);
}
@Test
public void getNodeData_saveAndGetSpecialValues() {
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage
.updater()
.putAccountStorageTrieNode(
Hash.hash(MerklePatriciaTrie.EMPTY_TRIE_NODE), MerklePatriciaTrie.EMPTY_TRIE_NODE)
.putAccountStorageTrieNode(Hash.hash(BytesValue.EMPTY), BytesValue.EMPTY)
.commit();
assertThat(storage.getNodeData(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
assertThat(storage.getNodeData(Hash.EMPTY)).contains(BytesValue.EMPTY);
}
@Test
public void getNodeData_saveAndGetRegularValue() {
BytesValue bytes = BytesValue.fromHexString("0x123456");
KeyValueStorageWorldStateStorage storage = emptyStorage();
storage.updater().putAccountStorageTrieNode(Hash.hash(bytes), bytes).commit();
assertThat(storage.getNodeData(Hash.hash(bytes))).contains(bytes);
}
private KeyValueStorageWorldStateStorage emptyStorage() {
return new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
}
}

@ -58,10 +58,10 @@ public class DefaultMutableWorldStateTest {
@Test
public void rootHash_Empty() {
final MutableWorldState worldState = createEmpty();
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
worldState.persist();
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
}
@Test
@ -88,10 +88,10 @@ public class DefaultMutableWorldStateTest {
final WorldUpdater updater = worldState.updater();
updater.deleteAccount(ADDRESS);
updater.commit();
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
worldState.persist();
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
}
@Test
@ -101,10 +101,10 @@ public class DefaultMutableWorldStateTest {
updater.createAccount(ADDRESS).setBalance(Wei.of(100000));
updater.deleteAccount(ADDRESS);
updater.commit();
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
worldState.persist();
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
}
@Test
@ -115,7 +115,7 @@ public class DefaultMutableWorldStateTest {
updater.createAccount(ADDRESS).setBalance(Wei.of(100000));
updater.commit();
assertNotNull(worldState.get(ADDRESS));
assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
// Delete account
updater = worldState.updater();
@ -125,7 +125,7 @@ public class DefaultMutableWorldStateTest {
updater.commit();
assertNull(updater.get(ADDRESS));
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
}
@Test
@ -137,7 +137,7 @@ public class DefaultMutableWorldStateTest {
updater.commit();
worldState.persist();
assertNotNull(worldState.get(ADDRESS));
assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
// Delete account
updater = worldState.updater();
@ -151,7 +151,7 @@ public class DefaultMutableWorldStateTest {
worldState.persist();
assertNull(updater.get(ADDRESS));
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
}
@Test
@ -377,7 +377,7 @@ public class DefaultMutableWorldStateTest {
updater.commit();
worldState.persist();
assertNotNull(worldState.get(ADDRESS));
assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash());
assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash());
// Clear storage
account = updater.getMutable(ADDRESS);

@ -29,9 +29,11 @@ dependencies {
implementation project(':ethereum:core')
implementation project(':ethereum:p2p')
implementation project(':ethereum:rlp')
implementation project(':ethereum:trie')
implementation project(':ethereum:permissioning')
implementation project(':metrics')
implementation project(':services:kvstore')
implementation project(':services:queue')
implementation 'io.vertx:vertx-core'
implementation 'com.google.guava:guava'

@ -0,0 +1,61 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import static com.google.common.base.Preconditions.checkNotNull;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.StateTrieAccountValue;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.List;
class AccountTrieNodeDataRequest extends TrieNodeDataRequest {
AccountTrieNodeDataRequest(final Hash hash) {
super(Kind.ACCOUNT_TRIE_NODE, hash);
}
@Override
public void persist(final Updater updater) {
checkNotNull(getData(), "Must set data before node can be persisted.");
updater.putAccountStateTrieNode(getHash(), getData());
}
@Override
protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) {
return NodeDataRequest.createAccountDataRequest(childHash);
}
@Override
protected List<NodeDataRequest> getRequestsFromTrieNodeValue(final BytesValue value) {
List<NodeDataRequest> nodeData = new ArrayList<>(2);
StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value));
// Add code, if appropriate
if (!accountValue.getCodeHash().equals(Hash.EMPTY)) {
nodeData.add(NodeDataRequest.createCodeRequest(accountValue.getCodeHash()));
}
// Add storage, if appropriate
if (!accountValue.getStorageRoot().equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
// If storage is non-empty queue download
NodeDataRequest storageNode =
NodeDataRequest.createStorageDataRequest(accountValue.getStorageRoot());
nodeData.add(storageNode);
}
return nodeData;
}
}

@ -0,0 +1,39 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import static com.google.common.base.Preconditions.checkNotNull;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import java.util.stream.Stream;
class CodeNodeDataRequest extends NodeDataRequest {
CodeNodeDataRequest(final Hash hash) {
super(Kind.CODE, hash);
}
@Override
public void persist(final Updater updater) {
checkNotNull(getData(), "Must set data before node can be persisted.");
updater.putCode(getHash(), getData());
}
@Override
public Stream<NodeDataRequest> getChildRequests() {
// Code nodes have nothing further to download
return Stream.empty();
}
}

@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.stream.Stream;
abstract class NodeDataRequest {
public enum Kind {
ACCOUNT_TRIE_NODE,
STORAGE_TRIE_NODE,
CODE
}
private final Kind kind;
private final Hash hash;
private BytesValue data;
protected NodeDataRequest(final Kind kind, final Hash hash) {
this.kind = kind;
this.hash = hash;
}
public static AccountTrieNodeDataRequest createAccountDataRequest(final Hash hash) {
return new AccountTrieNodeDataRequest(hash);
}
public static StorageTrieNodeDataRequest createStorageDataRequest(final Hash hash) {
return new StorageTrieNodeDataRequest(hash);
}
public static CodeNodeDataRequest createCodeRequest(final Hash hash) {
return new CodeNodeDataRequest(hash);
}
public Kind getKind() {
return kind;
}
public Hash getHash() {
return hash;
}
public BytesValue getData() {
return data;
}
public NodeDataRequest setData(final BytesValue data) {
this.data = data;
return this;
}
public abstract void persist(final WorldStateStorage.Updater updater);
public abstract Stream<NodeDataRequest> getChildRequests();
}

@ -0,0 +1,46 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import static com.google.common.base.Preconditions.checkNotNull;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Collections;
import java.util.List;
class StorageTrieNodeDataRequest extends TrieNodeDataRequest {
StorageTrieNodeDataRequest(final Hash hash) {
super(Kind.STORAGE_TRIE_NODE, hash);
}
@Override
public void persist(final Updater updater) {
checkNotNull(getData(), "Must set data before node can be persisted.");
updater.putAccountStorageTrieNode(getHash(), getData());
}
@Override
protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) {
return NodeDataRequest.createStorageDataRequest(childHash);
}
@Override
protected List<NodeDataRequest> getRequestsFromTrieNodeValue(final BytesValue value) {
// Nothing to do for terminal storage node
return Collections.emptyList();
}
}

@ -0,0 +1,71 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.trie.Node;
import tech.pegasys.pantheon.ethereum.trie.TrieNodeDecoder;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
import java.util.stream.Stream;
abstract class TrieNodeDataRequest extends NodeDataRequest {
private static final TrieNodeDecoder nodeDecoder = TrieNodeDecoder.create();
TrieNodeDataRequest(final Kind kind, final Hash hash) {
super(kind, hash);
}
@Override
public Stream<NodeDataRequest> getChildRequests() {
if (getData() == null) {
// If this node hasn't been downloaded yet, we can't return any child data
return Stream.empty();
}
final Node<BytesValue> node = nodeDecoder.decode(getData());
return getRequestsFromLoadedTrieNode(node);
}
private Stream<NodeDataRequest> getRequestsFromLoadedTrieNode(final Node<BytesValue> trieNode) {
// Process this node's children
final Stream<NodeDataRequest> childRequests =
trieNode
.getChildren()
.map(List::stream)
.map(s -> s.flatMap(this::getRequestsFromChildTrieNode))
.orElse(Stream.of());
// Process value at this node, if present
return trieNode
.getValue()
.map(v -> Stream.concat(childRequests, (getRequestsFromTrieNodeValue(v).stream())))
.orElse(childRequests);
}
private Stream<NodeDataRequest> getRequestsFromChildTrieNode(final Node<BytesValue> trieNode) {
if (trieNode.isReferencedByHash()) {
// If child nodes are reference by hash, we need to download them
NodeDataRequest req = createChildNodeDataRequest(Hash.wrap(trieNode.getHash()));
return Stream.of(req);
}
// Otherwise if the child's value has been inlined we can go ahead and process it
return getRequestsFromLoadedTrieNode(trieNode);
}
protected abstract NodeDataRequest createChildNodeDataRequest(final Hash childHash);
protected abstract List<NodeDataRequest> getRequestsFromTrieNodeValue(final BytesValue value);
}

@ -0,0 +1,201 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetNodeDataFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class WorldStateDownloader {
private enum Status {
IDLE,
RUNNING,
DONE
}
private final EthContext ethContext;
// The target header for which we want to retrieve world state
private final BlockHeader header;
private final BigQueue<NodeDataRequest> pendingRequests;
private final WorldStateStorage.Updater worldStateStorageUpdater;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final AtomicInteger outstandingRequests = new AtomicInteger(0);
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final WorldStateStorage worldStateStorage;
private final AtomicBoolean sendingRequests = new AtomicBoolean(false);
private volatile CompletableFuture<Void> future;
private volatile Status status = Status.IDLE;
public WorldStateDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final BlockHeader header,
final BigQueue<NodeDataRequest> pendingRequests,
final int hashCountPerRequest,
final int maxOutstandingRequests,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.ethContext = ethContext;
this.worldStateStorage = worldStateStorage;
this.header = header;
this.pendingRequests = pendingRequests;
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.ethTasksTimer = ethTasksTimer;
this.worldStateStorageUpdater = worldStateStorage.updater();
Hash stateRoot = header.getStateRoot();
if (stateRoot.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
// If we're requesting data for an empty world state, we're already done
markDone();
} else {
pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(header.getStateRoot()));
}
}
public CompletableFuture<Void> run() {
synchronized (this) {
if (status == Status.DONE || status == Status.RUNNING) {
return future;
}
status = Status.RUNNING;
future = new CompletableFuture<>();
}
requestNodeData();
return future;
}
private void requestNodeData() {
if (sendingRequests.compareAndSet(false, true)) {
while (shouldRequestNodeData()) {
Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());
if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again
waitForNewPeer().whenComplete((r, t) -> requestNodeData());
break;
} else {
EthPeer peer = maybePeer.get();
// Collect data to be requested
List<NodeDataRequest> toRequest = new ArrayList<>();
for (int i = 0; i < hashCountPerRequest; i++) {
NodeDataRequest pendingRequest = pendingRequests.dequeue();
if (pendingRequest == null) {
break;
}
toRequest.add(pendingRequest);
}
// Request and process node data
outstandingRequests.incrementAndGet();
sendAndProcessRequests(peer, toRequest)
.whenComplete(
(res, error) -> {
if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) {
// We're done
worldStateStorageUpdater.commit();
markDone();
} else {
// Send out additional requests
requestNodeData();
}
});
}
}
sendingRequests.set(false);
}
}
private synchronized void markDone() {
if (future == null) {
future = CompletableFuture.completedFuture(null);
} else {
future.complete(null);
}
status = Status.DONE;
}
private boolean shouldRequestNodeData() {
return !future.isDone()
&& outstandingRequests.get() < maxOutstandingRequests
&& !pendingRequests.isEmpty();
}
private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5));
}
private CompletableFuture<?> sendAndProcessRequests(
final EthPeer peer, final List<NodeDataRequest> requests) {
List<Hash> hashes =
requests.stream().map(NodeDataRequest::getHash).distinct().collect(Collectors.toList());
return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer)
.assignPeer(peer)
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(this::mapNodeDataByHash)
.whenComplete(
(data, err) -> {
boolean requestFailed = err != null;
for (NodeDataRequest request : requests) {
BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
pendingRequests.enqueue(request);
} else {
// Persist request data
request.setData(matchingData);
request.persist(worldStateStorageUpdater);
// Queue child requests
request
.getChildRequests()
.filter(n -> !worldStateStorage.contains(n.getHash()))
.forEach(pendingRequests::enqueue);
}
}
});
}
private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
// Map data by hash
Map<Hash, BytesValue> dataByHash = new HashMap<>();
data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d));
return dataByHash;
}
}

@ -16,6 +16,7 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/** Schedules tasks that run immediately and synchronously for testing. */
public class DeterministicEthScheduler extends EthScheduler {
@ -23,7 +24,7 @@ public class DeterministicEthScheduler extends EthScheduler {
private final TimeoutPolicy timeoutPolicy;
DeterministicEthScheduler() {
this(() -> false);
this(TimeoutPolicy.NEVER);
}
DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) {
@ -55,6 +56,19 @@ public class DeterministicEthScheduler extends EthScheduler {
@FunctionalInterface
public interface TimeoutPolicy {
TimeoutPolicy NEVER = () -> false;
boolean shouldTimeout();
static TimeoutPolicy timeoutXTimes(final int times) {
final AtomicInteger timeouts = new AtomicInteger(times);
return () -> {
if (timeouts.get() <= 0) {
return false;
}
timeouts.decrementAndGet();
return true;
};
}
}
}

@ -47,7 +47,11 @@ public class EthProtocolManagerTestUtil {
public static EthProtocolManager create(
final Blockchain blockchain, final WorldStateArchive worldStateArchive) {
return create(blockchain, worldStateArchive, () -> false);
return create(blockchain, worldStateArchive, TimeoutPolicy.NEVER);
}
public static EthProtocolManager create() {
return create(TimeoutPolicy.NEVER);
}
public static EthProtocolManager create(final TimeoutPolicy timeoutPolicy) {
@ -59,10 +63,6 @@ public class EthProtocolManagerTestUtil {
return create(blockchain, worldStateArchive, timeoutPolicy);
}
public static EthProtocolManager create() {
return create(() -> false);
}
public static void broadcastMessage(
final EthProtocolManager ethProtocolManager,
final RespondingEthPeer peer,

@ -54,7 +54,7 @@ public class BlockchainSetupUtil<C> {
private final List<Block> blocks;
private long maxBlockNumber;
public BlockchainSetupUtil(
private BlockchainSetupUtil(
final GenesisState genesisState,
final MutableBlockchain blockchain,
final ProtocolContext<C> protocolContext,

@ -0,0 +1,283 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Account;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.MutableWorldState;
import tech.pegasys.pantheon.ethereum.core.WorldState;
import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.InMemoryBigQueue;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Test;
public class WorldStateDownloaderTest {
private static final Hash EMPTY_TRIE_ROOT = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH);
@Test
public void downloadWorldStateFromPeers_onePeerOneWithManyRequestsOneAtATime() {
downloadAvailableWorldStateFromPeers(1, 50, 1, 1);
}
@Test
public void downloadWorldStateFromPeers_onePeerOneWithManyRequests() {
downloadAvailableWorldStateFromPeers(1, 50, 1, 10);
}
@Test
public void downloadWorldStateFromPeers_onePeerWithSingleRequest() {
downloadAvailableWorldStateFromPeers(1, 1, 100, 10);
}
@Test
public void downloadWorldStateFromPeers_largeStateFromMultiplePeers() {
downloadAvailableWorldStateFromPeers(5, 100, 10, 10);
}
@Test
public void downloadWorldStateFromPeers_smallStateFromMultiplePeers() {
downloadAvailableWorldStateFromPeers(5, 5, 1, 10);
}
@Test
public void downloadWorldStateFromPeers_singleRequestWithMultiplePeers() {
downloadAvailableWorldStateFromPeers(5, 1, 50, 50);
}
@Test
public void downloadEmptyWorldState() {
BlockDataGenerator dataGen = new BlockDataGenerator(1);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
BlockHeader header =
dataGen
.block(BlockOptions.create().setStateRoot(EMPTY_TRIE_ROOT).setBlockNumber(10))
.getHeader();
// Create some peers
List<RespondingEthPeer> peers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()))
.limit(5)
.collect(Collectors.toList());
BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
localStorage,
header,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
CompletableFuture<Void> future = downloader.run();
assertThat(future).isDone();
// Peers should not have been queried
for (RespondingEthPeer peer : peers) {
assertThat(peer.hasOutstandingRequests()).isFalse();
}
}
@Test
public void canRecoverFromTimeouts() {
BlockDataGenerator dataGen = new BlockDataGenerator(1);
TimeoutPolicy timeoutPolicy = TimeoutPolicy.timeoutXTimes(2);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(timeoutPolicy);
// Setup "remote" state
final WorldStateStorage remoteStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage);
final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable();
// Generate accounts and save corresponding state root
final List<Account> accounts = dataGen.createRandomAccounts(remoteWorldState, 20);
final Hash stateRoot = remoteWorldState.rootHash();
assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check
BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
// Create some peers
List<RespondingEthPeer> peers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()))
.limit(5)
.collect(Collectors.toList());
BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
localStorage,
header,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
CompletableFuture<Void> result = downloader.run();
// Respond to node data requests
Responder responder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!result.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(responder);
}
}
// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
final WorldState localWorldState = localWorldStateArchive.get(stateRoot);
assertThat(result).isDone();
assertAccountsMatch(localWorldState, accounts);
}
private void downloadAvailableWorldStateFromPeers(
final int peerCount,
final int accountCount,
final int hashesPerRequest,
final int maxOutstandingRequests) {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final int trailingPeerCount = 5;
BlockDataGenerator dataGen = new BlockDataGenerator(1);
// Setup "remote" state
final WorldStateStorage remoteStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage);
final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable();
// Generate accounts and save corresponding state root
final List<Account> accounts = dataGen.createRandomAccounts(remoteWorldState, accountCount);
final Hash stateRoot = remoteWorldState.rootHash();
assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check
BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
// Generate more data that should not be downloaded
final List<Account> otherAccounts = dataGen.createRandomAccounts(remoteWorldState, 5);
Hash otherStateRoot = remoteWorldState.rootHash();
BlockHeader otherHeader =
dataGen
.block(BlockOptions.create().setStateRoot(otherStateRoot).setBlockNumber(11))
.getHeader();
assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check
BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
localStorage,
header,
queue,
hashesPerRequest,
maxOutstandingRequests,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
// Create some peers that can respond
List<RespondingEthPeer> usefulPeers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()))
.limit(peerCount)
.collect(Collectors.toList());
// And some irrelevant peers
List<RespondingEthPeer> trailingPeers =
Stream.generate(
() ->
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, header.getNumber() - 1L))
.limit(trailingPeerCount)
.collect(Collectors.toList());
// Start downloader
CompletableFuture<?> result = downloader.run();
// Respond to node data requests
Responder responder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!result.isDone()) {
for (RespondingEthPeer peer : usefulPeers) {
peer.respond(responder);
}
}
// Check that trailing peers were not queried for data
for (RespondingEthPeer trailingPeer : trailingPeers) {
assertThat(trailingPeer.hasOutstandingRequests()).isFalse();
}
// Check that all expected account data was downloaded
final WorldState localWorldState = localWorldStateArchive.get(stateRoot);
assertThat(result).isDone();
assertAccountsMatch(localWorldState, accounts);
// We shouldn't have any extra data locally
assertThat(localStorage.contains(otherHeader.getStateRoot())).isFalse();
for (Account otherAccount : otherAccounts) {
assertThat(localWorldState.get(otherAccount.getAddress())).isNull();
}
}
private void assertAccountsMatch(
final WorldState worldState, final List<Account> expectedAccounts) {
for (Account expectedAccount : expectedAccounts) {
Account actualAccount = worldState.get(expectedAccount.getAddress());
assertThat(actualAccount).isNotNull();
// Check each field
assertThat(actualAccount.getNonce()).isEqualTo(expectedAccount.getNonce());
assertThat(actualAccount.getCode()).isEqualTo(expectedAccount.getCode());
assertThat(actualAccount.getBalance()).isEqualTo(expectedAccount.getBalance());
Map<Bytes32, UInt256> actualStorage = actualAccount.storageEntriesFrom(Bytes32.ZERO, 500);
Map<Bytes32, UInt256> expectedStorage = expectedAccount.storageEntriesFrom(Bytes32.ZERO, 500);
assertThat(actualStorage).isEqualTo(expectedStorage);
}
}
}

@ -23,6 +23,8 @@ import tech.pegasys.pantheon.util.bytes.MutableBytesValue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
@ -73,6 +75,11 @@ class BranchNode<V> implements Node<V> {
return value;
}
@Override
public Optional<List<Node<V>>> getChildren() {
return Optional.of(Collections.unmodifiableList(children));
}
public Node<V> child(final byte index) {
return children.get(index);
}
@ -103,11 +110,10 @@ class BranchNode<V> implements Node<V> {
@Override
public BytesValue getRlpRef() {
final BytesValue rlp = getRlp();
if (rlp.size() < 32) {
return rlp;
} else {
if (isReferencedByHash()) {
return RLP.encodeOne(getHash());
} else {
return getRlp();
}
}

@ -22,6 +22,8 @@ import tech.pegasys.pantheon.util.bytes.BytesValues;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
class ExtensionNode<V> implements Node<V> {
@ -58,7 +60,12 @@ class ExtensionNode<V> implements Node<V> {
@Override
public Optional<V> getValue() {
throw new UnsupportedOperationException();
return Optional.empty();
}
@Override
public Optional<List<Node<V>>> getChildren() {
return Optional.of(Collections.singletonList(child));
}
public Node<V> getChild() {
@ -85,11 +92,10 @@ class ExtensionNode<V> implements Node<V> {
@Override
public BytesValue getRlpRef() {
final BytesValue rlp = getRlp();
if (rlp.size() < 32) {
return rlp;
} else {
if (isReferencedByHash()) {
return RLP.encodeOne(getHash());
} else {
return getRlp();
}
}

@ -21,6 +21,7 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
@ -64,6 +65,11 @@ class LeafNode<V> implements Node<V> {
return Optional.of(value);
}
@Override
public Optional<List<Node<V>>> getChildren() {
return Optional.empty();
}
@Override
public BytesValue getRlp() {
if (rlp != null) {
@ -85,11 +91,10 @@ class LeafNode<V> implements Node<V> {
@Override
public BytesValue getRlpRef() {
final BytesValue rlp = getRlp();
if (rlp.size() < 32) {
return rlp;
} else {
if (isReferencedByHash()) {
return RLP.encodeOne(getHash());
} else {
return getRlp();
}
}

@ -16,6 +16,7 @@ import static tech.pegasys.pantheon.crypto.Hash.keccak256;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Map;
import java.util.Optional;
@ -23,7 +24,8 @@ import java.util.Optional;
/** An Merkle Patricial Trie. */
public interface MerklePatriciaTrie<K, V> {
Bytes32 EMPTY_TRIE_ROOT_HASH = keccak256(RLP.NULL);
BytesValue EMPTY_TRIE_NODE = RLP.NULL;
Bytes32 EMPTY_TRIE_NODE_HASH = keccak256(EMPTY_TRIE_NODE);
/**
* Returns an {@code Optional} of value mapped to the hash if it exists; otherwise empty.

@ -16,13 +16,13 @@ package tech.pegasys.pantheon.ethereum.trie;
* This exception is thrown when there is an issue retrieving or decoding values from {@link
* MerkleStorage}.
*/
public class MerkleStorageException extends RuntimeException {
public class MerkleTrieException extends RuntimeException {
public MerkleStorageException(final String message) {
public MerkleTrieException(final String message) {
super(message);
}
public MerkleStorageException(final String message, final Exception cause) {
public MerkleTrieException(final String message, final Exception cause) {
super(message, cause);
}
}

@ -15,9 +15,10 @@ package tech.pegasys.pantheon.ethereum.trie;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
import java.util.Optional;
interface Node<V> {
public interface Node<V> {
Node<V> accept(PathNodeVisitor<V> visitor, BytesValue path);
@ -27,10 +28,23 @@ interface Node<V> {
Optional<V> getValue();
Optional<List<Node<V>>> getChildren();
BytesValue getRlp();
BytesValue getRlpRef();
/**
* Whether a reference to this node should be represented as a hash of the rlp, or the node rlp
* itself should be inlined (the rlp stored directly in the parent node). If true, the node is
* referenced by hash. If false, the node is referenced by its rlp-encoded value.
*
* @return true if this node should be referenced by hash
*/
default boolean isReferencedByHash() {
return getRlp().size() >= 32;
}
Bytes32 getHash();
Node<V> replacePath(BytesValue path);

@ -12,17 +12,13 @@
*/
package tech.pegasys.pantheon.ethereum.trie;
import static tech.pegasys.pantheon.crypto.Hash.keccak256;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
import java.util.Optional;
class NullNode<V> implements Node<V> {
private static final Bytes32 HASH = keccak256(RLP.NULL);
@SuppressWarnings("rawtypes")
private static final NullNode instance = new NullNode();
@ -53,19 +49,24 @@ class NullNode<V> implements Node<V> {
return Optional.empty();
}
@Override
public Optional<List<Node<V>>> getChildren() {
return Optional.empty();
}
@Override
public BytesValue getRlp() {
return RLP.NULL;
return MerklePatriciaTrie.EMPTY_TRIE_NODE;
}
@Override
public BytesValue getRlpRef() {
return RLP.NULL;
return MerklePatriciaTrie.EMPTY_TRIE_NODE;
}
@Override
public Bytes32 getHash() {
return HASH;
return MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH;
}
@Override

@ -45,7 +45,7 @@ public class StoredMerklePatriciaTrie<K extends BytesValue, V> implements Merkle
final NodeLoader nodeLoader,
final Function<V, BytesValue> valueSerializer,
final Function<BytesValue, V> valueDeserializer) {
this(nodeLoader, MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, valueSerializer, valueDeserializer);
this(nodeLoader, MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, valueSerializer, valueDeserializer);
}
/**
@ -64,7 +64,7 @@ public class StoredMerklePatriciaTrie<K extends BytesValue, V> implements Merkle
final Function<BytesValue, V> valueDeserializer) {
this.nodeFactory = new StoredNodeFactory<>(nodeLoader, valueSerializer, valueDeserializer);
this.root =
rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH)
rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)
? NullNode.instance()
: new StoredNode<>(nodeFactory, rootHash);
}
@ -99,7 +99,7 @@ public class StoredMerklePatriciaTrie<K extends BytesValue, V> implements Merkle
// Reset root so dirty nodes can be garbage collected
final Bytes32 rootHash = root.getHash();
this.root =
rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH)
rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)
? NullNode.instance()
: new StoredNode<>(nodeFactory, rootHash);
}

@ -16,6 +16,7 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
import java.util.Optional;
class StoredNode<V> implements Node<V> {
@ -63,10 +64,14 @@ class StoredNode<V> implements Node<V> {
return load().getValue();
}
@Override
public Optional<List<Node<V>>> getChildren() {
return load().getChildren();
}
@Override
public BytesValue getRlp() {
// Getting the rlp representation is only needed when persisting a concrete node
throw new UnsupportedOperationException();
return load().getRlp();
}
@Override
@ -75,6 +80,12 @@ class StoredNode<V> implements Node<V> {
return RLP.encodeOne(hash);
}
@Override
public boolean isReferencedByHash() {
// Stored nodes represent only nodes that are referenced by hash
return true;
}
@Override
public Bytes32 getHash() {
return hash;
@ -87,7 +98,11 @@ class StoredNode<V> implements Node<V> {
private Node<V> load() {
if (loaded == null) {
loaded = nodeFactory.retrieve(hash);
loaded =
nodeFactory
.retrieve(hash)
.orElseThrow(
() -> new MerkleTrieException("Unable to load trie node value for hash " + hash));
}
return loaded;
@ -95,7 +110,10 @@ class StoredNode<V> implements Node<V> {
@Override
public String print() {
final String value = load().print();
return value;
if (loaded == null) {
return "StoredNode:" + "\n\tRef: " + getRlpRef();
} else {
return load().print();
}
}
}

@ -87,7 +87,7 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
return node;
}
public Node<V> retrieve(final Bytes32 hash) throws MerkleStorageException {
public Optional<Node<V>> retrieve(final Bytes32 hash) throws MerkleTrieException {
return nodeLoader
.getNode(hash)
.map(
@ -97,16 +97,19 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
assert (hash.equals(node.getHash()))
: "Node hash " + node.getHash() + " not equal to expected " + hash;
return node;
})
.orElseThrow(() -> new MerkleStorageException("Missing value for hash " + hash));
});
}
public Node<V> decode(final BytesValue rlp) {
return decode(rlp, () -> String.format("Failed to decode value %s", rlp.toString()));
}
private Node<V> decode(final BytesValue rlp, final Supplier<String> errMessage)
throws MerkleStorageException {
throws MerkleTrieException {
try {
return decode(RLP.input(rlp), errMessage);
} catch (final RLPException ex) {
throw new MerkleStorageException(errMessage.get(), ex);
throw new MerkleTrieException(errMessage.get(), ex);
}
}
@ -123,8 +126,7 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
try {
path = CompactEncoding.decode(encodedPath);
} catch (final IllegalArgumentException ex) {
throw new MerkleStorageException(
errMessage.get() + ": invalid path " + encodedPath, ex);
throw new MerkleTrieException(errMessage.get() + ": invalid path " + encodedPath, ex);
}
final int size = path.size();
@ -138,7 +140,7 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
return decodeBranch(nodeRLPs, errMessage);
default:
throw new MerkleStorageException(
throw new MerkleTrieException(
errMessage.get() + format(": invalid list size %s", nodesCount));
}
} finally {
@ -189,7 +191,7 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
private LeafNode<V> decodeLeaf(
final BytesValue path, final RLPInput valueRlp, final Supplier<String> errMessage) {
if (valueRlp.nextIsNull()) {
throw new MerkleStorageException(errMessage.get() + ": leaf has null value");
throw new MerkleTrieException(errMessage.get() + ": leaf has null value");
}
final V value = decodeValue(valueRlp, errMessage);
return new LeafNode<>(path, value, this, valueSerializer);
@ -198,7 +200,7 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
@SuppressWarnings("unchecked")
private NullNode<V> decodeNull(final RLPInput nodeRLPs, final Supplier<String> errMessage) {
if (!nodeRLPs.nextIsNull()) {
throw new MerkleStorageException(errMessage.get() + ": list size 1 but not null");
throw new MerkleTrieException(errMessage.get() + ": list size 1 but not null");
}
nodeRLPs.skipNext();
return NULL_NODE;
@ -209,7 +211,7 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
try {
bytes = valueRlp.readBytesValue();
} catch (final RLPException ex) {
throw new MerkleStorageException(
throw new MerkleTrieException(
errMessage.get() + ": failed decoding value rlp " + valueRlp, ex);
}
return deserializeValue(errMessage, bytes);
@ -220,8 +222,7 @@ class StoredNodeFactory<V> implements NodeFactory<V> {
try {
value = valueDeserializer.apply(bytes);
} catch (final IllegalArgumentException ex) {
throw new MerkleStorageException(
errMessage.get() + ": failed deserializing value " + bytes, ex);
throw new MerkleTrieException(errMessage.get() + ": failed deserializing value " + bytes, ex);
}
return value;
}

@ -0,0 +1,36 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.trie;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Optional;
import java.util.function.Function;
public class TrieNodeDecoder {
private final StoredNodeFactory<BytesValue> nodeFactory;
private TrieNodeDecoder() {
nodeFactory =
new StoredNodeFactory<>((h) -> Optional.empty(), Function.identity(), Function.identity());
}
public static TrieNodeDecoder create() {
return new TrieNodeDecoder();
}
public Node<BytesValue> decode(final BytesValue rlp) {
return nodeFactory.decode(rlp);
}
}

@ -0,0 +1,38 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
apply plugin: 'java-library'
jar {
baseName 'pantheon-queue'
manifest {
attributes(
'Specification-Title': baseName,
'Specification-Version': project.version,
'Implementation-Title': baseName,
'Implementation-Version': calculateVersion()
)
}
}
dependencies {
api project(':util')
implementation project(':metrics')
implementation 'org.apache.logging.log4j:log4j-api'
implementation 'com.google.guava:guava'
runtime 'org.apache.logging.log4j:log4j-core'
testImplementation 'junit:junit'
}

@ -0,0 +1,33 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
import java.io.Closeable;
/**
* Represents a very large thread-safe queue that may exceed memory limits.
*
* @param <T> the type of data held in the queue
*/
public interface BigQueue<T> extends Closeable {
void enqueue(T value);
T dequeue();
long size();
default boolean isEmpty() {
return size() == 0;
}
}

@ -0,0 +1,40 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class InMemoryBigQueue<T> implements BigQueue<T> {
private final Queue<T> internalQueue = new ConcurrentLinkedQueue<>();
@Override
public void enqueue(final T value) {
internalQueue.add(value);
}
@Override
public T dequeue() {
return internalQueue.poll();
}
@Override
public long size() {
return internalQueue.size();
}
@Override
public void close() {
internalQueue.clear();
}
}

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" monitorInterval="30">
<Properties>
<Property name="root.log.level">INFO</Property>
</Properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSSZZZ} [%t] %-5level %c{1.} %msg%n" />
</Console>
</Appenders>
<Loggers>
<Root level="${sys:root.log.level}">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>

@ -33,6 +33,7 @@ include 'ethereum:trie'
include 'metrics'
include 'pantheon'
include 'services:kvstore'
include 'services:queue'
include 'testutil'
include 'util'
include 'errorprone-checks'

Loading…
Cancel
Save