@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected ;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData ;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage ;
import org.hyperledger.besu.ethereum.rlp.RLPException ;
import java.math.BigInteger ;
import java.util.Collection ;
@ -70,22 +71,34 @@ public class RequestManager {
public void dispatchResponse ( final EthMessage ethMessage ) {
final Collection < ResponseStream > streams = List . copyOf ( responseStreams . values ( ) ) ;
final int count = outstandingRequests . decrementAndGet ( ) ;
if ( supportsRequestId ) {
// If there's a requestId, find the specific stream it belongs to
final Map . Entry < BigInteger , MessageData > requestIdAndEthMessage =
ethMessage . getData ( ) . unwrapMessageData ( ) ;
Optional . ofNullable ( responseStreams . get ( requestIdAndEthMessage . getKey ( ) ) )
. ifPresentOrElse (
responseStream - > responseStream . processMessage ( requestIdAndEthMessage . getValue ( ) ) ,
// disconnect on incorrect requestIds
( ) - > {
LOG . debug ( "Request ID incorrect (BREACH_OF_PROTOCOL), disconnecting peer {}" , peer ) ;
peer . disconnect ( DisconnectMessage . DisconnectReason . BREACH_OF_PROTOCOL ) ;
} ) ;
} else {
// otherwise iterate through all of them
streams . forEach ( stream - > stream . processMessage ( ethMessage . getData ( ) ) ) ;
try {
if ( supportsRequestId ) {
// If there's a requestId, find the specific stream it belongs to
final Map . Entry < BigInteger , MessageData > requestIdAndEthMessage =
ethMessage . getData ( ) . unwrapMessageData ( ) ;
Optional . ofNullable ( responseStreams . get ( requestIdAndEthMessage . getKey ( ) ) )
. ifPresentOrElse (
responseStream - > responseStream . processMessage ( requestIdAndEthMessage . getValue ( ) ) ,
// disconnect on incorrect requestIds
( ) - > {
LOG . debug (
"Request ID incorrect (BREACH_OF_PROTOCOL), disconnecting peer {}" , peer ) ;
peer . disconnect ( DisconnectMessage . DisconnectReason . BREACH_OF_PROTOCOL ) ;
} ) ;
} else {
// otherwise iterate through all of them
streams . forEach ( stream - > stream . processMessage ( ethMessage . getData ( ) ) ) ;
}
} catch ( final RLPException e ) {
LOG . debug (
"Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}" ,
ethMessage . getData ( ) ,
peer ,
e ) ;
peer . disconnect ( DisconnectMessage . DisconnectReason . BREACH_OF_PROTOCOL ) ;
}
if ( count = = 0 ) {
// No possibility of any remaining outstanding messages
closeOutstandingStreams ( streams ) ;