diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java index 989af18781..cefcdab1ff 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java @@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.manager; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; +import org.hyperledger.besu.ethereum.rlp.RLPException; import java.math.BigInteger; import java.util.Collection; @@ -70,22 +71,34 @@ public class RequestManager { public void dispatchResponse(final EthMessage ethMessage) { final Collection streams = List.copyOf(responseStreams.values()); final int count = outstandingRequests.decrementAndGet(); - if (supportsRequestId) { - // If there's a requestId, find the specific stream it belongs to - final Map.Entry requestIdAndEthMessage = - ethMessage.getData().unwrapMessageData(); - Optional.ofNullable(responseStreams.get(requestIdAndEthMessage.getKey())) - .ifPresentOrElse( - responseStream -> responseStream.processMessage(requestIdAndEthMessage.getValue()), - // disconnect on incorrect requestIds - () -> { - LOG.debug("Request ID incorrect (BREACH_OF_PROTOCOL), disconnecting peer {}", peer); - peer.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL); - }); - } else { - // otherwise iterate through all of them - streams.forEach(stream -> stream.processMessage(ethMessage.getData())); + try { + if (supportsRequestId) { + // If there's a requestId, find the specific stream it belongs to + final Map.Entry requestIdAndEthMessage = + ethMessage.getData().unwrapMessageData(); + Optional.ofNullable(responseStreams.get(requestIdAndEthMessage.getKey())) + .ifPresentOrElse( + responseStream -> responseStream.processMessage(requestIdAndEthMessage.getValue()), + // disconnect on incorrect requestIds + () -> { + LOG.debug( + "Request ID incorrect (BREACH_OF_PROTOCOL), disconnecting peer {}", peer); + peer.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL); + }); + } else { + // otherwise iterate through all of them + streams.forEach(stream -> stream.processMessage(ethMessage.getData())); + } + } catch (final RLPException e) { + LOG.debug( + "Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}", + ethMessage.getData(), + peer, + e); + + peer.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL); } + if (count == 0) { // No possibility of any remaining outstanding messages closeOutstandingStreams(streams); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java index b79f75597a..71cbb2507a 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java @@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; +import org.hyperledger.besu.ethereum.rlp.RLP; import org.hyperledger.besu.testutil.TestClock; import java.util.ArrayList; @@ -309,4 +310,29 @@ public class RequestManagerTest { TestClock.fixed(), Collections.emptyList()); } + + @Test + public void disconnectsPeerOnBadMessage() throws Exception { + for (final boolean supportsRequestId : List.of(true, false)) { + final EthPeer peer = createPeer(); + final RequestManager requestManager = + new RequestManager(peer, supportsRequestId, EthProtocol.NAME); + + requestManager + .dispatchRequest( + messageData -> RLP.input(messageData.getData()).nextSize(), + new RawMessage(0x01, Bytes.EMPTY)) + .then( + (closed, msg, p) -> { + if (!closed) { + RLP.input(msg.getData()).skipNext(); + } + }); + final EthMessage mockMessage = + new EthMessage(peer, new RawMessage(1, Bytes.of(0x81, 0x82, 0x83, 0x84))); + + requestManager.dispatchResponse(mockMessage); + assertThat(peer.isDisconnected()).isTrue(); + } + } }