added compatibility with the snap protocol (#3213)

Second PR to add compatibility with snap protocol messages.

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/3232/head
matkt 3 years ago committed by GitHub
parent a890e427ca
commit 97bad4a7b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 8
      besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java
  3. 24
      besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java
  4. 5
      besu/src/main/java/org/hyperledger/besu/controller/IbftLegacyBesuControllerBuilder.java
  5. 30
      besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java
  6. 2
      consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul99Protocol.java
  7. 3
      consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul99ProtocolManagerTest.java
  8. 99
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/SnapProtocol.java
  9. 19
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthContext.java
  10. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthMessages.java
  11. 255
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java
  12. 13
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  13. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  14. 48
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestId.java
  15. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java
  16. 156
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java
  17. 79
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServer.java
  18. 139
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/AccountRangeMessage.java
  19. 93
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/ByteCodesMessage.java
  20. 114
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetAccountRangeMessage.java
  21. 112
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetByteCodesMessage.java
  22. 171
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetStorageRangeMessage.java
  23. 120
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetTrieNodes.java
  24. 31
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/SnapV1.java
  25. 127
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessage.java
  26. 85
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/TrieNodes.java
  27. 15
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java
  28. 208
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/MessageWrapperTest.java
  29. 71
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/AbstractSnapMessageData.java
  30. 26
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/MessageData.java

@ -37,11 +37,13 @@ import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.DaoForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
@ -310,13 +312,15 @@ public abstract class BesuControllerBuilder {
new EthPeers(getSupportedProtocol(), clock, metricsSystem, messagePermissioningProviders);
final EthMessages ethMessages = new EthMessages();
final EthMessages snapMessages = new EthMessages();
final EthScheduler scheduler =
new EthScheduler(
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
syncConfig.getComputationParallelism(),
metricsSystem);
final EthContext ethContext = new EthContext(ethPeers, ethMessages, scheduler);
final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final SyncState syncState = new SyncState(blockchain, ethPeers);
final boolean fastSyncEnabled = SyncMode.FAST.equals(syncConfig.getSyncMode());
@ -331,6 +335,8 @@ public abstract class BesuControllerBuilder {
miningParameters.getMinTransactionGasPrice(),
transactionPoolConfiguration);
final List<PeerValidator> peerValidators = createPeerValidators(protocolSchedule);
final EthProtocolManager ethProtocolManager =
createEthProtocolManager(
protocolContext,
@ -341,7 +347,10 @@ public abstract class BesuControllerBuilder {
ethContext,
ethMessages,
scheduler,
createPeerValidators(protocolSchedule));
peerValidators);
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive);
final Synchronizer synchronizer =
new DefaultSynchronizer(
@ -370,7 +379,8 @@ public abstract class BesuControllerBuilder {
createAdditionalPluginServices(blockchain, protocolContext);
final SubProtocolConfiguration subProtocolConfiguration =
createSubProtocolConfiguration(ethProtocolManager);
createSubProtocolConfiguration(ethProtocolManager, maybeSnapProtocolManager);
;
final JsonRpcMethods additionalJsonRpcMethodFactory =
createAdditionalJsonRpcMethodFactory(protocolContext);
@ -407,8 +417,15 @@ public abstract class BesuControllerBuilder {
}
protected SubProtocolConfiguration createSubProtocolConfiguration(
final EthProtocolManager ethProtocolManager) {
return new SubProtocolConfiguration().withSubProtocol(EthProtocol.get(), ethProtocolManager);
final EthProtocolManager ethProtocolManager,
final Optional<SnapProtocolManager> maybeSnapProtocolManager) {
final SubProtocolConfiguration subProtocolConfiguration =
new SubProtocolConfiguration().withSubProtocol(EthProtocol.get(), ethProtocolManager);
maybeSnapProtocolManager.ifPresent(
snapProtocolManager -> {
subProtocolConfiguration.withSubProtocol(SnapProtocol.get(), snapProtocolManager);
});
return subProtocolConfiguration;
}
protected abstract MiningCoordinator createMiningCoordinator(
@ -466,6 +483,16 @@ public abstract class BesuControllerBuilder {
blockchain, worldStateArchive, protocolSchedule, consensusContextFactory);
}
@SuppressWarnings("unused")
private Optional<SnapProtocolManager> createSnapProtocolManager(
final List<PeerValidator> peerValidators,
final EthPeers ethPeers,
final EthMessages snapMessages,
final WorldStateArchive worldStateArchive) {
// TODO implement method when flag will be available
return Optional.empty();
}
private WorldStateArchive createWorldStateArchive(
final WorldStateStorage worldStateStorage, final Blockchain blockchain) {
switch (dataStorageConfiguration.getDataStorageFormat()) {

@ -46,6 +46,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -204,8 +205,11 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
@Override
protected SubProtocolConfiguration createSubProtocolConfiguration(
final EthProtocolManager ethProtocolManager) {
return besuControllerBuilderSchedule.get(0L).createSubProtocolConfiguration(ethProtocolManager);
final EthProtocolManager ethProtocolManager,
final Optional<SnapProtocolManager> maybeSnapProtocolManager) {
return besuControllerBuilderSchedule
.get(0L)
.createSubProtocolConfiguration(ethProtocolManager, maybeSnapProtocolManager);
}
@Override

@ -63,7 +63,9 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Util;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -74,6 +76,7 @@ import org.hyperledger.besu.util.Subscribers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -109,13 +112,20 @@ public class IbftBesuControllerBuilder extends BftBesuControllerBuilder {
@Override
protected SubProtocolConfiguration createSubProtocolConfiguration(
final EthProtocolManager ethProtocolManager) {
return new SubProtocolConfiguration()
.withSubProtocol(EthProtocol.get(), ethProtocolManager)
.withSubProtocol(
IbftSubProtocol.get(),
new BftProtocolManager(
bftEventQueue, peers, IbftSubProtocol.IBFV1, IbftSubProtocol.get().getName()));
final EthProtocolManager ethProtocolManager,
final Optional<SnapProtocolManager> maybeSnapProtocolManager) {
final SubProtocolConfiguration subProtocolConfiguration =
new SubProtocolConfiguration()
.withSubProtocol(EthProtocol.get(), ethProtocolManager)
.withSubProtocol(
IbftSubProtocol.get(),
new BftProtocolManager(
bftEventQueue, peers, IbftSubProtocol.IBFV1, IbftSubProtocol.get().getName()));
maybeSnapProtocolManager.ifPresent(
snapProtocolManager -> {
subProtocolConfiguration.withSubProtocol(SnapProtocol.get(), snapProtocolManager);
});
return subProtocolConfiguration;
}
@Override

@ -36,6 +36,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
@ -44,6 +45,7 @@ import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -55,7 +57,8 @@ public class IbftLegacyBesuControllerBuilder extends BesuControllerBuilder {
@Override
protected SubProtocolConfiguration createSubProtocolConfiguration(
final EthProtocolManager ethProtocolManager) {
final EthProtocolManager ethProtocolManager,
final Optional<SnapProtocolManager> snapProtocolManage) {
return new SubProtocolConfiguration()
.withSubProtocol(Istanbul99Protocol.get(), ethProtocolManager);
}

@ -73,7 +73,9 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Util;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -85,6 +87,7 @@ import org.hyperledger.besu.util.Subscribers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -145,16 +148,23 @@ public class QbftBesuControllerBuilder extends BftBesuControllerBuilder {
@Override
protected SubProtocolConfiguration createSubProtocolConfiguration(
final EthProtocolManager ethProtocolManager) {
return new SubProtocolConfiguration()
.withSubProtocol(EthProtocol.get(), ethProtocolManager)
.withSubProtocol(
Istanbul100SubProtocol.get(),
new BftProtocolManager(
bftEventQueue,
peers,
Istanbul100SubProtocol.ISTANBUL_100,
Istanbul100SubProtocol.get().getName()));
final EthProtocolManager ethProtocolManager,
final Optional<SnapProtocolManager> maybeSnapProtocolManager) {
final SubProtocolConfiguration subProtocolConfiguration =
new SubProtocolConfiguration()
.withSubProtocol(EthProtocol.get(), ethProtocolManager)
.withSubProtocol(
Istanbul100SubProtocol.get(),
new BftProtocolManager(
bftEventQueue,
peers,
Istanbul100SubProtocol.ISTANBUL_100,
Istanbul100SubProtocol.get().getName()));
maybeSnapProtocolManager.ifPresent(
snapProtocolManager -> {
subProtocolConfiguration.withSubProtocol(SnapProtocol.get(), snapProtocolManager);
});
return subProtocolConfiguration;
}
@Override

@ -30,7 +30,7 @@ import java.util.List;
*/
public class Istanbul99Protocol implements SubProtocol {
private static final String NAME = "istanbul";
public static final String NAME = "istanbul";
private static final int VERSION = 99;
static final Capability ISTANBUL99 = Capability.create(NAME, 99);

@ -106,7 +106,8 @@ public class Istanbul99ProtocolManagerTest {
throws ExecutionException, InterruptedException, TimeoutException {
final CompletableFuture<Void> done = new CompletableFuture<>();
final EthScheduler ethScheduler = new DeterministicEthScheduler(() -> false);
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem());
EthPeers peers =
new EthPeers(Istanbul99Protocol.NAME, TestClock.fixed(), new NoOpMetricsSystem());
EthMessages messages = new EthMessages();
final BigInteger networkId = BigInteger.ONE;

@ -0,0 +1,99 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.List;
/**
* Snap protocol messages as defined in https://github.com/ethereum/devp2p/blob/master/caps/snap.md}
*/
public class SnapProtocol implements SubProtocol {
public static final String NAME = "snap";
public static final Capability SNAP1 = Capability.create(NAME, SnapVersion.V1);
private static final SnapProtocol INSTANCE = new SnapProtocol();
private static final List<Integer> snap1Messages =
List.of(
SnapV1.GET_ACCOUNT_RANGE,
SnapV1.ACCOUNT_RANGE,
SnapV1.GET_STORAGE_RANGE,
SnapV1.STORAGE_RANGE,
SnapV1.GET_BYTECODES,
SnapV1.BYTECODES,
SnapV1.GET_TRIE_NODES,
SnapV1.TRIE_NODES);
@Override
public String getName() {
return NAME;
}
@Override
public int messageSpace(final int protocolVersion) {
switch (protocolVersion) {
case SnapVersion.V1:
return 17;
default:
return 0;
}
}
@Override
public boolean isValidMessageCode(final int protocolVersion, final int code) {
switch (protocolVersion) {
case SnapVersion.V1:
return snap1Messages.contains(code);
default:
return false;
}
}
@Override
public String messageName(final int protocolVersion, final int code) {
switch (code) {
case SnapV1.GET_ACCOUNT_RANGE:
return "GetAccountRange";
case SnapV1.ACCOUNT_RANGE:
return "AccountRange";
case SnapV1.GET_STORAGE_RANGE:
return "GetStorageRange";
case SnapV1.STORAGE_RANGE:
return "StorageRange";
case SnapV1.GET_BYTECODES:
return "GetBytecodes";
case SnapV1.BYTECODES:
return "Bytecodes";
case SnapV1.GET_TRIE_NODES:
return "GetTrieNodes";
case SnapV1.TRIE_NODES:
return "TrieNodes";
default:
return INVALID_MESSAGE_NAME;
}
}
public static SnapProtocol get() {
return INSTANCE;
}
public static class SnapVersion {
public static final int V1 = 1;
}
}

@ -14,16 +14,31 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;
import java.util.Optional;
public class EthContext {
private final EthPeers ethPeers;
private final EthMessages ethMessages;
private final Optional<EthMessages> snapMessages;
private final EthScheduler scheduler;
public EthContext(
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthMessages snapMessages,
final EthScheduler scheduler) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.of(snapMessages);
this.scheduler = scheduler;
}
public EthContext(
final EthPeers ethPeers, final EthMessages ethMessages, final EthScheduler scheduler) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.empty();
this.scheduler = scheduler;
}
@ -35,6 +50,10 @@ public class EthContext {
return ethMessages;
}
public Optional<EthMessages> getSnapMessages() {
return snapMessages;
}
public EthScheduler getScheduler() {
return scheduler;
}

@ -27,7 +27,7 @@ public class EthMessages {
private final Map<Integer, MessageResponseConstructor> messageResponseConstructorsByCode =
new ConcurrentHashMap<>();
Optional<MessageData> dispatch(final EthMessage ethMessage) {
public Optional<MessageData> dispatch(final EthMessage ethMessage) {
final int code = ethMessage.getData().getCode();
// trigger arbitrary side-effecting listeners

@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
@ -27,6 +28,11 @@ import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetPooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetAccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetStorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodes;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
@ -37,9 +43,11 @@ import org.hyperledger.besu.plugin.services.permissioning.NodeMessagePermissioni
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ -80,16 +88,29 @@ public class EthPeer {
private final AtomicInteger lastProtocolVersion = new AtomicInteger(0);
private volatile long lastRequestTimestamp = 0;
private final RequestManager headersRequestManager;
private final RequestManager bodiesRequestManager;
private final RequestManager receiptsRequestManager;
private final RequestManager nodeDataRequestManager;
private final RequestManager pooledTransactionsRequestManager;
private final Map<String, Map<Integer, RequestManager>> requestManagers;
private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
private final Map<PeerValidator, Boolean> validationStatus = new ConcurrentHashMap<>();
private static final Map<Integer, Integer> roundMessages;
static {
roundMessages = new HashMap<>();
roundMessages.put(EthPV62.BLOCK_HEADERS, EthPV62.GET_BLOCK_HEADERS);
roundMessages.put(EthPV62.BLOCK_BODIES, EthPV62.GET_BLOCK_BODIES);
roundMessages.put(EthPV63.RECEIPTS, EthPV63.GET_RECEIPTS);
roundMessages.put(EthPV63.NODE_DATA, EthPV63.GET_NODE_DATA);
roundMessages.put(EthPV65.POOLED_TRANSACTIONS, EthPV65.GET_POOLED_TRANSACTIONS);
roundMessages.put(SnapV1.ACCOUNT_RANGE, SnapV1.GET_ACCOUNT_RANGE);
roundMessages.put(SnapV1.STORAGE_RANGE, SnapV1.GET_STORAGE_RANGE);
roundMessages.put(SnapV1.BYTECODES, SnapV1.GET_BYTECODES);
roundMessages.put(SnapV1.TRIE_NODES, SnapV1.GET_TRIE_NODES);
}
@VisibleForTesting
public EthPeer(
final PeerConnection connection,
@ -106,13 +127,45 @@ public class EthPeer {
peerValidators.forEach(peerValidator -> validationStatus.put(peerValidator, false));
fullyValidated.set(peerValidators.isEmpty());
this.requestManagers = new HashMap<>();
initEthRequestManagers();
initSnapRequestManagers();
}
private void initEthRequestManagers() {
final boolean supportsRequestId =
getAgreedCapabilities().stream().anyMatch(EthProtocol::isEth66Compatible);
this.headersRequestManager = new RequestManager(this, supportsRequestId);
this.bodiesRequestManager = new RequestManager(this, supportsRequestId);
this.receiptsRequestManager = new RequestManager(this, supportsRequestId);
this.nodeDataRequestManager = new RequestManager(this, supportsRequestId);
this.pooledTransactionsRequestManager = new RequestManager(this, supportsRequestId);
// eth protocol
requestManagers.put(
EthProtocol.NAME,
Map.ofEntries(
Map.entry(
EthPV62.GET_BLOCK_HEADERS,
new RequestManager(this, supportsRequestId, EthProtocol.NAME)),
Map.entry(
EthPV62.GET_BLOCK_BODIES,
new RequestManager(this, supportsRequestId, EthProtocol.NAME)),
Map.entry(
EthPV63.GET_RECEIPTS,
new RequestManager(this, supportsRequestId, EthProtocol.NAME)),
Map.entry(
EthPV63.GET_NODE_DATA,
new RequestManager(this, supportsRequestId, EthProtocol.NAME)),
Map.entry(
EthPV65.GET_POOLED_TRANSACTIONS,
new RequestManager(this, supportsRequestId, EthProtocol.NAME))));
}
private void initSnapRequestManagers() {
// snap protocol
requestManagers.put(
SnapProtocol.NAME,
Map.ofEntries(
Map.entry(SnapV1.GET_ACCOUNT_RANGE, new RequestManager(this, true, SnapProtocol.NAME)),
Map.entry(SnapV1.GET_STORAGE_RANGE, new RequestManager(this, true, SnapProtocol.NAME)),
Map.entry(SnapV1.GET_BYTECODES, new RequestManager(this, true, SnapProtocol.NAME)),
Map.entry(SnapV1.GET_TRIE_NODES, new RequestManager(this, true, SnapProtocol.NAME))));
}
public void markValidated(final PeerValidator validator) {
@ -159,9 +212,19 @@ public class EthPeer {
}
public RequestManager.ResponseStream send(final MessageData messageData) throws PeerNotConnected {
return send(messageData, this.protocolName);
}
public RequestManager.ResponseStream send(
final MessageData messageData, final String protocolName) throws PeerNotConnected {
if (connection.getAgreedCapabilities().stream()
.noneMatch(capability -> capability.getName().equalsIgnoreCase(protocolName))) {
LOG.debug("Protocol {} unavailable for this peer ", protocolName);
return null;
}
if (permissioningProviders.stream()
.anyMatch(p -> !p.isMessagePermitted(connection.getRemoteEnode(), messageData.getCode()))) {
LOG.debug(
LOG.info(
"Permissioning blocked sending of message code {} to {}",
messageData.getCode(),
connection.getRemoteEnode());
@ -176,21 +239,15 @@ public class EthPeer {
return null;
}
switch (messageData.getCode()) {
case EthPV62.GET_BLOCK_HEADERS:
return sendRequest(headersRequestManager, messageData);
case EthPV62.GET_BLOCK_BODIES:
return sendRequest(bodiesRequestManager, messageData);
case EthPV63.GET_RECEIPTS:
return sendRequest(receiptsRequestManager, messageData);
case EthPV63.GET_NODE_DATA:
return sendRequest(nodeDataRequestManager, messageData);
case EthPV65.GET_POOLED_TRANSACTIONS:
return sendRequest(pooledTransactionsRequestManager, messageData);
default:
connection.sendForProtocol(protocolName, messageData);
return null;
if (requestManagers.containsKey(protocolName)) {
final Map<Integer, RequestManager> managers = this.requestManagers.get(protocolName);
if (managers.containsKey(messageData.getCode())) {
return sendRequest(managers.get(messageData.getCode()), messageData);
}
}
connection.sendForProtocol(protocolName, messageData);
return null;
}
public RequestManager.ResponseStream getHeadersByHash(
@ -198,7 +255,9 @@ public class EthPeer {
throws PeerNotConnected {
final GetBlockHeadersMessage message =
GetBlockHeadersMessage.create(hash, maxHeaders, skip, reverse);
return sendRequest(headersRequestManager, message);
final RequestManager requestManager =
requestManagers.get(EthProtocol.NAME).get(EthPV62.GET_BLOCK_HEADERS);
return sendRequest(requestManager, message);
}
public RequestManager.ResponseStream getHeadersByNumber(
@ -206,108 +265,114 @@ public class EthPeer {
throws PeerNotConnected {
final GetBlockHeadersMessage message =
GetBlockHeadersMessage.create(blockNumber, maxHeaders, skip, reverse);
return sendRequest(headersRequestManager, message);
return sendRequest(
requestManagers.get(EthProtocol.NAME).get(EthPV62.GET_BLOCK_HEADERS), message);
}
public RequestManager.ResponseStream getBodies(final List<Hash> blockHashes)
throws PeerNotConnected {
final GetBlockBodiesMessage message = GetBlockBodiesMessage.create(blockHashes);
return sendRequest(bodiesRequestManager, message);
return sendRequest(
requestManagers.get(EthProtocol.NAME).get(EthPV62.GET_BLOCK_BODIES), message);
}
public RequestManager.ResponseStream getReceipts(final List<Hash> blockHashes)
throws PeerNotConnected {
final GetReceiptsMessage message = GetReceiptsMessage.create(blockHashes);
return sendRequest(receiptsRequestManager, message);
return sendRequest(requestManagers.get(EthProtocol.NAME).get(EthPV63.GET_RECEIPTS), message);
}
public RequestManager.ResponseStream getNodeData(final Iterable<Hash> nodeHashes)
throws PeerNotConnected {
final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes);
return sendRequest(nodeDataRequestManager, message);
return sendRequest(requestManagers.get(EthProtocol.NAME).get(EthPV63.GET_NODE_DATA), message);
}
public RequestManager.ResponseStream getPooledTransactions(final List<Hash> hashes)
throws PeerNotConnected {
final GetPooledTransactionsMessage message = GetPooledTransactionsMessage.create(hashes);
return sendRequest(pooledTransactionsRequestManager, message);
return sendRequest(
requestManagers.get(EthProtocol.NAME).get(EthPV65.GET_POOLED_TRANSACTIONS), message);
}
public RequestManager.ResponseStream getSnapAccountRange(final GetAccountRangeMessage message)
throws PeerNotConnected {
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_ACCOUNT_RANGE), message);
}
public RequestManager.ResponseStream getSnapStorageRange(final GetStorageRangeMessage message)
throws PeerNotConnected {
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_STORAGE_RANGE), message);
}
public RequestManager.ResponseStream getSnapBytecode(final GetByteCodesMessage message)
throws PeerNotConnected {
return sendRequest(requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_BYTECODES), message);
}
public RequestManager.ResponseStream getSnapTrieNode(final GetTrieNodes message)
throws PeerNotConnected {
return sendRequest(requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_TRIE_NODES), message);
}
private RequestManager.ResponseStream sendRequest(
final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected {
lastRequestTimestamp = clock.millis();
return requestManager.dispatchRequest(
msgData -> connection.sendForProtocol(protocolName, msgData), messageData);
msgData -> connection.sendForProtocol(requestManager.getProtocolName(), msgData),
messageData);
}
boolean validateReceivedMessage(final EthMessage message) {
public boolean validateReceivedMessage(final EthMessage message, final String protocolName) {
checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch");
switch (message.getData().getCode()) {
case EthPV62.BLOCK_HEADERS:
if (headersRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited headers received.");
return false;
}
break;
case EthPV62.BLOCK_BODIES:
if (bodiesRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited bodies received.");
return false;
}
break;
case EthPV63.RECEIPTS:
if (receiptsRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited receipts received.");
return false;
}
break;
case EthPV63.NODE_DATA:
if (nodeDataRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited node data received.");
return false;
}
break;
case EthPV65.POOLED_TRANSACTIONS:
if (pooledTransactionsRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited pooling transactions received.");
return false;
}
break;
default:
// Nothing to do
}
return true;
return getRequestManager(protocolName, message.getData().getCode())
.map(requestManager -> requestManager.outstandingRequests() != 0)
.orElse(true);
}
/**
* Routes messages originating from this peer to listeners.
*
* @param ethMessage the Eth message to dispatch
* @param protocolName Specific protocol name if needed
*/
void dispatch(final EthMessage ethMessage) {
void dispatch(final EthMessage ethMessage, final String protocolName) {
checkArgument(
ethMessage.getPeer().equals(this), "Mismatched Eth message sent to peer for dispatch");
final int messageCode = ethMessage.getData().getCode();
reputation.resetTimeoutCount(messageCode);
switch (messageCode) {
case EthPV62.BLOCK_HEADERS:
headersRequestManager.dispatchResponse(ethMessage);
break;
case EthPV62.BLOCK_BODIES:
bodiesRequestManager.dispatchResponse(ethMessage);
break;
case EthPV63.RECEIPTS:
receiptsRequestManager.dispatchResponse(ethMessage);
break;
case EthPV63.NODE_DATA:
nodeDataRequestManager.dispatchResponse(ethMessage);
break;
case EthPV65.POOLED_TRANSACTIONS:
pooledTransactionsRequestManager.dispatchResponse(ethMessage);
break;
default:
// Nothing to do
getRequestManager(protocolName, messageCode)
.ifPresentOrElse(
requestManager -> requestManager.dispatchResponse(ethMessage),
() -> {
LOG.trace(
"Message {} not expected has just been received for {} ",
messageCode,
protocolName);
});
}
/**
* Routes messages originating from this peer to listeners.
*
* @param ethMessage the Eth message to dispatch
*/
void dispatch(final EthMessage ethMessage) {
dispatch(ethMessage, protocolName);
}
private Optional<RequestManager> getRequestManager(final String protocolName, final int code) {
if (requestManagers.containsKey(protocolName)) {
final Map<Integer, RequestManager> managers = requestManagers.get(protocolName);
final Integer requestCode = roundMessages.getOrDefault(code, -1);
if (managers.containsKey(requestCode)) {
return Optional.of(managers.get(requestCode));
}
}
return Optional.empty();
}
public Map<Integer, AtomicInteger> timeoutCounts() {
@ -315,11 +380,8 @@ public class EthPeer {
}
void handleDisconnect() {
headersRequestManager.close();
bodiesRequestManager.close();
receiptsRequestManager.close();
nodeDataRequestManager.close();
pooledTransactionsRequestManager.close();
requestManagers.forEach(
(protocolName, map) -> map.forEach((code, requestManager) -> requestManager.close()));
}
public void registerKnownBlock(final Hash hash) {
@ -411,11 +473,10 @@ public class EthPeer {
}
public int outstandingRequests() {
return headersRequestManager.outstandingRequests()
+ bodiesRequestManager.outstandingRequests()
+ receiptsRequestManager.outstandingRequests()
+ nodeDataRequestManager.outstandingRequests()
+ pooledTransactionsRequestManager.outstandingRequests();
return requestManagers.values().stream()
.flatMap(m -> m.values().stream())
.mapToInt(RequestManager::outstandingRequests)
.sum();
}
public long getLastRequestTimestamp() {

@ -76,7 +76,7 @@ public class EthPeers {
pendingRequests::size);
}
void registerConnection(
public void registerConnection(
final PeerConnection peerConnection, final List<PeerValidator> peerValidators) {
final EthPeer peer =
new EthPeer(
@ -89,7 +89,7 @@ public class EthPeers {
connections.putIfAbsent(peerConnection, peer);
}
void registerDisconnect(final PeerConnection connection) {
public void registerDisconnect(final PeerConnection connection) {
final EthPeer peer = connections.remove(connection);
if (peer != null) {
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
@ -127,13 +127,18 @@ public class EthPeers {
return pendingPeerRequest;
}
public void dispatchMessage(final EthPeer peer, final EthMessage ethMessage) {
peer.dispatch(ethMessage);
public void dispatchMessage(
final EthPeer peer, final EthMessage ethMessage, final String protocolName) {
peer.dispatch(ethMessage, protocolName);
if (peer.hasAvailableRequestCapacity()) {
reattemptPendingPeerRequests();
}
}
public void dispatchMessage(final EthPeer peer, final EthMessage ethMessage) {
dispatchMessage(peer, ethMessage, protocolName);
}
private void reattemptPendingPeerRequests() {
synchronized (this) {
pendingRequests.removeIf(PendingPeerRequest::attemptExecution);

@ -265,27 +265,25 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final EthMessage ethMessage = new EthMessage(ethPeer, messageData);
if (!ethPeer.validateReceivedMessage(ethMessage)) {
if (!ethPeer.validateReceivedMessage(ethMessage, getSupportedProtocol())) {
LOG.debug("Unsolicited message received from, disconnecting: {}", ethPeer);
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
return;
}
// This will handle responses
ethPeers.dispatchMessage(ethPeer, ethMessage);
ethPeers.dispatchMessage(ethPeer, ethMessage, getSupportedProtocol());
// This will handle requests
Optional<MessageData> maybeResponseData = Optional.empty();
try {
if (EthProtocol.isEth66Compatible(cap) && EthProtocol.requestIdCompatible(code)) {
final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
RequestId.unwrapMessageData(ethMessage.getData());
ethMessage.getData().unwrapMessageData();
maybeResponseData =
ethMessages
.dispatch(new EthMessage(ethPeer, requestIdAndEthMessage.getValue()))
.map(
responseData ->
RequestId.wrapMessageData(requestIdAndEthMessage.getKey(), responseData));
.map(responseData -> responseData.wrapMessageData(requestIdAndEthMessage.getKey()));
} else {
maybeResponseData = ethMessages.dispatch(ethMessage);
}
@ -298,7 +296,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
maybeResponseData.ifPresent(
responseData -> {
try {
ethPeer.send(responseData);
ethPeer.send(responseData, getSupportedProtocol());
} catch (final PeerNotConnected __) {
// Peer disconnected before we could respond - nothing to do
}
@ -328,7 +326,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
latestForkId);
try {
LOG.debug("Sending status message to {}.", peer);
peer.send(status);
peer.send(status, getSupportedProtocol());
peer.registerStatusSent();
} catch (final PeerNotConnected peerNotConnected) {
// Nothing to do.

@ -1,48 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.AbstractMap;
import java.util.Map;
public class RequestId {
public static MessageData wrapMessageData(
final BigInteger requestId, final MessageData messageData) {
final BytesValueRLPOutput rlpOutput = new BytesValueRLPOutput();
rlpOutput.startList();
rlpOutput.writeBigIntegerScalar(requestId);
rlpOutput.writeRaw(messageData.getData());
rlpOutput.endList();
return new RawMessage(messageData.getCode(), rlpOutput.encoded());
}
static Map.Entry<BigInteger, MessageData> unwrapMessageData(final MessageData messageData) {
final RLPInput messageDataRLP = RLP.input(messageData.getData());
messageDataRLP.enterList();
final BigInteger requestId = messageDataRLP.readBigIntegerScalar();
final RLPInput unwrappedMessageRLP = messageDataRLP.readAsRlp();
messageDataRLP.leaveList();
return new AbstractMap.SimpleImmutableEntry<>(
requestId, new RawMessage(messageData.getCode(), unwrappedMessageRLP.raw()));
}
}

@ -34,25 +34,31 @@ public class RequestManager {
private final Map<BigInteger, ResponseStream> responseStreams = new ConcurrentHashMap<>();
private final EthPeer peer;
private final boolean supportsRequestId;
private final String protocolName;
private final AtomicInteger outstandingRequests = new AtomicInteger(0);
public RequestManager(final EthPeer peer, final boolean supportsRequestId) {
public RequestManager(
final EthPeer peer, final boolean supportsRequestId, final String protocolName) {
this.peer = peer;
this.supportsRequestId = supportsRequestId;
this.protocolName = protocolName;
}
public int outstandingRequests() {
return outstandingRequests.get();
}
public String getProtocolName() {
return protocolName;
}
public ResponseStream dispatchRequest(final RequestSender sender, final MessageData messageData)
throws PeerNotConnected {
outstandingRequests.incrementAndGet();
final BigInteger requestId = BigInteger.valueOf(requestIdCounter.getAndIncrement());
final ResponseStream stream = createStream(requestId);
sender.send(
supportsRequestId ? RequestId.wrapMessageData(requestId, messageData) : messageData);
sender.send(supportsRequestId ? messageData.wrapMessageData(requestId) : messageData);
return stream;
}
@ -62,7 +68,7 @@ public class RequestManager {
if (supportsRequestId) {
// If there's a requestId, find the specific stream it belongs to
final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
RequestId.unwrapMessageData(ethMessage.getData());
ethMessage.getData().unwrapMessageData();
Optional.ofNullable(responseStreams.get(requestIdAndEthMessage.getKey()))
.ifPresentOrElse(
responseStream -> responseStream.processMessage(requestIdAndEthMessage.getValue()),

@ -0,0 +1,156 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class SnapProtocolManager implements ProtocolManager {
private static final Logger LOG = LogManager.getLogger();
private final List<PeerValidator> peerValidators;
private final List<Capability> supportedCapabilities;
private final EthPeers ethPeers;
private final EthMessages snapMessages;
public SnapProtocolManager(
final List<PeerValidator> peerValidators,
final EthPeers ethPeers,
final EthMessages snapMessages,
final WorldStateArchive worldStateArchive) {
this.peerValidators = peerValidators;
this.ethPeers = ethPeers;
this.snapMessages = snapMessages;
this.supportedCapabilities = calculateCapabilities();
new SnapServer(snapMessages, worldStateArchive);
}
private List<Capability> calculateCapabilities() {
final ImmutableList.Builder<Capability> capabilities = ImmutableList.builder();
capabilities.add(SnapProtocol.SNAP1);
return capabilities.build();
}
@Override
public String getSupportedProtocol() {
return SnapProtocol.NAME;
}
@Override
public List<Capability> getSupportedCapabilities() {
return supportedCapabilities;
}
@Override
public void stop() {}
@Override
public void awaitStop() throws InterruptedException {}
/**
* This function is called by the P2P framework when an "SNAP message has been received.
*
* @param cap The capability under which the message was transmitted.
* @param message The message to be decoded.
*/
@Override
public void processMessage(final Capability cap, final Message message) {
final MessageData messageData = AbstractSnapMessageData.create(message);
final int code = messageData.getCode();
LOG.trace("Process snap message {}, {}", cap, code);
final EthPeer ethPeer = ethPeers.peer(message.getConnection());
if (ethPeer == null) {
LOG.debug(
"Ignoring message received from unknown peer connection: " + message.getConnection());
return;
}
final EthMessage ethMessage = new EthMessage(ethPeer, messageData);
if (!ethPeer.validateReceivedMessage(ethMessage, getSupportedProtocol())) {
LOG.debug("Unsolicited message received from, disconnecting: {}", ethPeer);
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
return;
}
// This will handle responses
ethPeers.dispatchMessage(ethPeer, ethMessage, getSupportedProtocol());
// This will handle requests
Optional<MessageData> maybeResponseData = Optional.empty();
try {
final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
ethMessage.getData().unwrapMessageData();
maybeResponseData =
snapMessages
.dispatch(new EthMessage(ethPeer, requestIdAndEthMessage.getValue()))
.map(responseData -> responseData.wrapMessageData(requestIdAndEthMessage.getKey()));
} catch (final RLPException e) {
LOG.debug(
"Received malformed message {} , disconnecting: {}", messageData.getData(), ethPeer, e);
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
}
maybeResponseData.ifPresent(
responseData -> {
try {
ethPeer.send(responseData, getSupportedProtocol());
} catch (final PeerConnection.PeerNotConnected error) {
// Peer disconnected before we could respond - nothing to do
LOG.trace(
"Peer disconnected before we could respond - nothing to do " + error.getMessage());
}
});
}
@Override
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerConnection(connection, peerValidators);
}
@Override
public void handleDisconnect(
final PeerConnection connection,
final DisconnectReason reason,
final boolean initiatedByPeer) {
ethPeers.registerDisconnect(connection);
LOG.debug(
"Disconnect - {} - {} - {} - {} peers left",
initiatedByPeer ? "Inbound" : "Outbound",
reason,
connection.getPeerInfo(),
ethPeers.peerCount());
}
}

@ -0,0 +1,79 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.snap;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodes;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.HashMap;
import kotlin.collections.ArrayDeque;
@SuppressWarnings("unused")
class SnapServer {
private final EthMessages snapMessages;
private final WorldStateArchive worldStateArchive;
SnapServer(final EthMessages snapMessages, final WorldStateArchive worldStateArchive) {
this.snapMessages = snapMessages;
this.worldStateArchive = worldStateArchive;
this.registerResponseConstructors();
}
private void registerResponseConstructors() {
snapMessages.registerResponseConstructor(
SnapV1.GET_ACCOUNT_RANGE,
messageData -> constructGetAccountRangeResponse(worldStateArchive, messageData));
snapMessages.registerResponseConstructor(
SnapV1.GET_STORAGE_RANGE,
messageData -> constructGetStorageRangeResponse(worldStateArchive, messageData));
snapMessages.registerResponseConstructor(
SnapV1.GET_BYTECODES,
messageData -> constructGetBytecodesResponse(worldStateArchive, messageData));
snapMessages.registerResponseConstructor(
SnapV1.GET_TRIE_NODES,
messageData -> constructGetTrieNodesResponse(worldStateArchive, messageData));
}
private MessageData constructGetAccountRangeResponse(
final WorldStateArchive worldStateArchive, final MessageData message) {
// TODO implement
return AccountRangeMessage.create(new HashMap<>(), new ArrayDeque<>());
}
private MessageData constructGetStorageRangeResponse(
final WorldStateArchive worldStateArchive, final MessageData message) {
// TODO implement
return StorageRangeMessage.create(new ArrayDeque<>(), new ArrayDeque<>());
}
private MessageData constructGetBytecodesResponse(
final WorldStateArchive worldStateArchive, final MessageData message) {
// TODO implement
return ByteCodesMessage.create(new ArrayDeque<>());
}
private MessageData constructGetTrieNodesResponse(
final WorldStateArchive worldStateArchive, final MessageData message) {
return TrieNodes.create(new ArrayDeque<>());
}
}

@ -0,0 +1,139 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import java.math.BigInteger;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import com.google.common.collect.Maps;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.immutables.value.Value;
public final class AccountRangeMessage extends AbstractSnapMessageData {
public static AccountRangeMessage readFrom(final MessageData message) {
if (message instanceof AccountRangeMessage) {
return (AccountRangeMessage) message;
}
final int code = message.getCode();
if (code != SnapV1.ACCOUNT_RANGE) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a AccountRangeMessage.", code));
}
return new AccountRangeMessage(message.getData());
}
public static AccountRangeMessage create(
final Map<Bytes32, Bytes> accounts, final ArrayDeque<Bytes> proof) {
return create(Optional.empty(), accounts, proof);
}
public static AccountRangeMessage create(
final Optional<BigInteger> requestId,
final Map<Bytes32, Bytes> accounts,
final ArrayDeque<Bytes> proof) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
requestId.ifPresent(tmp::writeBigIntegerScalar);
tmp.writeList(
accounts.entrySet(),
(entry, rlpOutput) -> {
rlpOutput.startList();
rlpOutput.writeBytes(entry.getKey());
rlpOutput.writeRLPBytes(entry.getValue());
rlpOutput.endList();
});
tmp.writeList(proof, (bytes, rlpOutput) -> rlpOutput.writeBytes(bytes));
tmp.endList();
return new AccountRangeMessage(tmp.encoded());
}
public AccountRangeMessage(final Bytes data) {
super(data);
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final AccountRangeData accountData = accountData(false);
return create(Optional.of(requestId), accountData.accounts(), accountData.proofs()).getData();
}
@Override
public int getCode() {
return SnapV1.ACCOUNT_RANGE;
}
public AccountRangeData accountData(final boolean withRequestId) {
final TreeMap<Bytes32, Bytes> accounts = new TreeMap<>();
final ArrayDeque<Bytes> proofs = new ArrayDeque<>();
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
input
.readList(
rlpInput -> {
rlpInput.enterList();
Map.Entry<Bytes32, Bytes> entry =
Maps.immutableEntry(rlpInput.readBytes32(), toFullAccount(rlpInput.readAsRlp()));
rlpInput.leaveList();
return entry;
})
.forEach(entry -> accounts.put(entry.getKey(), entry.getValue()));
input.enterList();
while (!input.isEndOfCurrentList()) {
proofs.add(input.readBytes());
}
input.leaveList();
input.leaveList();
return ImmutableAccountRangeData.builder().accounts(accounts).proofs(proofs).build();
}
private Bytes toFullAccount(final RLPInput rlpInput) {
final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(rlpInput);
final BytesValueRLPOutput rlpOutput = new BytesValueRLPOutput();
rlpOutput.startList();
rlpOutput.writeLongScalar(accountValue.getNonce()); // nonce
rlpOutput.writeUInt256Scalar(accountValue.getBalance()); // balance
rlpOutput.writeBytes(accountValue.getStorageRoot());
rlpOutput.writeBytes(accountValue.getCodeHash());
rlpOutput.endList();
return rlpOutput.encoded();
}
@Value.Immutable
public interface AccountRangeData {
TreeMap<Bytes32, Bytes> accounts();
ArrayDeque<Bytes> proofs();
}
}

@ -0,0 +1,93 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.immutables.value.Value;
public final class ByteCodesMessage extends AbstractSnapMessageData {
public static ByteCodesMessage readFrom(final MessageData message) {
if (message instanceof ByteCodesMessage) {
return (ByteCodesMessage) message;
}
final int code = message.getCode();
if (code != SnapV1.BYTECODES) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a ByteCodesMessage.", code));
}
return new ByteCodesMessage(message.getData());
}
public static ByteCodesMessage create(final List<Bytes> codes) {
return create(Optional.empty(), codes);
}
public static ByteCodesMessage create(
final Optional<BigInteger> requestId, final List<Bytes> codes) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
requestId.ifPresent(tmp::writeBigIntegerScalar);
tmp.writeList(codes, (code, rlpOutput) -> rlpOutput.writeBytes(code));
tmp.endList();
return new ByteCodesMessage(tmp.encoded());
}
public ByteCodesMessage(final Bytes data) {
super(data);
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final ByteCodes bytecodes = bytecodes(false);
return create(Optional.of(requestId), bytecodes.codes()).getData();
}
@Override
public int getCode() {
return SnapV1.BYTECODES;
}
public ByteCodes bytecodes(final boolean withRequestId) {
final ArrayDeque<Bytes> codes = new ArrayDeque<>();
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
input.enterList();
while (!input.isEndOfCurrentList()) {
codes.add(input.readBytes());
}
input.leaveList();
input.leaveList();
return ImmutableByteCodes.builder().codes(codes).build();
}
@Value.Immutable
public interface ByteCodes {
ArrayDeque<Bytes> codes();
}
}

@ -0,0 +1,114 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.immutables.value.Value;
public final class GetAccountRangeMessage extends AbstractSnapMessageData {
public static GetAccountRangeMessage readFrom(final MessageData message) {
if (message instanceof GetAccountRangeMessage) {
return (GetAccountRangeMessage) message;
}
final int code = message.getCode();
if (code != SnapV1.GET_ACCOUNT_RANGE) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetAccountRangeMessage.", code));
}
return new GetAccountRangeMessage(message.getData());
}
public static GetAccountRangeMessage create(
final Hash worldStateRootHash,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BigInteger responseBytes) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
tmp.writeBytes(worldStateRootHash);
tmp.writeBytes(startKeyHash);
tmp.writeBytes(endKeyHash);
tmp.writeBigIntegerScalar(responseBytes);
tmp.endList();
return new GetAccountRangeMessage(tmp.encoded());
}
public GetAccountRangeMessage(final Bytes data) {
super(data);
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final Range range = range(false);
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
tmp.writeBigIntegerScalar(requestId);
tmp.writeBytes(range.worldStateRootHash());
tmp.writeBytes(range.startKeyHash());
tmp.writeBytes(range.endKeyHash());
tmp.writeBigIntegerScalar(range.responseBytes());
tmp.endList();
return tmp.encoded();
}
@Override
public int getCode() {
return SnapV1.GET_ACCOUNT_RANGE;
}
public Range range(final boolean withRequestId) {
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
final Hash worldStateRootHash = Hash.wrap(Bytes32.wrap(input.readBytes32()));
final ImmutableRange range =
ImmutableRange.builder()
.worldStateRootHash(getOverrideStateRoot().orElse(worldStateRootHash))
.startKeyHash(Hash.wrap(Bytes32.wrap(input.readBytes32())))
.endKeyHash(Hash.wrap(Bytes32.wrap(input.readBytes32())))
.responseBytes(input.readBigIntegerScalar())
.build();
input.leaveList();
return range;
}
@Override
public String toString() {
return "GetAccountRangeMessage{" + "data=" + data + '}';
}
@Value.Immutable
public interface Range {
Hash worldStateRootHash();
Hash startKeyHash();
Hash endKeyHash();
BigInteger responseBytes();
}
}

@ -0,0 +1,112 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.Optional;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.immutables.value.Value;
public final class GetByteCodesMessage extends AbstractSnapMessageData {
final Optional<ArrayDeque<Bytes32>> accountHashes;
public static GetByteCodesMessage readFrom(final MessageData message) {
if (message instanceof GetByteCodesMessage) {
return (GetByteCodesMessage) message;
}
final int code = message.getCode();
if (code != SnapV1.GET_BYTECODES) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetByteCodesMessage.", code));
}
return new GetByteCodesMessage(Optional.empty(), message.getData());
}
public static GetByteCodesMessage create(
final Optional<ArrayDeque<Bytes32>> accountHashes,
final ArrayDeque<Bytes32> codeHashes,
final BigInteger responseBytes) {
return create(Optional.empty(), accountHashes, codeHashes, responseBytes);
}
public static GetByteCodesMessage create(
final Optional<BigInteger> requestId,
final Optional<ArrayDeque<Bytes32>> accountHashes,
final ArrayDeque<Bytes32> codeHashes,
final BigInteger responseBytes) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
requestId.ifPresent(tmp::writeBigIntegerScalar);
tmp.writeList(codeHashes, (hash, rlpOutput) -> rlpOutput.writeBytes(hash));
tmp.writeBigIntegerScalar(responseBytes);
tmp.endList();
return new GetByteCodesMessage(accountHashes, tmp.encoded());
}
public GetByteCodesMessage(final Optional<ArrayDeque<Bytes32>> accountHashes, final Bytes data) {
super(data);
this.accountHashes = accountHashes;
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final CodeHashes request = codeHashes(false);
return create(Optional.of(requestId), accountHashes, request.hashes(), request.responseBytes())
.getData();
}
@Override
public int getCode() {
return SnapV1.GET_BYTECODES;
}
public CodeHashes codeHashes(final boolean withRequestId) {
final ArrayDeque<Bytes32> hashes = new ArrayDeque<>();
final BigInteger responseBytes;
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
input.enterList();
while (!input.isEndOfCurrentList()) {
hashes.add(input.readBytes32());
}
input.leaveList();
responseBytes = input.readBigIntegerScalar();
input.leaveList();
return ImmutableCodeHashes.builder().hashes(hashes).responseBytes(responseBytes).build();
}
public Optional<ArrayDeque<Bytes32>> getAccountHashes() {
return accountHashes;
}
@Value.Immutable
public interface CodeHashes {
ArrayDeque<Bytes32> hashes();
BigInteger responseBytes();
}
}

@ -0,0 +1,171 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.Optional;
import javax.annotation.Nullable;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.immutables.value.Value;
public final class GetStorageRangeMessage extends AbstractSnapMessageData {
private final Optional<ArrayDeque<Bytes32>> storageRoots;
public static GetStorageRangeMessage readFrom(final MessageData message) {
if (message instanceof GetStorageRangeMessage) {
return (GetStorageRangeMessage) message;
}
final int code = message.getCode();
if (code != SnapV1.GET_STORAGE_RANGE) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetStorageRangeMessage.", code));
}
return new GetStorageRangeMessage(message.getData());
}
public static GetStorageRangeMessage create(
final Hash worldStateRootHash,
final ArrayDeque<Bytes32> accountHashes,
final Optional<ArrayDeque<Bytes32>> storageRoots,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BigInteger responseBytes) {
return create(
Optional.empty(),
worldStateRootHash,
accountHashes,
storageRoots,
startKeyHash,
endKeyHash,
responseBytes);
}
public static GetStorageRangeMessage create(
final Optional<BigInteger> requestId,
final Hash worldStateRootHash,
final ArrayDeque<Bytes32> accountHashes,
final Optional<ArrayDeque<Bytes32>> storageRoots,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BigInteger responseBytes) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
requestId.ifPresent(tmp::writeBigIntegerScalar);
tmp.writeBytes(worldStateRootHash);
tmp.writeList(accountHashes, (hash, rlpOutput) -> rlpOutput.writeBytes(hash));
tmp.writeBytes(startKeyHash);
tmp.writeBytes(endKeyHash);
tmp.writeBigIntegerScalar(responseBytes);
tmp.endList();
return new GetStorageRangeMessage(tmp.encoded(), storageRoots);
}
public GetStorageRangeMessage(final Bytes data) {
this(data, Optional.empty());
}
public GetStorageRangeMessage(
final Bytes data, final Optional<ArrayDeque<Bytes32>> storageRoots) {
super(data);
this.storageRoots = storageRoots;
}
public Optional<ArrayDeque<Bytes32>> getStorageRoots() {
return storageRoots;
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final StorageRange range = range(false);
return create(
Optional.of(requestId),
range.worldStateRootHash(),
range.hashes(),
storageRoots,
range.startKeyHash(),
range.endKeyHash(),
range.responseBytes())
.getData();
}
@Override
public int getCode() {
return SnapV1.GET_STORAGE_RANGE;
}
public StorageRange range(final boolean withRequestId) {
final ArrayDeque<Bytes32> hashes = new ArrayDeque<>();
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
final Hash worldStateRootHash = Hash.wrap(Bytes32.wrap(input.readBytes32()));
final ImmutableStorageRange.Builder range =
ImmutableStorageRange.builder()
.worldStateRootHash(getOverrideStateRoot().orElse(worldStateRootHash));
input.enterList();
while (!input.isEndOfCurrentList()) {
hashes.add(input.readBytes32());
}
range.hashes(hashes);
input.leaveList();
if (input.nextIsNull()) {
input.skipNext();
range.startKeyHash(Hash.ZERO);
} else {
range.startKeyHash(Hash.wrap(Bytes32.wrap(input.readBytes32())));
}
if (input.nextIsNull()) {
input.skipNext();
range.endKeyHash(Hash.ZERO);
} else {
range.endKeyHash(Hash.wrap(Bytes32.wrap(input.readBytes32())));
}
range.responseBytes(input.readBigIntegerScalar());
input.leaveList();
return range.build();
}
@Override
public String toString() {
return "GetStorageRangeMessage{" + "storageRoots=" + storageRoots + ", data=" + data + '}';
}
@Value.Immutable
public interface StorageRange {
Hash worldStateRootHash();
ArrayDeque<Bytes32> hashes();
Hash startKeyHash();
@Nullable
Hash endKeyHash();
BigInteger responseBytes();
}
}

@ -0,0 +1,120 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.immutables.value.Value;
public final class GetTrieNodes extends AbstractSnapMessageData {
public static GetTrieNodes readFrom(final MessageData message) {
if (message instanceof GetTrieNodes) {
return (GetTrieNodes) message;
}
final int code = message.getCode();
if (code != SnapV1.GET_TRIE_NODES) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetTrieNodes.", code));
}
return new GetTrieNodes(message.getData());
}
/*public static GetTrieNodes createWithRequest(
final Hash worldStateRootHash,
final TrieNodeDataRequest request,
final BigInteger responseBytes) {
return create(Optional.empty(), worldStateRootHash, request.getPaths(), responseBytes);
}*/
public static GetTrieNodes create(
final Hash worldStateRootHash,
final List<List<Bytes>> requests,
final BigInteger responseBytes) {
return create(Optional.empty(), worldStateRootHash, requests, responseBytes);
}
public static GetTrieNodes create(
final Optional<BigInteger> requestId,
final Hash worldStateRootHash,
final List<List<Bytes>> paths,
final BigInteger responseBytes) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
requestId.ifPresent(tmp::writeBigIntegerScalar);
tmp.writeBytes(worldStateRootHash);
tmp.writeList(
paths,
(path, rlpOutput) ->
rlpOutput.writeList(path, (b, subRlpOutput) -> subRlpOutput.writeBytes(b)));
tmp.writeBigIntegerScalar(responseBytes);
tmp.endList();
return new GetTrieNodes(tmp.encoded());
}
public GetTrieNodes(final Bytes data) {
super(data);
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final TrieNodesPaths paths = paths(false);
return create(
Optional.of(requestId),
paths.worldStateRootHash(),
paths.paths(),
paths.responseBytes())
.getData();
}
@Override
public int getCode() {
return SnapV1.GET_TRIE_NODES;
}
public TrieNodesPaths paths(final boolean withRequestId) {
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
final ImmutableTrieNodesPaths.Builder paths =
ImmutableTrieNodesPaths.builder()
.worldStateRootHash(Hash.wrap(Bytes32.wrap(input.readBytes32())))
.paths(input.readList(rlpInput -> rlpInput.readList(RLPInput::readBytes)))
.responseBytes(input.readBigIntegerScalar());
input.leaveList();
return paths.build();
}
@Value.Immutable
public interface TrieNodesPaths {
Hash worldStateRootHash();
List<List<Bytes>> paths();
BigInteger responseBytes();
}
}

@ -0,0 +1,31 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
public final class SnapV1 {
public static final int GET_ACCOUNT_RANGE = 0x00;
public static final int ACCOUNT_RANGE = 0x01;
public static final int GET_STORAGE_RANGE = 0x02;
public static final int STORAGE_RANGE = 0x03;
public static final int GET_BYTECODES = 0x04;
public static final int BYTECODES = 0x05;
public static final int GET_TRIE_NODES = 0x06;
public static final int TRIE_NODES = 0x07;
private SnapV1() {
// Holder for constants only
}
}

@ -0,0 +1,127 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.immutables.value.Value;
public final class StorageRangeMessage extends AbstractSnapMessageData {
public static StorageRangeMessage readFrom(final MessageData message) {
if (message instanceof StorageRangeMessage) {
return (StorageRangeMessage) message;
}
final int code = message.getCode();
if (code != SnapV1.STORAGE_RANGE) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a StorageRangeMessage.", code));
}
return new StorageRangeMessage(message.getData());
}
public static StorageRangeMessage create(
final ArrayDeque<TreeMap<Bytes32, Bytes>> slots, final List<Bytes> proof) {
return create(Optional.empty(), slots, proof);
}
public static StorageRangeMessage create(
final Optional<BigInteger> requestId,
final ArrayDeque<TreeMap<Bytes32, Bytes>> slots,
final List<Bytes> proof) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
requestId.ifPresent(tmp::writeBigIntegerScalar);
tmp.writeList(
slots,
(accountList, accountRlpOutput) ->
accountRlpOutput.writeList(
accountList.entrySet(),
(entry, slotRlpOutput) -> {
slotRlpOutput.startList();
slotRlpOutput.writeBytes(entry.getKey());
slotRlpOutput.writeBytes(entry.getValue());
slotRlpOutput.endList();
}));
tmp.writeList(proof, (bytes, rlpOutput) -> rlpOutput.writeBytes(bytes));
tmp.endList();
return new StorageRangeMessage(tmp.encoded());
}
public StorageRangeMessage(final Bytes data) {
super(data);
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final SlotRangeData slotsData = slotsData(false);
return create(Optional.of(requestId), slotsData.slots(), slotsData.proofs()).getData();
}
@Override
public int getCode() {
return SnapV1.STORAGE_RANGE;
}
public SlotRangeData slotsData(final boolean withRequestId) {
final ArrayDeque<TreeMap<Bytes32, Bytes>> slots = new ArrayDeque<>();
final ArrayDeque<Bytes> proofs = new ArrayDeque<>();
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
input.readList(
accountRlpInput -> {
slots.add(new TreeMap<>());
return accountRlpInput.readList(
slotRlpInput -> {
slotRlpInput.enterList();
slots.last().put(slotRlpInput.readBytes32(), slotRlpInput.readBytes());
slotRlpInput.leaveList();
return Void.TYPE; // we don't need the response
});
});
input.enterList();
while (!input.isEndOfCurrentList()) {
proofs.add(input.readBytes());
}
input.leaveList();
input.leaveList();
return ImmutableSlotRangeData.builder().slots(slots).proofs(proofs).build();
}
@Value.Immutable
public interface SlotRangeData {
ArrayDeque<TreeMap<Bytes32, Bytes>> slots();
ArrayDeque<Bytes> proofs();
}
}

@ -0,0 +1,85 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.messages.snap;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
public final class TrieNodes extends AbstractSnapMessageData {
public static TrieNodes readFrom(final MessageData message) {
if (message instanceof TrieNodes) {
return (TrieNodes) message;
}
final int code = message.getCode();
if (code != SnapV1.TRIE_NODES) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a TrieNodes.", code));
}
return new TrieNodes(message.getData());
}
public static TrieNodes create(final List<Bytes> nodes) {
return create(Optional.empty(), nodes);
}
public static TrieNodes create(final Optional<BigInteger> requestId, final List<Bytes> nodes) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
requestId.ifPresent(tmp::writeBigIntegerScalar);
tmp.writeList(nodes, (node, rlpOutput) -> rlpOutput.writeBytes(node));
tmp.endList();
return new TrieNodes(tmp.encoded());
}
public TrieNodes(final Bytes data) {
super(data);
}
@Override
protected Bytes wrap(final BigInteger requestId) {
final List<Bytes> nodes = nodes(false);
return create(Optional.of(requestId), nodes).getData();
}
@Override
public int getCode() {
return SnapV1.TRIE_NODES;
}
public ArrayDeque<Bytes> nodes(final boolean withRequestId) {
final ArrayDeque<Bytes> trieNodes = new ArrayDeque<>();
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
if (withRequestId) input.skipNext();
input.enterList();
while (!input.isEndOfCurrentList()) {
trieNodes.add(input.readBytes());
}
input.leaveList();
input.leaveList();
return trieNodes;
}
}

@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.manager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.eth.manager.RequestId.unwrapMessageData;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
@ -45,7 +44,8 @@ public class RequestManagerTest {
public void dispatchesMessagesReceivedAfterRegisteringCallback() throws Exception {
for (final boolean supportsRequestId : List.of(true, false)) {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, supportsRequestId);
final RequestManager requestManager =
new RequestManager(peer, supportsRequestId, EthProtocol.NAME);
final AtomicInteger sendCount = new AtomicInteger(0);
final RequestManager.RequestSender sender = __ -> sendCount.incrementAndGet();
@ -81,7 +81,8 @@ public class RequestManagerTest {
public void dispatchesMessagesReceivedBeforeRegisteringCallback() throws Exception {
for (final boolean supportsRequestId : List.of(true, false)) {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, supportsRequestId);
final RequestManager requestManager =
new RequestManager(peer, supportsRequestId, EthProtocol.NAME);
final AtomicInteger sendCount = new AtomicInteger(0);
final RequestManager.RequestSender sender = __ -> sendCount.incrementAndGet();
@ -116,7 +117,7 @@ public class RequestManagerTest {
@Test
public void dispatchesMessagesReceivedBeforeAndAfterRegisteringCallback() throws Exception {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, false);
final RequestManager requestManager = new RequestManager(peer, false, EthProtocol.NAME);
final AtomicInteger sendCount = new AtomicInteger(0);
final RequestManager.RequestSender sender = __ -> sendCount.incrementAndGet();
@ -161,7 +162,7 @@ public class RequestManagerTest {
@Test
public void dispatchesMessagesToMultipleStreamsIfNoRequestId() throws Exception {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, false);
final RequestManager requestManager = new RequestManager(peer, false, EthProtocol.NAME);
final AtomicInteger sendCount = new AtomicInteger(0);
final RequestManager.RequestSender sender = __ -> sendCount.incrementAndGet();
@ -228,7 +229,7 @@ public class RequestManagerTest {
@Test
public void dispatchesMessagesToSingleStreamIfRequestId() throws Exception {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, true);
final RequestManager requestManager = new RequestManager(peer, true, EthProtocol.NAME);
final AtomicInteger sendCount = new AtomicInteger(0);
final RequestManager.RequestSender sender = __ -> sendCount.incrementAndGet();
@ -290,7 +291,7 @@ public class RequestManagerTest {
assertThat(response)
.isEqualTo(
(supportsRequestId
? unwrapMessageData(mockMessage.getData()).getValue()
? mockMessage.getData().unwrapMessageData().getValue()
: mockMessage.getData()));
}

@ -22,7 +22,6 @@ import static org.hyperledger.besu.ethereum.core.Transaction.REPLAY_PROTECTED_V_
import static org.hyperledger.besu.ethereum.core.Transaction.REPLAY_PROTECTED_V_MIN;
import static org.hyperledger.besu.ethereum.core.Transaction.REPLAY_UNPROTECTED_V_BASE;
import static org.hyperledger.besu.ethereum.core.Transaction.REPLAY_UNPROTECTED_V_BASE_PLUS_1;
import static org.hyperledger.besu.ethereum.eth.manager.RequestId.wrapMessageData;
import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.datatypes.Address;
@ -54,7 +53,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Test;
public class RequestIdMessageTest {
public class MessageWrapperTest {
private static final ObjectMapper objectMapper = new ObjectMapper();
@ -62,16 +61,14 @@ public class RequestIdMessageTest {
public void GetBlockHeaders() throws IOException {
final var testJson = parseTestFile("GetBlockHeadersPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
GetBlockHeadersMessage.create(
Hash.fromHexString(
"0x00000000000000000000000000000000000000000000000000000000deadc0de"),
5,
5,
false))
.getData();
final GetBlockHeadersMessage getBlockHeadersMessage =
GetBlockHeadersMessage.create(
Hash.fromHexString(
"0x00000000000000000000000000000000000000000000000000000000deadc0de"),
5,
5,
false);
final Bytes actual = getBlockHeadersMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -79,9 +76,9 @@ public class RequestIdMessageTest {
public void GetBlockHeaders1() throws IOException {
final var testJson = parseTestFile("GetBlockHeadersPacket66-1.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(BigInteger.valueOf(1111), GetBlockHeadersMessage.create(9999, 5, 5, false))
.getData();
final GetBlockHeadersMessage getBlockHeadersMessage =
GetBlockHeadersMessage.create(9999, 5, 5, false);
final Bytes actual = getBlockHeadersMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -89,15 +86,12 @@ public class RequestIdMessageTest {
public void BlockHeaders() throws IOException {
final var testJson = parseTestFile("BlockHeadersPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
BlockHeadersMessage.create(
Arrays.asList(
objectMapper.treeToValue(
testJson.get("data").get("BlockHeadersPacket"),
TestBlockHeader[].class))))
.getData();
final BlockHeadersMessage blockHeadersMessage =
BlockHeadersMessage.create(
Arrays.asList(
objectMapper.treeToValue(
testJson.get("data").get("BlockHeadersPacket"), TestBlockHeader[].class)));
final Bytes actual = blockHeadersMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -105,16 +99,14 @@ public class RequestIdMessageTest {
public void GetBlockBodies() throws IOException {
final var testJson = parseTestFile("GetBlockBodiesPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
GetBlockBodiesMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList())))
.getData();
final GetBlockBodiesMessage getBlockBodiesMessage =
GetBlockBodiesMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList()));
final Bytes actual = getBlockBodiesMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -122,14 +114,12 @@ public class RequestIdMessageTest {
public void BlockBodies() throws IOException {
final var testJson = parseTestFile("BlockBodiesPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
BlockBodiesMessage.create(
Arrays.asList(
objectMapper.treeToValue(
testJson.get("data").get("BlockBodiesPacket"), TestBlockBody[].class))))
.getData();
final BlockBodiesMessage blockBodiesMessage =
BlockBodiesMessage.create(
Arrays.asList(
objectMapper.treeToValue(
testJson.get("data").get("BlockBodiesPacket"), TestBlockBody[].class)));
final Bytes actual = blockBodiesMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -137,16 +127,14 @@ public class RequestIdMessageTest {
public void GetNodeData() throws IOException {
final var testJson = parseTestFile("GetNodeDataPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
GetNodeDataMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList())))
.getData();
final GetNodeDataMessage getNodeDataMessage =
GetNodeDataMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList()));
final Bytes actual = getNodeDataMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -154,14 +142,12 @@ public class RequestIdMessageTest {
public void NodeData() throws IOException {
final var testJson = parseTestFile("NodeDataPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
NodeDataMessage.create(
Stream.of("0xdeadc0de", "0xfeedbeef")
.map(Bytes::fromHexString)
.collect(toUnmodifiableList())))
.getData();
final NodeDataMessage nodeDataMessage =
NodeDataMessage.create(
Stream.of("0xdeadc0de", "0xfeedbeef")
.map(Bytes::fromHexString)
.collect(toUnmodifiableList()));
final Bytes actual = nodeDataMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -169,16 +155,14 @@ public class RequestIdMessageTest {
public void GetReceipts() throws IOException {
final var testJson = parseTestFile("GetReceiptsPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
GetReceiptsMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList())))
.getData();
final GetReceiptsMessage getReceiptsMessage =
GetReceiptsMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList()));
final Bytes actual = getReceiptsMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -186,35 +170,33 @@ public class RequestIdMessageTest {
public void Receipts() throws IOException {
final var testJson = parseTestFile("ReceiptsPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
ReceiptsMessage.create(
singletonList(
final ReceiptsMessage receiptsMessage =
ReceiptsMessage.create(
singletonList(
singletonList(
new TransactionReceipt(
TransactionType.FRONTIER,
0,
1,
singletonList(
new TransactionReceipt(
TransactionType.FRONTIER,
new LogWithMetadata(
0,
0,
Hash.ZERO,
Hash.ZERO,
0,
1,
singletonList(
new LogWithMetadata(
0,
0,
Hash.ZERO,
Hash.ZERO,
0,
Address.fromHexString("0x11"),
Bytes.fromHexString("0x0100ff"),
Stream.of(
"0x000000000000000000000000000000000000000000000000000000000000dead",
"0x000000000000000000000000000000000000000000000000000000000000beef")
.map(LogTopic::fromHexString)
.collect(toUnmodifiableList()),
false)),
LogsBloomFilter.fromHexString(
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"),
Optional.empty())))))
.getData();
Address.fromHexString("0x11"),
Bytes.fromHexString("0x0100ff"),
Stream.of(
"0x000000000000000000000000000000000000000000000000000000000000dead",
"0x000000000000000000000000000000000000000000000000000000000000beef")
.map(LogTopic::fromHexString)
.collect(toUnmodifiableList()),
false)),
LogsBloomFilter.fromHexString(
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"),
Optional.empty()))));
final Bytes actual = receiptsMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -222,16 +204,15 @@ public class RequestIdMessageTest {
public void GetPooledTransactions() throws IOException {
final var testJson = parseTestFile("GetPooledTransactionsPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final GetPooledTransactionsMessage getPooledTransactionsMessage =
GetPooledTransactionsMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList()));
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
GetPooledTransactionsMessage.create(
Stream.of(
"0x00000000000000000000000000000000000000000000000000000000deadc0de",
"0x00000000000000000000000000000000000000000000000000000000feedbeef")
.map(Hash::fromHexString)
.collect(toUnmodifiableList())))
.getData();
getPooledTransactionsMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}
@ -239,15 +220,14 @@ public class RequestIdMessageTest {
public void PooledTransactions() throws IOException {
final var testJson = parseTestFile("PooledTransactionsPacket66.json");
final Bytes expected = Bytes.fromHexString(testJson.get("rlp").asText());
final PooledTransactionsMessage pooledTransactionsMessage =
PooledTransactionsMessage.create(
Arrays.asList(
objectMapper.treeToValue(
testJson.get("data").get("PooledTransactionsPacket"),
TestTransaction[].class)));
final Bytes actual =
wrapMessageData(
BigInteger.valueOf(1111),
PooledTransactionsMessage.create(
Arrays.asList(
objectMapper.treeToValue(
testJson.get("data").get("PooledTransactionsPacket"),
TestTransaction[].class))))
.getData();
pooledTransactionsMessage.wrapMessageData(BigInteger.valueOf(1111)).getData();
assertThat(actual).isEqualTo(expected);
}

@ -0,0 +1,71 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.p2p.rlpx.wire;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
public abstract class AbstractSnapMessageData extends AbstractMessageData {
private Optional<Hash> overrideStateRoot;
public AbstractSnapMessageData(final Bytes data) {
super(data);
overrideStateRoot = Optional.empty();
}
public Optional<Hash> getOverrideStateRoot() {
return overrideStateRoot;
}
public void setOverrideStateRoot(final Optional<Hash> overrideStateRoot) {
this.overrideStateRoot = overrideStateRoot;
}
@Override
public MessageData wrapMessageData(final BigInteger requestId) {
return new RawMessage(getCode(), wrap(requestId));
}
@Override
public Map.Entry<BigInteger, MessageData> unwrapMessageData() {
final RLPInput messageDataRLP = RLP.input(getData());
messageDataRLP.enterList();
final BigInteger requestId = messageDataRLP.readBigIntegerScalar();
messageDataRLP.leaveListLenient();
return new AbstractMap.SimpleImmutableEntry<>(requestId, new RawMessage(getCode(), getData()));
}
protected Bytes wrap(final BigInteger requestId) {
throw new UnsupportedOperationException("cannot wrap this message");
}
public static MessageData create(final Message message) {
return new AbstractSnapMessageData(message.getData().getData()) {
@Override
public int getCode() {
return message.getData().getCode();
}
};
}
}

@ -14,6 +14,14 @@
*/
package org.hyperledger.besu.ethereum.p2p.rlpx.wire;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import java.math.BigInteger;
import java.util.AbstractMap;
import java.util.Map;
import org.apache.tuweni.bytes.Bytes;
/** A P2P Network Message's Data. */
@ -39,4 +47,22 @@ public interface MessageData {
* @return the serialized representation of this message
*/
Bytes getData();
default MessageData wrapMessageData(final BigInteger requestId) {
final BytesValueRLPOutput rlpOutput = new BytesValueRLPOutput();
rlpOutput.startList();
rlpOutput.writeBigIntegerScalar(requestId);
rlpOutput.writeRaw(getData());
rlpOutput.endList();
return new RawMessage(getCode(), rlpOutput.encoded());
}
default Map.Entry<BigInteger, MessageData> unwrapMessageData() {
final RLPInput messageDataRLP = RLP.input(getData());
messageDataRLP.enterList();
final BigInteger requestId = messageDataRLP.readBigIntegerScalar();
final Bytes message = messageDataRLP.readAsRlp().raw();
messageDataRLP.leaveList();
return new AbstractMap.SimpleImmutableEntry<>(requestId, new RawMessage(getCode(), message));
}
}

Loading…
Cancel
Save