[PAN-2433] Fix netty pipeline (#1257)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent 110089fd63
commit 907bbb5299
  1. 27
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java
  2. 20
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/BreachOfProtocolException.java
  3. 22
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/PeerDisconnectedException.java
  4. 5
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java
  5. 187
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java
  6. 38
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/NettyMocks.java
  7. 45
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/SubProtocolMock.java

@ -14,11 +14,14 @@ package tech.pegasys.pantheon.ethereum.p2p.netty;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.BreachOfProtocolException;
import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.IncompatiblePeerException;
import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes;
@ -73,7 +76,9 @@ final class DeFramer extends ByteToMessageDecoder {
MessageData message;
while ((message = framer.deframe(in)) != null) {
if (!hellosExchanged && message.getCode() == WireMessageCodes.HELLO) {
if (hellosExchanged) {
out.add(message);
} else if (message.getCode() == WireMessageCodes.HELLO) {
hellosExchanged = true;
// Decode first hello and use the payload to modify pipeline
final PeerInfo peerInfo;
@ -103,7 +108,6 @@ final class DeFramer extends ByteToMessageDecoder {
connectFuture.completeExceptionally(
new IncompatiblePeerException("No shared capabilities"));
connection.disconnect(DisconnectReason.USELESS_PEER);
return;
}
// Setup next stage
@ -116,8 +120,25 @@ final class DeFramer extends ByteToMessageDecoder {
new ApiHandler(capabilityMultiplexer, connection, callbacks, waitingForPong),
new MessageFramer(capabilityMultiplexer, framer));
connectFuture.complete(connection);
} else if (message.getCode() == WireMessageCodes.DISCONNECT) {
DisconnectMessage disconnectMessage = DisconnectMessage.readFrom(message);
LOG.debug(
"Peer disconnected before sending HELLO. Reason: " + disconnectMessage.getReason());
ctx.close();
connectFuture.completeExceptionally(
new PeerDisconnectedException(disconnectMessage.getReason()));
} else {
out.add(message);
// Unexpected message - disconnect
LOG.debug(
"Message received before HELLO's exchanged, disconnecting. Code: {}, Data: {}",
message.getCode(),
message.getData().toString());
ctx.writeAndFlush(
new OutboundMessage(
null, DisconnectMessage.create(DisconnectReason.BREACH_OF_PROTOCOL)))
.addListener((f) -> ctx.close());
connectFuture.completeExceptionally(
new BreachOfProtocolException("Message received before HELLO's exchanged"));
}
}
}

@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.netty.exceptions;
public class BreachOfProtocolException extends RuntimeException {
public BreachOfProtocolException(final String message) {
super(message);
}
}

@ -0,0 +1,22 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.netty.exceptions;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
public class PeerDisconnectedException extends RuntimeException {
public PeerDisconnectedException(final DisconnectReason reason) {
super("Peer disconnected for reason: " + reason.toString());
}
}

@ -27,4 +27,9 @@ public final class RawMessage extends AbstractMessageData {
public int getCode() {
return code;
}
@Override
public String toString() {
return "RawMessage{" + "code=" + code + ", data=" + data + '}';
}
}

@ -12,42 +12,100 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.netty;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.BreachOfProtocolException;
import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.IncompatiblePeerException;
import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers.NettyMocks;
import tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers.SubProtocolMock;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.PingMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.DecoderException;
import io.netty.util.concurrent.ScheduledFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class DeFramerTest {
private final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
private final Channel channel = mock(Channel.class);
private final ChannelPipeline pipeline = mock(ChannelPipeline.class);
private final EventLoop eventLoop = mock(EventLoop.class);
private final Framer framer = mock(Framer.class);
private final Callbacks callbacks = mock(Callbacks.class);
private final PeerConnection peerConnection = mock(PeerConnection.class);
private final CompletableFuture<PeerConnection> connectFuture = new CompletableFuture<>();
private final PeerInfo peerInfo =
new PeerInfo(
5,
"abc",
Arrays.asList(Capability.create("eth", 63)),
0,
BytesValue.fromHexString("0x01"));
private final DeFramer deFramer =
new DeFramer(
framer,
Collections.emptyList(),
new PeerInfo(5, "abc", Collections.emptyList(), 0, BytesValue.fromHexString("0x01")),
Arrays.asList(SubProtocolMock.create("eth")),
peerInfo,
callbacks,
connectFuture,
NoOpMetricsSystem.NO_OP_LABELLED_3_COUNTER);
@Before
@SuppressWarnings("unchecked")
public void setup() {
when(ctx.channel()).thenReturn(channel);
when(channel.pipeline()).thenReturn(pipeline);
when(pipeline.addLast(any())).thenReturn(pipeline);
when(pipeline.addFirst(any())).thenReturn(pipeline);
when(channel.eventLoop()).thenReturn(eventLoop);
when(eventLoop.schedule(any(Callable.class), anyLong(), any()))
.thenReturn(mock(ScheduledFuture.class));
}
@Test
public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() throws Exception {
public void exceptionCaught_shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown()
throws Exception {
connectFuture.complete(peerConnection);
deFramer.exceptionCaught(ctx, new DecoderException(new FramingException("Test")));
@ -56,7 +114,8 @@ public class DeFramerTest {
}
@Test
public void shouldHandleFramingExceptionWhenFutureCompletedExceptionally() throws Exception {
public void exceptionCaught_shouldHandleFramingExceptionWhenFutureCompletedExceptionally()
throws Exception {
connectFuture.completeExceptionally(new Exception());
deFramer.exceptionCaught(ctx, new DecoderException(new FramingException("Test")));
@ -65,11 +124,127 @@ public class DeFramerTest {
}
@Test
public void shouldHandleGenericExceptionWhenFutureCompletedExceptionally() throws Exception {
public void exceptionCaught_shouldHandleGenericExceptionWhenFutureCompletedExceptionally()
throws Exception {
connectFuture.completeExceptionally(new Exception());
deFramer.exceptionCaught(ctx, new DecoderException(new RuntimeException("Test")));
verify(ctx).close();
}
@Test
public void decode_handlesHello() throws ExecutionException, InterruptedException {
ChannelFuture future = NettyMocks.channelFuture(false);
when(channel.closeFuture()).thenReturn(future);
PeerInfo remotePeerInfo =
new PeerInfo(
peerInfo.getVersion(),
peerInfo.getClientId(),
peerInfo.getCapabilities(),
peerInfo.getPort(),
Peer.randomId());
HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().extractArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
assertThat(connectFuture).isNotCompletedExceptionally();
PeerConnection peerConnection = connectFuture.get();
assertThat(peerConnection.getPeer()).isEqualTo(remotePeerInfo);
assertThat(out).isEmpty();
// Next phase of pipeline should be setup
verify(pipeline, times(1)).addLast(any());
// Next message should be pushed out
PingMessage nextMessage = PingMessage.get();
ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().extractArray());
when(framer.deframe(eq(nextData)))
.thenReturn(new RawMessage(nextMessage.getCode(), nextMessage.getData()))
.thenReturn(null);
verify(pipeline, times(1)).addLast(any());
deFramer.decode(ctx, nextData, out);
assertThat(out.size()).isEqualTo(1);
}
@Test
public void decode_handlesNoSharedCaps() throws ExecutionException, InterruptedException {
ChannelFuture future = NettyMocks.channelFuture(false);
when(channel.closeFuture()).thenReturn(future);
PeerInfo remotePeerInfo =
new PeerInfo(
peerInfo.getVersion(),
peerInfo.getClientId(),
Arrays.asList(Capability.create("eth", 254)),
peerInfo.getPort(),
Peer.randomId());
HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().extractArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
assertThat(connectFuture).isCompletedExceptionally();
assertThatThrownBy(connectFuture::get).hasCauseInstanceOf(IncompatiblePeerException.class);
assertThat(out).isEmpty();
// Next phase of pipeline should be setup
verify(pipeline, times(1)).addLast(any());
}
@Test
public void decode_shouldHandleImmediateDisconnectMessage() {
DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.TOO_MANY_PEERS);
ByteBuf disconnectData = Unpooled.wrappedBuffer(disconnectMessage.getData().extractArray());
when(framer.deframe(eq(disconnectData)))
.thenReturn(new RawMessage(disconnectMessage.getCode(), disconnectMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
deFramer.decode(ctx, disconnectData, out);
assertThat(connectFuture).isDone();
assertThatThrownBy(connectFuture::get)
.hasCauseInstanceOf(PeerDisconnectedException.class)
.hasMessageContaining(disconnectMessage.getReason().toString());
verify(ctx).close();
assertThat(out).isEmpty();
}
@Test
public void decode_shouldHandleInvalidMessage() {
MessageData messageData = PingMessage.get();
ByteBuf data = Unpooled.wrappedBuffer(messageData.getData().extractArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(messageData.getCode(), messageData.getData()))
.thenReturn(null);
ChannelFuture future = NettyMocks.channelFuture(true);
when(ctx.writeAndFlush(any())).thenReturn(future);
List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
ArgumentCaptor<Object> outboundMessageArgumentCaptor =
ArgumentCaptor.forClass(OutboundMessage.class);
verify(ctx, times(1)).writeAndFlush(outboundMessageArgumentCaptor.capture());
OutboundMessage outboundMessage = (OutboundMessage) outboundMessageArgumentCaptor.getValue();
assertThat(outboundMessage.getCapability()).isNull();
MessageData outboundMessageData = outboundMessage.getData();
assertThat(outboundMessageData.getCode()).isEqualTo(WireMessageCodes.DISCONNECT);
assertThat(DisconnectMessage.readFrom(outboundMessageData).getReason())
.isEqualTo(DisconnectReason.BREACH_OF_PROTOCOL);
assertThat(connectFuture).isDone();
assertThatThrownBy(connectFuture::get).hasCauseInstanceOf(BreachOfProtocolException.class);
verify(ctx).close();
assertThat(out).isEmpty();
}
}

@ -0,0 +1,38 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
public class NettyMocks {
public static ChannelFuture channelFuture(final boolean completeImmediately) {
ChannelFuture channelFuture = mock(ChannelFuture.class);
when(channelFuture.addListener(any()))
.then(
(invocation) -> {
if (completeImmediately) {
GenericFutureListener<Future<?>> listener = invocation.getArgument(0);
listener.operationComplete(mock(Future.class));
}
return channelFuture;
});
return channelFuture;
}
}

@ -0,0 +1,45 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
public class SubProtocolMock {
public static SubProtocol create() {
return create("eth");
}
public static SubProtocol create(final String name) {
return new SubProtocol() {
@Override
public String getName() {
return name;
}
@Override
public int messageSpace(final int protocolVersion) {
return 8;
}
@Override
public boolean isValidMessageCode(final int protocolVersion, final int code) {
return true;
}
@Override
public String messageName(final int protocolVersion, final int code) {
return INVALID_MESSAGE_NAME;
}
};
}
}
Loading…
Cancel
Save