Add GetNodeDataFromPeerTask. (#597)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by mbaxter
parent 89dbba4f97
commit 05895495cc
  1. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java
  2. 3
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java
  3. 89
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTask.java
  4. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java
  5. 11
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  6. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java
  7. 19
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java
  8. 65
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetNodeDataFromPeerTaskTest.java

@ -22,6 +22,8 @@ import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
public final class BlockBodiesMessage extends AbstractMessageData { public final class BlockBodiesMessage extends AbstractMessageData {
public static BlockBodiesMessage readFrom(final MessageData message) { public static BlockBodiesMessage readFrom(final MessageData message) {
@ -53,7 +55,7 @@ public final class BlockBodiesMessage extends AbstractMessageData {
return EthPV62.BLOCK_BODIES; return EthPV62.BLOCK_BODIES;
} }
public <C> Iterable<BlockBody> bodies(final ProtocolSchedule<C> protocolSchedule) { public <C> List<BlockBody> bodies(final ProtocolSchedule<C> protocolSchedule) {
final BlockHashFunction blockHashFunction = final BlockHashFunction blockHashFunction =
ScheduleBasedBlockHashFunction.create(protocolSchedule); ScheduleBasedBlockHashFunction.create(protocolSchedule);
return new BytesValueRLPInput(data, false) return new BytesValueRLPInput(data, false)

@ -42,7 +42,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -103,7 +102,7 @@ public class GetBodiesFromPeerTask<C> extends AbstractPeerRequestTask<List<Block
} }
final BlockBodiesMessage bodiesMessage = BlockBodiesMessage.readFrom(message); final BlockBodiesMessage bodiesMessage = BlockBodiesMessage.readFrom(message);
final List<BlockBody> bodies = Lists.newArrayList(bodiesMessage.bodies(protocolSchedule)); final List<BlockBody> bodies = bodiesMessage.bodies(protocolSchedule);
if (bodies.size() == 0) { if (bodies.size() == 0) {
// Message contains no data - nothing to do // Message contains no data - nothing to do
return Optional.empty(); return Optional.empty();

@ -0,0 +1,89 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import static java.util.Collections.emptyList;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerRequestTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask<List<BytesValue>> {
private static final Logger LOG = LogManager.getLogger();
private final Set<Hash> hashes;
private GetNodeDataFromPeerTask(
final EthContext ethContext,
final Collection<Hash> hashes,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, EthPV63.GET_NODE_DATA, ethTasksTimer);
this.hashes = new HashSet<>(hashes);
}
public static GetNodeDataFromPeerTask forHashes(
final EthContext ethContext,
final Collection<Hash> hashes,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetNodeDataFromPeerTask(ethContext, hashes, ethTasksTimer);
}
@Override
protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected {
LOG.debug("Requesting {} node data entries from peer {}.", hashes.size(), peer);
return peer.getNodeData(hashes);
}
@Override
protected Optional<List<BytesValue>> processResponse(
final boolean streamClosed, final MessageData message, final EthPeer peer) {
if (streamClosed) {
// We don't record this as a useless response because it's impossible to know if a peer has
// the data we're requesting.
return Optional.of(emptyList());
}
final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(message);
final List<BytesValue> nodeData = nodeDataMessage.nodeData();
if (nodeData.isEmpty()) {
return Optional.empty();
} else if (nodeData.size() > hashes.size()) {
// Can't be the response to our request
return Optional.empty();
}
if (nodeData.stream().anyMatch(data -> !hashes.contains(Hash.hash(data)))) {
// Message contains unrequested data, must not be the response to our request.
return Optional.empty();
}
return Optional.of(nodeData);
}
}

@ -211,11 +211,12 @@ public class RespondingEthPeer {
*/ */
public static <C> Responder partialResponder( public static <C> Responder partialResponder(
final Blockchain blockchain, final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final ProtocolSchedule<C> protocolSchedule, final ProtocolSchedule<C> protocolSchedule,
final float portion) { final float portion) {
checkArgument(portion >= 0.0 && portion <= 1.0, "Portion is in the range [0.0..1.0]"); checkArgument(portion >= 0.0 && portion <= 1.0, "Portion is in the range [0.0..1.0]");
final Responder fullResponder = blockchainResponder(blockchain); final Responder fullResponder = blockchainResponder(blockchain, worldStateArchive);
return (cap, msg) -> { return (cap, msg) -> {
final Optional<MessageData> maybeResponse = fullResponder.respond(cap, msg); final Optional<MessageData> maybeResponse = fullResponder.respond(cap, msg);
if (!maybeResponse.isPresent()) { if (!maybeResponse.isPresent()) {

@ -29,7 +29,6 @@ import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -83,9 +82,10 @@ public abstract class AbstractMessageTaskTest<T, R> {
T requestedData, R response, EthPeer respondingPeer); T requestedData, R response, EthPeer respondingPeer);
@Test @Test
public void completesWhenPeersAreResponsive() throws ExecutionException, InterruptedException { public void completesWhenPeersAreResponsive() {
// Setup a responsive peer // Setup a responsive peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer = final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
@ -109,8 +109,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
} }
@Test @Test
public void doesNotCompleteWhenPeersDoNotRespond() public void doesNotCompleteWhenPeersDoNotRespond() {
throws ExecutionException, InterruptedException {
// Setup a unresponsive peer // Setup a unresponsive peer
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
@ -129,7 +128,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
} }
@Test @Test
public void cancel() throws ExecutionException, InterruptedException { public void cancel() {
// Setup a unresponsive peer // Setup a unresponsive peer
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

@ -41,7 +41,8 @@ public abstract class PeerMessageTaskTest<T> extends AbstractMessageTaskTest<T,
public void completesWhenPeerReturnsPartialResult() { public void completesWhenPeerReturnsPartialResult() {
// Setup a partially responsive peer // Setup a partially responsive peer
final Responder responder = final Responder responder =
RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f); RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.5f);
final RespondingEthPeer respondingEthPeer = final RespondingEthPeer respondingEthPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

@ -52,7 +52,8 @@ public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest
// Setup a partially responsive peer and a non-responsive peer // Setup a partially responsive peer and a non-responsive peer
final Responder partialResponder = final Responder partialResponder =
RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f); RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.5f);
final Responder emptyResponder = RespondingEthPeer.emptyResponder(); final Responder emptyResponder = RespondingEthPeer.emptyResponder();
final RespondingEthPeer respondingPeer = final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
@ -96,10 +97,18 @@ public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest
final CompletableFuture<T> future = task.run(); final CompletableFuture<T> future = task.run();
// Respond with partial data up until complete. // Respond with partial data up until complete.
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.25f)); respondingPeer.respond(
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.50f)); RespondingEthPeer.partialResponder(
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.75f)); blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.25f));
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 1.0f)); respondingPeer.respond(
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.50f));
respondingPeer.respond(
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.75f));
respondingPeer.respond(
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 1.0f));
assertThat(future.isDone()).isTrue(); assertThat(future.isDone()).isTrue();
assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer());

