From 05895495cc5d054e8f103760f542bdad0e2f3e35 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Sat, 19 Jan 2019 04:44:28 +1000 Subject: [PATCH] Add GetNodeDataFromPeerTask. (#597) Signed-off-by: Adrian Sutton --- .../eth/messages/BlockBodiesMessage.java | 4 +- .../eth/sync/tasks/GetBodiesFromPeerTask.java | 3 +- .../sync/tasks/GetNodeDataFromPeerTask.java | 89 +++++++++++++++++++ .../eth/manager/RespondingEthPeer.java | 3 +- .../ethtaskutils/AbstractMessageTaskTest.java | 11 ++- .../ethtaskutils/PeerMessageTaskTest.java | 3 +- .../ethtaskutils/RetryingMessageTaskTest.java | 19 ++-- .../tasks/GetNodeDataFromPeerTaskTest.java | 65 ++++++++++++++ 8 files changed, 181 insertions(+), 16 deletions(-) create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTask.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTaskTest.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java index afac3193d9..7e2664a1f1 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java @@ -22,6 +22,8 @@ import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.util.List; + public final class BlockBodiesMessage extends AbstractMessageData { public static BlockBodiesMessage readFrom(final MessageData message) { @@ -53,7 +55,7 @@ public final class BlockBodiesMessage extends AbstractMessageData { return EthPV62.BLOCK_BODIES; } - public Iterable bodies(final ProtocolSchedule protocolSchedule) { + public List bodies(final ProtocolSchedule protocolSchedule) { final BlockHashFunction blockHashFunction = ScheduleBasedBlockHashFunction.create(protocolSchedule); return new BytesValueRLPInput(data, false) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java index 504e3ff5d8..0c6422356e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java @@ -42,7 +42,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -103,7 +102,7 @@ public class GetBodiesFromPeerTask extends AbstractPeerRequestTask bodies = Lists.newArrayList(bodiesMessage.bodies(protocolSchedule)); + final List bodies = bodiesMessage.bodies(protocolSchedule); if (bodies.size() == 0) { // Message contains no data - nothing to do return Optional.empty(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTask.java new file mode 100644 index 0000000000..cf54138b73 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTask.java @@ -0,0 +1,89 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import static java.util.Collections.emptyList; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerRequestTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; +import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask> { + + private static final Logger LOG = LogManager.getLogger(); + + private final Set hashes; + + private GetNodeDataFromPeerTask( + final EthContext ethContext, + final Collection hashes, + final LabelledMetric ethTasksTimer) { + super(ethContext, EthPV63.GET_NODE_DATA, ethTasksTimer); + this.hashes = new HashSet<>(hashes); + } + + public static GetNodeDataFromPeerTask forHashes( + final EthContext ethContext, + final Collection hashes, + final LabelledMetric ethTasksTimer) { + return new GetNodeDataFromPeerTask(ethContext, hashes, ethTasksTimer); + } + + @Override + protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected { + LOG.debug("Requesting {} node data entries from peer {}.", hashes.size(), peer); + return peer.getNodeData(hashes); + } + + @Override + protected Optional> processResponse( + final boolean streamClosed, final MessageData message, final EthPeer peer) { + if (streamClosed) { + // We don't record this as a useless response because it's impossible to know if a peer has + // the data we're requesting. + return Optional.of(emptyList()); + } + final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(message); + final List nodeData = nodeDataMessage.nodeData(); + if (nodeData.isEmpty()) { + return Optional.empty(); + } else if (nodeData.size() > hashes.size()) { + // Can't be the response to our request + return Optional.empty(); + } + + if (nodeData.stream().anyMatch(data -> !hashes.contains(Hash.hash(data)))) { + // Message contains unrequested data, must not be the response to our request. + return Optional.empty(); + } + return Optional.of(nodeData); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java index 303e9165a6..495f0710d4 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java @@ -211,11 +211,12 @@ public class RespondingEthPeer { */ public static Responder partialResponder( final Blockchain blockchain, + final WorldStateArchive worldStateArchive, final ProtocolSchedule protocolSchedule, final float portion) { checkArgument(portion >= 0.0 && portion <= 1.0, "Portion is in the range [0.0..1.0]"); - final Responder fullResponder = blockchainResponder(blockchain); + final Responder fullResponder = blockchainResponder(blockchain, worldStateArchive); return (cap, msg) -> { final Optional maybeResponse = fullResponder.respond(cap, msg); if (!maybeResponse.isPresent()) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java index 59661a924c..ae6452ad8c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java @@ -29,7 +29,6 @@ import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -83,9 +82,10 @@ public abstract class AbstractMessageTaskTest { T requestedData, R response, EthPeer respondingPeer); @Test - public void completesWhenPeersAreResponsive() throws ExecutionException, InterruptedException { + public void completesWhenPeersAreResponsive() { // Setup a responsive peer - final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); + final Responder responder = + RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); final RespondingEthPeer respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); @@ -109,8 +109,7 @@ public abstract class AbstractMessageTaskTest { } @Test - public void doesNotCompleteWhenPeersDoNotRespond() - throws ExecutionException, InterruptedException { + public void doesNotCompleteWhenPeersDoNotRespond() { // Setup a unresponsive peer EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); @@ -129,7 +128,7 @@ public abstract class AbstractMessageTaskTest { } @Test - public void cancel() throws ExecutionException, InterruptedException { + public void cancel() { // Setup a unresponsive peer EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java index 528fea4da7..c00af3819e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java @@ -41,7 +41,8 @@ public abstract class PeerMessageTaskTest extends AbstractMessageTaskTest extends AbstractMessageTaskTest // Setup a partially responsive peer and a non-responsive peer final Responder partialResponder = - RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f); + RespondingEthPeer.partialResponder( + blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.5f); final Responder emptyResponder = RespondingEthPeer.emptyResponder(); final RespondingEthPeer respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); @@ -96,10 +97,18 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest final CompletableFuture future = task.run(); // Respond with partial data up until complete. - respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.25f)); - respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.50f)); - respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.75f)); - respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 1.0f)); + respondingPeer.respond( + RespondingEthPeer.partialResponder( + blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.25f)); + respondingPeer.respond( + RespondingEthPeer.partialResponder( + blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.50f)); + respondingPeer.respond( + RespondingEthPeer.partialResponder( + blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.75f)); + respondingPeer.respond( + RespondingEthPeer.partialResponder( + blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 1.0f)); assertThat(future.isDone()).isTrue(); assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTaskTest.java new file mode 100644 index 0000000000..8388e1e30f --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTaskTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.ArrayList; +import java.util.List; + +public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest> { + + @Override + protected List generateDataToBeRequested() { + final List requestedData = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + final BlockHeader blockHeader = blockchain.getBlockHeader(10 + i).get(); + requestedData.add( + protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get()); + } + return requestedData; + } + + @Override + protected EthTask>> createTask( + final List requestedData) { + final List hashes = requestedData.stream().map(Hash::hash).collect(toList()); + return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer); + } + + @Override + protected void assertPartialResultMatchesExpectation( + final List requestedData, final List partialResponse) { + assertThat(partialResponse.size()).isLessThanOrEqualTo(requestedData.size()); + assertThat(partialResponse.size()).isGreaterThan(0); + assertThat(requestedData).containsAll(partialResponse); + } + + @Override + protected void assertResultMatchesExpectation( + final List requestedData, + final PeerTaskResult> response, + final EthPeer respondingPeer) { + assertThat(response.getResult()).containsExactlyInAnyOrderElementsOf(requestedData); + assertThat(response.getPeer()).isEqualTo(respondingPeer); + } +}