Add getNodeData to EthPeer to enable requesting node data as part of fast sync. (#589)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent ccf0e0958e
commit c6f2c5901a
  1. 29
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java
  2. 61
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java

@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockBodiesMessage; import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage; import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage; import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
@ -58,6 +59,7 @@ public class EthPeer {
private final RequestManager headersRequestManager = new RequestManager(this); private final RequestManager headersRequestManager = new RequestManager(this);
private final RequestManager bodiesRequestManager = new RequestManager(this); private final RequestManager bodiesRequestManager = new RequestManager(this);
private final RequestManager receiptsRequestManager = new RequestManager(this); private final RequestManager receiptsRequestManager = new RequestManager(this);
private final RequestManager nodeDataRequestManager = new RequestManager(this);
private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>(); private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation(); private final PeerReputation reputation = new PeerReputation();
@ -120,6 +122,8 @@ public class EthPeer {
return sendBodiesRequest(messageData); return sendBodiesRequest(messageData);
case EthPV63.GET_RECEIPTS: case EthPV63.GET_RECEIPTS:
return sendReceiptsRequest(messageData); return sendReceiptsRequest(messageData);
case EthPV63.GET_NODE_DATA:
return sendNodeDataRequest(messageData);
default: default:
connection.sendForProtocol(protocolName, messageData); connection.sendForProtocol(protocolName, messageData);
return null; return null;
@ -168,6 +172,17 @@ public class EthPeer {
() -> connection.sendForProtocol(protocolName, messageData)); () -> connection.sendForProtocol(protocolName, messageData));
} }
public ResponseStream getNodeData(final Iterable<Hash> nodeHashes) throws PeerNotConnected {
final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes);
return sendNodeDataRequest(message);
}
private ResponseStream sendNodeDataRequest(final MessageData messageData)
throws PeerNotConnected {
return nodeDataRequestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
}
boolean validateReceivedMessage(final EthMessage message) { boolean validateReceivedMessage(final EthMessage message) {
checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch"); checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch");
switch (message.getData().getCode()) { switch (message.getData().getCode()) {
@ -189,6 +204,12 @@ public class EthPeer {
return false; return false;
} }
break; break;
case EthPV63.NODE_DATA:
if (nodeDataRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited node data received.");
return false;
}
break;
default: default:
// Nothing to do // Nothing to do
} }
@ -215,6 +236,10 @@ public class EthPeer {
reputation.resetTimeoutCount(EthPV63.GET_RECEIPTS); reputation.resetTimeoutCount(EthPV63.GET_RECEIPTS);
receiptsRequestManager.dispatchResponse(message); receiptsRequestManager.dispatchResponse(message);
break; break;
case EthPV63.NODE_DATA:
reputation.resetTimeoutCount(EthPV63.GET_NODE_DATA);
nodeDataRequestManager.dispatchResponse(message);
break;
default: default:
// Nothing to do // Nothing to do
} }
@ -228,6 +253,7 @@ public class EthPeer {
headersRequestManager.close(); headersRequestManager.close();
bodiesRequestManager.close(); bodiesRequestManager.close();
receiptsRequestManager.close(); receiptsRequestManager.close();
nodeDataRequestManager.close();
disconnectCallbacks.forEach(callback -> callback.onDisconnect(this)); disconnectCallbacks.forEach(callback -> callback.onDisconnect(this));
} }
@ -294,7 +320,8 @@ public class EthPeer {
public int outstandingRequests() { public int outstandingRequests() {
return headersRequestManager.outstandingRequests() return headersRequestManager.outstandingRequests()
+ bodiesRequestManager.outstandingRequests() + bodiesRequestManager.outstandingRequests()
+ receiptsRequestManager.outstandingRequests(); + receiptsRequestManager.outstandingRequests()
+ nodeDataRequestManager.outstandingRequests();
} }
public BytesValue nodeId() { public BytesValue nodeId() {

@ -12,6 +12,8 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.manager; package tech.pegasys.pantheon.ethereum.eth.manager;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
@ -20,14 +22,13 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseCallbac
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockBodiesMessage; import tech.pegasys.pantheon.ethereum.eth.messages.BlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage; import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage; import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -43,9 +44,8 @@ public class EthPeerTest {
final ResponseStreamSupplier getStream = final ResponseStreamSupplier getStream =
(peer) -> peer.getHeadersByHash(gen.hash(), 5, 0, false); (peer) -> peer.getHeadersByHash(gen.hash(), 5, 0, false);
final MessageData targetMessage = final MessageData targetMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header())); BlockHeadersMessage.create(asList(gen.header(), gen.header()));
final MessageData otherMessage = final MessageData otherMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body()));
BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body()));
messageStream(getStream, targetMessage, otherMessage); messageStream(getStream, targetMessage, otherMessage);
} }
@ -53,11 +53,9 @@ public class EthPeerTest {
@Test @Test
public void getBodiesStream() throws PeerNotConnected { public void getBodiesStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream = final ResponseStreamSupplier getStream =
(peer) -> peer.getBodies(Arrays.asList(gen.hash(), gen.hash())); (peer) -> peer.getBodies(asList(gen.hash(), gen.hash()));
final MessageData targetMessage = final MessageData targetMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body()));
BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body())); final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));
final MessageData otherMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
messageStream(getStream, targetMessage, otherMessage); messageStream(getStream, targetMessage, otherMessage);
} }
@ -65,11 +63,20 @@ public class EthPeerTest {
@Test @Test
public void getReceiptsStream() throws PeerNotConnected { public void getReceiptsStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream = final ResponseStreamSupplier getStream =
(peer) -> peer.getReceipts(Arrays.asList(gen.hash(), gen.hash())); (peer) -> peer.getReceipts(asList(gen.hash(), gen.hash()));
final MessageData targetMessage = final MessageData targetMessage =
ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block()))); ReceiptsMessage.create(singletonList(gen.receipts(gen.block())));
final MessageData otherMessage = final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
messageStream(getStream, targetMessage, otherMessage);
}
@Test
public void getNodeDataStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getNodeData(asList(gen.hash(), gen.hash()));
final MessageData targetMessage = NodeDataMessage.create(singletonList(gen.bytesValue()));
final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));
messageStream(getStream, targetMessage, otherMessage); messageStream(getStream, targetMessage, otherMessage);
} }
@ -88,7 +95,7 @@ public class EthPeerTest {
}); });
// Bodies stream // Bodies stream
final AtomicInteger bodiesClosedCount = new AtomicInteger(0); final AtomicInteger bodiesClosedCount = new AtomicInteger(0);
peer.getBodies(Arrays.asList(gen.hash(), gen.hash())) peer.getBodies(asList(gen.hash(), gen.hash()))
.then( .then(
(closed, msg, p) -> { (closed, msg, p) -> {
if (closed) { if (closed) {
@ -97,24 +104,35 @@ public class EthPeerTest {
}); });
// Receipts stream // Receipts stream
final AtomicInteger receiptsClosedCount = new AtomicInteger(0); final AtomicInteger receiptsClosedCount = new AtomicInteger(0);
peer.getReceipts(Arrays.asList(gen.hash(), gen.hash())) peer.getReceipts(asList(gen.hash(), gen.hash()))
.then( .then(
(closed, msg, p) -> { (closed, msg, p) -> {
if (closed) { if (closed) {
receiptsClosedCount.incrementAndGet(); receiptsClosedCount.incrementAndGet();
} }
}); });
// NodeData stream
final AtomicInteger nodeDataClosedCount = new AtomicInteger(0);
peer.getNodeData(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
nodeDataClosedCount.incrementAndGet();
}
});
// Sanity check // Sanity check
assertThat(headersClosedCount.get()).isEqualTo(0); assertThat(headersClosedCount.get()).isEqualTo(0);
assertThat(bodiesClosedCount.get()).isEqualTo(0); assertThat(bodiesClosedCount.get()).isEqualTo(0);
assertThat(receiptsClosedCount.get()).isEqualTo(0); assertThat(receiptsClosedCount.get()).isEqualTo(0);
assertThat(nodeDataClosedCount.get()).isEqualTo(0);
// Disconnect and check // Disconnect and check
peer.handleDisconnect(); peer.handleDisconnect();
assertThat(headersClosedCount.get()).isEqualTo(1); assertThat(headersClosedCount.get()).isEqualTo(1);
assertThat(bodiesClosedCount.get()).isEqualTo(1); assertThat(bodiesClosedCount.get()).isEqualTo(1);
assertThat(receiptsClosedCount.get()).isEqualTo(1); assertThat(receiptsClosedCount.get()).isEqualTo(1);
assertThat(nodeDataClosedCount.get()).isEqualTo(1);
} }
@Test @Test
@ -122,12 +140,11 @@ public class EthPeerTest {
// Setup peer and messages // Setup peer and messages
final EthPeer peer = createPeer(); final EthPeer peer = createPeer();
final EthMessage headersMessage = final EthMessage headersMessage =
new EthMessage(peer, BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()))); new EthMessage(peer, BlockHeadersMessage.create(asList(gen.header(), gen.header())));
final EthMessage bodiesMessage = final EthMessage bodiesMessage =
new EthMessage(peer, BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body()))); new EthMessage(peer, BlockBodiesMessage.create(asList(gen.body(), gen.body())));
final EthMessage otherMessage = final EthMessage otherMessage =
new EthMessage( new EthMessage(peer, ReceiptsMessage.create(singletonList(gen.receipts(gen.block()))));
peer, ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block()))));
// Set up stream for headers // Set up stream for headers
final AtomicInteger headersMessageCount = new AtomicInteger(0); final AtomicInteger headersMessageCount = new AtomicInteger(0);
@ -147,7 +164,7 @@ public class EthPeerTest {
final AtomicInteger bodiesMessageCount = new AtomicInteger(0); final AtomicInteger bodiesMessageCount = new AtomicInteger(0);
final AtomicInteger bodiesClosedCount = new AtomicInteger(0); final AtomicInteger bodiesClosedCount = new AtomicInteger(0);
final ResponseStream bodiesStream = final ResponseStream bodiesStream =
peer.getBodies(Arrays.asList(gen.hash(), gen.hash())) peer.getBodies(asList(gen.hash(), gen.hash()))
.then( .then(
(closed, msg, p) -> { (closed, msg, p) -> {
if (closed) { if (closed) {
@ -265,7 +282,7 @@ public class EthPeerTest {
} }
private EthPeer createPeer() { private EthPeer createPeer() {
final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); final Set<Capability> caps = new HashSet<>(singletonList(EthProtocol.ETH63));
final PeerConnection peerConnection = new MockPeerConnection(caps); final PeerConnection peerConnection = new MockPeerConnection(caps);
final Consumer<EthPeer> onPeerReady = (peer) -> {}; final Consumer<EthPeer> onPeerReady = (peer) -> {};
return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady); return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady);

Loading…
Cancel
Save