@ -0,0 +1,65 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.List;
public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest<List<BytesValue>> {
@Override
protected List<BytesValue> generateDataToBeRequested() {
final List<BytesValue> requestedData = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final BlockHeader blockHeader = blockchain.getBlockHeader(10 + i).get();
requestedData.add(
protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get());
}
return requestedData;
}
@Override
protected EthTask<PeerTaskResult<List<BytesValue>>> createTask(
final List<BytesValue> requestedData) {
final List<Hash> hashes = requestedData.stream().map(Hash::hash).collect(toList());
return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer);
}
@Override
protected void assertPartialResultMatchesExpectation(
final List<BytesValue> requestedData, final List<BytesValue> partialResponse) {
assertThat(partialResponse.size()).isLessThanOrEqualTo(requestedData.size());
assertThat(partialResponse.size()).isGreaterThan(0);
assertThat(requestedData).containsAll(partialResponse);
}
@Override
protected void assertResultMatchesExpectation(
final List<BytesValue> requestedData,
final PeerTaskResult<List<BytesValue>> response,
final EthPeer respondingPeer) {
assertThat(response.getResult()).containsExactlyInAnyOrderElementsOf(requestedData);
assertThat(response.getPeer()).isEqualTo(respondingPeer);
}
}
Loading…
Cancel
Save