diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java index 4acd15e9ee..a21306e4e6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java @@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockBodiesMessage; import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage; +import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage; import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; @@ -58,6 +59,7 @@ public class EthPeer { private final RequestManager headersRequestManager = new RequestManager(this); private final RequestManager bodiesRequestManager = new RequestManager(this); private final RequestManager receiptsRequestManager = new RequestManager(this); + private final RequestManager nodeDataRequestManager = new RequestManager(this); private final AtomicReference> onStatusesExchanged = new AtomicReference<>(); private final PeerReputation reputation = new PeerReputation(); @@ -120,6 +122,8 @@ public class EthPeer { return sendBodiesRequest(messageData); case EthPV63.GET_RECEIPTS: return sendReceiptsRequest(messageData); + case EthPV63.GET_NODE_DATA: + return sendNodeDataRequest(messageData); default: connection.sendForProtocol(protocolName, messageData); return null; @@ -168,6 +172,17 @@ public class EthPeer { () -> connection.sendForProtocol(protocolName, messageData)); } + public ResponseStream getNodeData(final Iterable nodeHashes) throws PeerNotConnected { + final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes); + return sendNodeDataRequest(message); + } + + private ResponseStream sendNodeDataRequest(final MessageData messageData) + throws PeerNotConnected { + return nodeDataRequestManager.dispatchRequest( + () -> connection.sendForProtocol(protocolName, messageData)); + } + boolean validateReceivedMessage(final EthMessage message) { checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch"); switch (message.getData().getCode()) { @@ -189,6 +204,12 @@ public class EthPeer { return false; } break; + case EthPV63.NODE_DATA: + if (nodeDataRequestManager.outstandingRequests() == 0) { + LOG.warn("Unsolicited node data received."); + return false; + } + break; default: // Nothing to do } @@ -215,6 +236,10 @@ public class EthPeer { reputation.resetTimeoutCount(EthPV63.GET_RECEIPTS); receiptsRequestManager.dispatchResponse(message); break; + case EthPV63.NODE_DATA: + reputation.resetTimeoutCount(EthPV63.GET_NODE_DATA); + nodeDataRequestManager.dispatchResponse(message); + break; default: // Nothing to do } @@ -228,6 +253,7 @@ public class EthPeer { headersRequestManager.close(); bodiesRequestManager.close(); receiptsRequestManager.close(); + nodeDataRequestManager.close(); disconnectCallbacks.forEach(callback -> callback.onDisconnect(this)); } @@ -294,7 +320,8 @@ public class EthPeer { public int outstandingRequests() { return headersRequestManager.outstandingRequests() + bodiesRequestManager.outstandingRequests() - + receiptsRequestManager.outstandingRequests(); + + receiptsRequestManager.outstandingRequests() + + nodeDataRequestManager.outstandingRequests(); } public BytesValue nodeId() { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java index 05b2d86b21..bc1fdc57be 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java @@ -12,6 +12,8 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; @@ -20,14 +22,13 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseCallbac import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; import tech.pegasys.pantheon.ethereum.eth.messages.BlockBodiesMessage; import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage; +import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage; import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; -import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -43,9 +44,8 @@ public class EthPeerTest { final ResponseStreamSupplier getStream = (peer) -> peer.getHeadersByHash(gen.hash(), 5, 0, false); final MessageData targetMessage = - BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header())); - final MessageData otherMessage = - BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body())); + BlockHeadersMessage.create(asList(gen.header(), gen.header())); + final MessageData otherMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body())); messageStream(getStream, targetMessage, otherMessage); } @@ -53,11 +53,9 @@ public class EthPeerTest { @Test public void getBodiesStream() throws PeerNotConnected { final ResponseStreamSupplier getStream = - (peer) -> peer.getBodies(Arrays.asList(gen.hash(), gen.hash())); - final MessageData targetMessage = - BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body())); - final MessageData otherMessage = - BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header())); + (peer) -> peer.getBodies(asList(gen.hash(), gen.hash())); + final MessageData targetMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body())); + final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header())); messageStream(getStream, targetMessage, otherMessage); } @@ -65,11 +63,20 @@ public class EthPeerTest { @Test public void getReceiptsStream() throws PeerNotConnected { final ResponseStreamSupplier getStream = - (peer) -> peer.getReceipts(Arrays.asList(gen.hash(), gen.hash())); + (peer) -> peer.getReceipts(asList(gen.hash(), gen.hash())); final MessageData targetMessage = - ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block()))); - final MessageData otherMessage = - BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header())); + ReceiptsMessage.create(singletonList(gen.receipts(gen.block()))); + final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header())); + + messageStream(getStream, targetMessage, otherMessage); + } + + @Test + public void getNodeDataStream() throws PeerNotConnected { + final ResponseStreamSupplier getStream = + (peer) -> peer.getNodeData(asList(gen.hash(), gen.hash())); + final MessageData targetMessage = NodeDataMessage.create(singletonList(gen.bytesValue())); + final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header())); messageStream(getStream, targetMessage, otherMessage); } @@ -88,7 +95,7 @@ public class EthPeerTest { }); // Bodies stream final AtomicInteger bodiesClosedCount = new AtomicInteger(0); - peer.getBodies(Arrays.asList(gen.hash(), gen.hash())) + peer.getBodies(asList(gen.hash(), gen.hash())) .then( (closed, msg, p) -> { if (closed) { @@ -97,24 +104,35 @@ public class EthPeerTest { }); // Receipts stream final AtomicInteger receiptsClosedCount = new AtomicInteger(0); - peer.getReceipts(Arrays.asList(gen.hash(), gen.hash())) + peer.getReceipts(asList(gen.hash(), gen.hash())) .then( (closed, msg, p) -> { if (closed) { receiptsClosedCount.incrementAndGet(); } }); + // NodeData stream + final AtomicInteger nodeDataClosedCount = new AtomicInteger(0); + peer.getNodeData(asList(gen.hash(), gen.hash())) + .then( + (closed, msg, p) -> { + if (closed) { + nodeDataClosedCount.incrementAndGet(); + } + }); // Sanity check assertThat(headersClosedCount.get()).isEqualTo(0); assertThat(bodiesClosedCount.get()).isEqualTo(0); assertThat(receiptsClosedCount.get()).isEqualTo(0); + assertThat(nodeDataClosedCount.get()).isEqualTo(0); // Disconnect and check peer.handleDisconnect(); assertThat(headersClosedCount.get()).isEqualTo(1); assertThat(bodiesClosedCount.get()).isEqualTo(1); assertThat(receiptsClosedCount.get()).isEqualTo(1); + assertThat(nodeDataClosedCount.get()).isEqualTo(1); } @Test @@ -122,12 +140,11 @@ public class EthPeerTest { // Setup peer and messages final EthPeer peer = createPeer(); final EthMessage headersMessage = - new EthMessage(peer, BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()))); + new EthMessage(peer, BlockHeadersMessage.create(asList(gen.header(), gen.header()))); final EthMessage bodiesMessage = - new EthMessage(peer, BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body()))); + new EthMessage(peer, BlockBodiesMessage.create(asList(gen.body(), gen.body()))); final EthMessage otherMessage = - new EthMessage( - peer, ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block())))); + new EthMessage(peer, ReceiptsMessage.create(singletonList(gen.receipts(gen.block())))); // Set up stream for headers final AtomicInteger headersMessageCount = new AtomicInteger(0); @@ -147,7 +164,7 @@ public class EthPeerTest { final AtomicInteger bodiesMessageCount = new AtomicInteger(0); final AtomicInteger bodiesClosedCount = new AtomicInteger(0); final ResponseStream bodiesStream = - peer.getBodies(Arrays.asList(gen.hash(), gen.hash())) + peer.getBodies(asList(gen.hash(), gen.hash())) .then( (closed, msg, p) -> { if (closed) { @@ -265,7 +282,7 @@ public class EthPeerTest { } private EthPeer createPeer() { - final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); + final Set caps = new HashSet<>(singletonList(EthProtocol.ETH63)); final PeerConnection peerConnection = new MockPeerConnection(caps); final Consumer onPeerReady = (peer) -> {}; return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady);