DNS peers handled the same as boot nodes (#4178)

* DNS peers handled the same as boot nodes
* make sure that non bonded peers can be used as initial peers
* try to connect to DNS nodes

Signed-off-by: Stefan <stefan.pingel@consensys.net>

Co-authored-by: Justin Florentine <justin+github@florentine.us>
pull/4210/head
Stefan Pingel 2 years ago committed by GitHub
parent e1c8eb7fb0
commit 42cfff0acb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java
  2. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  3. 4
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java
  4. 23
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  5. 2
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerRequirement.java
  6. 19
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java
  7. 4
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PingPacketData.java
  8. 6
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PongPacketData.java
  9. 1
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java
  10. 25
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java
  11. 3
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java
  12. 4
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/ApiHandler.java
  13. 32
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java
  14. 78
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java

@ -85,7 +85,7 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
@Override
public NetworkingConfiguration toDomainObject() {
NetworkingConfiguration config = NetworkingConfiguration.create();
final NetworkingConfiguration config = NetworkingConfiguration.create();
config.setCheckMaintainedConnectionsFrequency(checkMaintainedConnectionsFrequencySec);
config.setInitiateConnectionsFrequency(initiateConnectionsFrequencySec);
config.setDnsDiscoveryServerOverride(dnsDiscoveryServerOverride);
@ -95,7 +95,7 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
@Override
public List<String> getCLIOptions() {
List<String> retval =
final List<String> retval =
Arrays.asList(
CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_FLAG,
OptionParser.format(checkMaintainedConnectionsFrequencySec),

@ -72,6 +72,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private final BlockBroadcaster blockBroadcaster;
private final List<PeerValidator> peerValidators;
private final Optional<MergePeerFilter> mergePeerFilter;
private final int maxMessageSize;
public EthProtocolManager(
final Blockchain blockchain,
@ -101,6 +102,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
this.ethMessages = ethMessages;
this.ethContext = ethContext;
this.maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize();
this.blockBroadcaster = new BlockBroadcaster(ethContext);
supportedCapabilities = calculateCapabilities(fastSyncEnabled);
@ -278,6 +281,14 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
return;
}
if (messageData.getSize() > this.maxMessageSize) {
LOG.debug(
"Peer {} sent a message with size {}, larger than the max message size {}",
ethPeer,
messageData.getSize(),
this.maxMessageSize);
}
// This will handle responses
ethPeers.dispatchMessage(ethPeer, ethMessage, getSupportedProtocol());

@ -179,6 +179,10 @@ public abstract class PeerDiscoveryAgent {
}
}
public Optional<PeerDiscoveryController> getPeerDiscoveryController() {
return controller;
}
public void updateNodeRecord() {
if (!config.isActive()) {
return;

@ -315,7 +315,7 @@ public class PeerDiscoveryController {
.ifPresent(
interaction -> {
bondingPeers.invalidate(peer.getId());
addToPeerTable(peer);
addBondedPeerToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
});
break;
@ -369,16 +369,11 @@ public class PeerDiscoveryController {
.collect(Collectors.toList());
}
private boolean addToPeerTable(final DiscoveryPeer peer) {
private boolean addBondedPeerToPeerTable(final DiscoveryPeer peer) {
if (!peerPermissions.isAllowedInPeerTable(peer)) {
return false;
}
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.SELF) {
return false;
}
// Reset the last seen timestamp.
final long now = System.currentTimeMillis();
if (peer.getFirstDiscovered() == 0) {
@ -391,6 +386,16 @@ public class PeerDiscoveryController {
notifyPeerBonded(peer, now);
}
return addToPeerTable(peer);
}
public boolean addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.SELF) {
return false;
}
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
@ -584,7 +589,7 @@ public class PeerDiscoveryController {
// 16 + 4 + 4 + 64 = 88 bytes
// 88 * 13 = 1144 bytes
// To fit under 1280 bytes, we must return just 13 peers maximum.
final List<DiscoveryPeer> peers = peerTable.nearestPeers(packetData.getTarget(), 13);
final List<DiscoveryPeer> peers = peerTable.nearestBondedPeers(packetData.getTarget(), 13);
final PacketData data = NeighborsPacketData.create(peers);
sendPacket(sender, PacketType.NEIGHBORS, data);
}
@ -636,7 +641,7 @@ public class PeerDiscoveryController {
peerTable.get(peer).filter(known -> known.discoveryEndpointMatches(peer));
DiscoveryPeer resolvedPeer = maybeKnownPeer.orElse(peer);
if (maybeKnownPeer.isEmpty()) {
DiscoveryPeer bondingPeer = bondingPeers.getIfPresent(peer.getId());
final DiscoveryPeer bondingPeer = bondingPeers.getIfPresent(peer.getId());
if (bondingPeer != null) {
resolvedPeer = bondingPeer;
}

@ -25,7 +25,7 @@ public interface PeerRequirement {
static PeerRequirement combine(final Collection<PeerRequirement> peerRequirements) {
return () -> {
for (PeerRequirement peerRequirement : peerRequirements) {
for (final PeerRequirement peerRequirement : peerRequirements) {
if (!peerRequirement.hasSufficientPeers()) {
return false;
}

@ -182,6 +182,24 @@ public class PeerTable {
this.idBloom = bf;
}
/**
* Returns the <code>limit</code> peers (at most) bonded closest to the provided target, based on
* the XOR distance between the keccak-256 hash of the ID and the keccak-256 hash of the target.
*
* @param target The target node ID.
* @param limit The amount of results to return.
* @return The <code>limit</code> closest peers, at most.
*/
public List<DiscoveryPeer> nearestBondedPeers(final Bytes target, final int limit) {
final Bytes keccak256 = Hash.keccak256(target);
return streamAllPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)
.sorted(
comparingInt((peer) -> PeerDistanceCalculator.distance(peer.keccak256(), keccak256)))
.limit(limit)
.collect(toList());
}
/**
* Returns the <code>limit</code> peers (at most) closest to the provided target, based on the XOR
* distance between the keccak-256 hash of the ID and the keccak-256 hash of the target.
@ -193,7 +211,6 @@ public class PeerTable {
public List<DiscoveryPeer> nearestPeers(final Bytes target, final int limit) {
final Bytes keccak256 = Hash.keccak256(target);
return streamAllPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)
.sorted(
comparingInt((peer) -> PeerDistanceCalculator.distance(peer.keccak256(), keccak256)))
.limit(limit)

@ -98,9 +98,9 @@ public class PingPacketData implements PacketData {
if (!in.isEndOfCurrentList()) {
try {
enrSeq = UInt64.valueOf(in.readBigIntegerScalar());
LOG.debug("read PING enr as long scalar");
LOG.trace("read PING enr as long scalar");
} catch (MalformedRLPInputException malformed) {
LOG.debug("failed to read PING enr as scalar, trying to read bytes instead");
LOG.trace("failed to read PING enr as scalar, trying to read bytes instead");
enrSeq = UInt64.fromBytes(in.readBytes());
}
}

@ -63,9 +63,9 @@ public class PongPacketData implements PacketData {
if (!in.isEndOfCurrentList()) {
try {
enrSeq = UInt64.valueOf(in.readBigIntegerScalar());
LOG.debug("read PONG enr from scalar");
} catch (MalformedRLPInputException malformed) {
LOG.debug("failed to read PONG enr from scalar, trying as byte array");
LOG.trace("read PONG enr from scalar");
} catch (final MalformedRLPInputException malformed) {
LOG.trace("failed to read PONG enr from scalar, trying as byte array");
enrSeq = UInt64.fromBytes(in.readBytes());
}
}

@ -93,6 +93,7 @@ public class RecursivePeerRefreshState {
}
private void addInitialPeers(final List<DiscoveryPeer> initialPeers) {
LOG.debug("INITIAL PEERS: {}", initialPeers);
this.initialPeers = initialPeers;
for (final DiscoveryPeer peer : initialPeers) {
final MetadataPeer iterationParticipant =

@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryAgent;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBondedEvent;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
import org.hyperledger.besu.ethereum.p2p.discovery.VertxPeerDiscoveryAgent;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeerPrivileges;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
@ -56,7 +57,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@ -67,7 +67,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -149,8 +148,6 @@ public class DefaultP2PNetwork implements P2PNetwork {
private final Duration shutdownTimeout = Duration.ofMinutes(1);
private DNSDaemon dnsDaemon;
@VisibleForTesting final AtomicReference<List<DiscoveryPeer>> dnsPeers = new AtomicReference<>();
/**
* Creates a peer networking service for production purposes.
*
@ -215,6 +212,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
Optional.ofNullable(config.getDiscovery().getDNSDiscoveryURL())
.ifPresent(
disco -> {
// These lists are updated every 12h
// We retrieve the list every 30 minutes (1800000 msec)
LOG.info("Starting DNS discovery with URL {}", disco);
config
.getDnsDiscoveryServerOverride()
@ -228,7 +227,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
disco,
createDaemonListener(),
0L,
60000L,
1800000L,
config.getDnsDiscoveryServerOverride().orElse(null));
dnsDaemon.start();
});
@ -347,11 +346,16 @@ public class DefaultP2PNetwork implements P2PNetwork {
.build();
final DiscoveryPeer peer = DiscoveryPeer.fromEnode(enodeURL);
peers.add(peer);
rlpxAgent.connect(peer);
}
// only replace dnsPeers if the lookup was successful:
if (!peers.isEmpty()) {
dnsPeers.set(peers);
final Optional<PeerDiscoveryController> peerDiscoveryController =
peerDiscoveryAgent.getPeerDiscoveryController();
if (peerDiscoveryController.isPresent()) {
final PeerDiscoveryController controller = peerDiscoveryController.get();
LOG.debug("Adding {} DNS peers to PeerTable", peers.size());
peers.forEach(controller::addToPeerTable);
peers.forEach(rlpxAgent::connect);
}
}
};
}
@ -383,11 +387,6 @@ public class DefaultP2PNetwork implements P2PNetwork {
@Override
public Stream<DiscoveryPeer> streamDiscoveredPeers() {
final List<DiscoveryPeer> peers = dnsPeers.get();
if (peers != null) {
Collections.shuffle(peers);
return Stream.concat(peerDiscoveryAgent.streamDiscoveredPeers(), peers.stream());
}
return peerDiscoveryAgent.streamDiscoveredPeers();
}

@ -222,8 +222,7 @@ public class RlpxAgent {
// Check peer is valid
final EnodeURL enode = peer.getEnodeURL();
if (!enode.isListening()) {
final String errorMsg =
"Attempt to connect to peer with no listening port: " + enode.toString();
final String errorMsg = "Attempt to connect to peer with no listening port: " + enode;
LOG.warn(errorMsg);
return CompletableFuture.failedFuture((new IllegalArgumentException(errorMsg)));
}

@ -63,7 +63,7 @@ final class ApiHandler extends SimpleChannelInboundHandler<MessageData> {
if (demultiplexed.getCapability() == null) {
switch (message.getCode()) {
case WireMessageCodes.PING:
LOG.debug("Received Wire PING");
LOG.trace("Received Wire PING");
try {
connection.send(null, PongMessage.get());
} catch (final PeerConnection.PeerNotConnected peerNotConnected) {
@ -71,7 +71,7 @@ final class ApiHandler extends SimpleChannelInboundHandler<MessageData> {
}
break;
case WireMessageCodes.PONG:
LOG.debug("Received Wire PONG");
LOG.trace("Received Wire PONG");
waitingForPong.set(false);
break;
case WireMessageCodes.DISCONNECT:

@ -53,7 +53,6 @@ import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
import org.hyperledger.besu.nat.upnp.UpnpNatManager;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -65,8 +64,6 @@ import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.crypto.SECP256K1;
import org.apache.tuweni.devp2p.EthereumNodeRecord;
import org.apache.tuweni.discovery.DNSDaemonListener;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
@ -410,35 +407,6 @@ public final class DefaultP2PNetworkTest {
verify(dnsConfig, times(2)).getDnsDiscoveryServerOverride();
}
@Test
public void shouldNotDropDnsHostsOnEmptyLookup() {
DefaultP2PNetwork network = network();
DNSDaemonListener listenerUnderTest = network.createDaemonListener();
// assert no entries prior to lookup
assertThat(network.dnsPeers.get()).isNull();
// simulate successful lookup of 1 peer
listenerUnderTest.newRecords(
1,
List.of(
EthereumNodeRecord.create(
SECP256K1.KeyPair.fromSecretKey(mockKey),
1L,
null,
null,
InetAddress.getLoopbackAddress(),
30303,
30303)));
assertThat(network.dnsPeers.get()).isNotEmpty();
assertThat(network.dnsPeers.get().size()).isEqualTo(1);
// simulate failed lookup empty list
listenerUnderTest.newRecords(2, Collections.emptyList());
assertThat(network.dnsPeers.get()).isNotEmpty();
assertThat(network.dnsPeers.get().size()).isEqualTo(1);
}
private DefaultP2PNetwork network() {
return (DefaultP2PNetwork) builder().build();
}

@ -167,26 +167,26 @@ public class DeFramerTest {
@Test
public void decode_handlesHello() throws ExecutionException, InterruptedException {
ChannelFuture future = NettyMocks.channelFuture(false);
final ChannelFuture future = NettyMocks.channelFuture(false);
when(channel.closeFuture()).thenReturn(future);
final Peer peer = createRemotePeer();
final PeerInfo remotePeerInfo = createPeerInfo(peer);
HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
assertThat(connectFuture).isNotCompletedExceptionally();
PeerConnection peerConnection = connectFuture.get();
final PeerConnection peerConnection = connectFuture.get();
assertThat(peerConnection.getPeerInfo()).isEqualTo(remotePeerInfo);
EnodeURL expectedEnode =
final EnodeURL expectedEnode =
EnodeURLImpl.builder()
.configureFromEnode(peer.getEnodeURL())
// Discovery information is not available from peer info
@ -199,8 +199,8 @@ public class DeFramerTest {
verify(pipeline, times(1)).addLast(any());
// Next message should be pushed out
PingMessage nextMessage = PingMessage.get();
ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray());
final PingMessage nextMessage = PingMessage.get();
final ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray());
when(framer.deframe(eq(nextData)))
.thenReturn(new RawMessage(nextMessage.getCode(), nextMessage.getData()))
.thenReturn(null);
@ -212,7 +212,7 @@ public class DeFramerTest {
@Test
public void decode_handlesHelloFromPeerWithAdvertisedPortOf0()
throws ExecutionException, InterruptedException {
ChannelFuture future = NettyMocks.channelFuture(false);
final ChannelFuture future = NettyMocks.channelFuture(false);
when(channel.closeFuture()).thenReturn(future);
final Peer peer = createRemotePeer();
@ -220,17 +220,17 @@ public class DeFramerTest {
new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId());
final DeFramer deFramer = createDeFramer(null);
HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
assertThat(connectFuture).isNotCompletedExceptionally();
PeerConnection peerConnection = connectFuture.get();
final PeerConnection peerConnection = connectFuture.get();
assertThat(peerConnection.getPeerInfo()).isEqualTo(remotePeerInfo);
assertThat(out).isEmpty();
@ -249,8 +249,8 @@ public class DeFramerTest {
verify(pipeline, times(1)).addLast(any());
// Next message should be pushed out
PingMessage nextMessage = PingMessage.get();
ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray());
final PingMessage nextMessage = PingMessage.get();
final ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray());
when(framer.deframe(eq(nextData)))
.thenReturn(new RawMessage(nextMessage.getCode(), nextMessage.getData()))
.thenReturn(null);
@ -261,7 +261,7 @@ public class DeFramerTest {
@Test
public void decode_handlesUnexpectedPeerId() {
ChannelFuture future = NettyMocks.channelFuture(false);
final ChannelFuture future = NettyMocks.channelFuture(false);
when(channel.closeFuture()).thenReturn(future);
final Peer peer = createRemotePeer();
@ -275,12 +275,12 @@ public class DeFramerTest {
mismatchedId);
final DeFramer deFramer = createDeFramer(peer);
HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
@ -297,22 +297,22 @@ public class DeFramerTest {
@Test
public void decode_handlesNoSharedCaps() {
ChannelFuture future = NettyMocks.channelFuture(false);
final ChannelFuture future = NettyMocks.channelFuture(false);
when(channel.closeFuture()).thenReturn(future);
PeerInfo remotePeerInfo =
final PeerInfo remotePeerInfo =
new PeerInfo(
p2pVersion,
"bla",
Arrays.asList(Capability.create("eth", 254)),
30303,
Peer.randomId());
HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
@ -326,12 +326,13 @@ public class DeFramerTest {
@Test
public void decode_shouldHandleImmediateDisconnectMessage() {
DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.TOO_MANY_PEERS);
ByteBuf disconnectData = Unpooled.wrappedBuffer(disconnectMessage.getData().toArray());
final DisconnectMessage disconnectMessage =
DisconnectMessage.create(DisconnectReason.TOO_MANY_PEERS);
final ByteBuf disconnectData = Unpooled.wrappedBuffer(disconnectMessage.getData().toArray());
when(framer.deframe(eq(disconnectData)))
.thenReturn(new RawMessage(disconnectMessage.getCode(), disconnectMessage.getData()))
.thenReturn(null);
List<Object> out = new ArrayList<>();
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, disconnectData, out);
assertThat(connectFuture).isDone();
@ -347,15 +348,15 @@ public class DeFramerTest {
final Peer peer = createRemotePeer();
final PeerInfo remotePeerInfo =
new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId());
HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo);
final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray());
when(framer.deframe(any()))
.thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData()))
.thenReturn(null);
when(ctx.channel().remoteAddress()).thenReturn(null);
ChannelFuture future = NettyMocks.channelFuture(true);
final ChannelFuture future = NettyMocks.channelFuture(true);
when(ctx.writeAndFlush(any())).thenReturn(future);
List<Object> out = new ArrayList<>();
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
assertThat(connectFuture).isDone();
@ -367,22 +368,23 @@ public class DeFramerTest {
@Test
public void decode_shouldHandleInvalidMessage() {
MessageData messageData = PingMessage.get();
ByteBuf data = Unpooled.wrappedBuffer(messageData.getData().toArray());
final MessageData messageData = PingMessage.get();
final ByteBuf data = Unpooled.wrappedBuffer(messageData.getData().toArray());
when(framer.deframe(eq(data)))
.thenReturn(new RawMessage(messageData.getCode(), messageData.getData()))
.thenReturn(null);
ChannelFuture future = NettyMocks.channelFuture(true);
final ChannelFuture future = NettyMocks.channelFuture(true);
when(ctx.writeAndFlush(any())).thenReturn(future);
List<Object> out = new ArrayList<>();
final List<Object> out = new ArrayList<>();
deFramer.decode(ctx, data, out);
ArgumentCaptor<Object> outboundMessageArgumentCaptor =
final ArgumentCaptor<Object> outboundMessageArgumentCaptor =
ArgumentCaptor.forClass(OutboundMessage.class);
verify(ctx, times(1)).writeAndFlush(outboundMessageArgumentCaptor.capture());
OutboundMessage outboundMessage = (OutboundMessage) outboundMessageArgumentCaptor.getValue();
final OutboundMessage outboundMessage =
(OutboundMessage) outboundMessageArgumentCaptor.getValue();
assertThat(outboundMessage.getCapability()).isNull();
MessageData outboundMessageData = outboundMessage.getData();
final MessageData outboundMessageData = outboundMessage.getData();
assertThat(outboundMessageData.getCode()).isEqualTo(WireMessageCodes.DISCONNECT);
assertThat(DisconnectMessage.readFrom(outboundMessageData).getReason())
.isEqualTo(DisconnectReason.BREACH_OF_PROTOCOL);

Loading…
Cancel
Save