diff --git a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java index bb0045d612..a737883f74 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java @@ -85,7 +85,7 @@ public class NetworkingOptions implements CLIOptions { @Override public NetworkingConfiguration toDomainObject() { - NetworkingConfiguration config = NetworkingConfiguration.create(); + final NetworkingConfiguration config = NetworkingConfiguration.create(); config.setCheckMaintainedConnectionsFrequency(checkMaintainedConnectionsFrequencySec); config.setInitiateConnectionsFrequency(initiateConnectionsFrequencySec); config.setDnsDiscoveryServerOverride(dnsDiscoveryServerOverride); @@ -95,7 +95,7 @@ public class NetworkingOptions implements CLIOptions { @Override public List getCLIOptions() { - List retval = + final List retval = Arrays.asList( CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_FLAG, OptionParser.format(checkMaintainedConnectionsFrequencySec), diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 2710cc1879..f065e0b141 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -72,6 +72,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { private final BlockBroadcaster blockBroadcaster; private final List peerValidators; private final Optional mergePeerFilter; + private final int maxMessageSize; public EthProtocolManager( final Blockchain blockchain, @@ -101,6 +102,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { this.ethMessages = ethMessages; this.ethContext = ethContext; + this.maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize(); + this.blockBroadcaster = new BlockBroadcaster(ethContext); supportedCapabilities = calculateCapabilities(fastSyncEnabled); @@ -278,6 +281,14 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { return; } + if (messageData.getSize() > this.maxMessageSize) { + LOG.debug( + "Peer {} sent a message with size {}, larger than the max message size {}", + ethPeer, + messageData.getSize(), + this.maxMessageSize); + } + // This will handle responses ethPeers.dispatchMessage(ethPeer, ethMessage, getSupportedProtocol()); diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java index 96937a5130..df1f6df077 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java @@ -179,6 +179,10 @@ public abstract class PeerDiscoveryAgent { } } + public Optional getPeerDiscoveryController() { + return controller; + } + public void updateNodeRecord() { if (!config.isActive()) { return; diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java index 9ed33d51b4..e6103d2ba4 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java @@ -315,7 +315,7 @@ public class PeerDiscoveryController { .ifPresent( interaction -> { bondingPeers.invalidate(peer.getId()); - addToPeerTable(peer); + addBondedPeerToPeerTable(peer); recursivePeerRefreshState.onBondingComplete(peer); }); break; @@ -369,16 +369,11 @@ public class PeerDiscoveryController { .collect(Collectors.toList()); } - private boolean addToPeerTable(final DiscoveryPeer peer) { + private boolean addBondedPeerToPeerTable(final DiscoveryPeer peer) { if (!peerPermissions.isAllowedInPeerTable(peer)) { return false; } - final PeerTable.AddResult result = peerTable.tryAdd(peer); - if (result.getOutcome() == PeerTable.AddResult.AddOutcome.SELF) { - return false; - } - // Reset the last seen timestamp. final long now = System.currentTimeMillis(); if (peer.getFirstDiscovered() == 0) { @@ -391,6 +386,16 @@ public class PeerDiscoveryController { notifyPeerBonded(peer, now); } + return addToPeerTable(peer); + } + + public boolean addToPeerTable(final DiscoveryPeer peer) { + final PeerTable.AddResult result = peerTable.tryAdd(peer); + + if (result.getOutcome() == PeerTable.AddResult.AddOutcome.SELF) { + return false; + } + if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) { // Bump peer. peerTable.tryEvict(peer); @@ -584,7 +589,7 @@ public class PeerDiscoveryController { // 16 + 4 + 4 + 64 = 88 bytes // 88 * 13 = 1144 bytes // To fit under 1280 bytes, we must return just 13 peers maximum. - final List peers = peerTable.nearestPeers(packetData.getTarget(), 13); + final List peers = peerTable.nearestBondedPeers(packetData.getTarget(), 13); final PacketData data = NeighborsPacketData.create(peers); sendPacket(sender, PacketType.NEIGHBORS, data); } @@ -636,7 +641,7 @@ public class PeerDiscoveryController { peerTable.get(peer).filter(known -> known.discoveryEndpointMatches(peer)); DiscoveryPeer resolvedPeer = maybeKnownPeer.orElse(peer); if (maybeKnownPeer.isEmpty()) { - DiscoveryPeer bondingPeer = bondingPeers.getIfPresent(peer.getId()); + final DiscoveryPeer bondingPeer = bondingPeers.getIfPresent(peer.getId()); if (bondingPeer != null) { resolvedPeer = bondingPeer; } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerRequirement.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerRequirement.java index 2658ebbab6..9da923bbbc 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerRequirement.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerRequirement.java @@ -25,7 +25,7 @@ public interface PeerRequirement { static PeerRequirement combine(final Collection peerRequirements) { return () -> { - for (PeerRequirement peerRequirement : peerRequirements) { + for (final PeerRequirement peerRequirement : peerRequirements) { if (!peerRequirement.hasSufficientPeers()) { return false; } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java index 48fd3af294..dee6e0cf99 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java @@ -182,6 +182,24 @@ public class PeerTable { this.idBloom = bf; } + /** + * Returns the limit peers (at most) bonded closest to the provided target, based on + * the XOR distance between the keccak-256 hash of the ID and the keccak-256 hash of the target. + * + * @param target The target node ID. + * @param limit The amount of results to return. + * @return The limit closest peers, at most. + */ + public List nearestBondedPeers(final Bytes target, final int limit) { + final Bytes keccak256 = Hash.keccak256(target); + return streamAllPeers() + .filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED) + .sorted( + comparingInt((peer) -> PeerDistanceCalculator.distance(peer.keccak256(), keccak256))) + .limit(limit) + .collect(toList()); + } + /** * Returns the limit peers (at most) closest to the provided target, based on the XOR * distance between the keccak-256 hash of the ID and the keccak-256 hash of the target. @@ -193,7 +211,6 @@ public class PeerTable { public List nearestPeers(final Bytes target, final int limit) { final Bytes keccak256 = Hash.keccak256(target); return streamAllPeers() - .filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED) .sorted( comparingInt((peer) -> PeerDistanceCalculator.distance(peer.keccak256(), keccak256))) .limit(limit) diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PingPacketData.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PingPacketData.java index dba2cafd8e..88c0e4ac70 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PingPacketData.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PingPacketData.java @@ -98,9 +98,9 @@ public class PingPacketData implements PacketData { if (!in.isEndOfCurrentList()) { try { enrSeq = UInt64.valueOf(in.readBigIntegerScalar()); - LOG.debug("read PING enr as long scalar"); + LOG.trace("read PING enr as long scalar"); } catch (MalformedRLPInputException malformed) { - LOG.debug("failed to read PING enr as scalar, trying to read bytes instead"); + LOG.trace("failed to read PING enr as scalar, trying to read bytes instead"); enrSeq = UInt64.fromBytes(in.readBytes()); } } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PongPacketData.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PongPacketData.java index 80ed430242..dd832c1703 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PongPacketData.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PongPacketData.java @@ -63,9 +63,9 @@ public class PongPacketData implements PacketData { if (!in.isEndOfCurrentList()) { try { enrSeq = UInt64.valueOf(in.readBigIntegerScalar()); - LOG.debug("read PONG enr from scalar"); - } catch (MalformedRLPInputException malformed) { - LOG.debug("failed to read PONG enr from scalar, trying as byte array"); + LOG.trace("read PONG enr from scalar"); + } catch (final MalformedRLPInputException malformed) { + LOG.trace("failed to read PONG enr from scalar, trying as byte array"); enrSeq = UInt64.fromBytes(in.readBytes()); } } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java index 62105b2686..dbbe594d38 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java @@ -93,6 +93,7 @@ public class RecursivePeerRefreshState { } private void addInitialPeers(final List initialPeers) { + LOG.debug("INITIAL PEERS: {}", initialPeers); this.initialPeers = initialPeers; for (final DiscoveryPeer peer : initialPeers) { final MetadataPeer iterationParticipant = diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java index afbf787283..2efdb8cbc0 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java @@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryAgent; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBondedEvent; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus; import org.hyperledger.besu.ethereum.p2p.discovery.VertxPeerDiscoveryAgent; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController; import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeerPrivileges; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; @@ -56,7 +57,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -67,7 +67,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -149,8 +148,6 @@ public class DefaultP2PNetwork implements P2PNetwork { private final Duration shutdownTimeout = Duration.ofMinutes(1); private DNSDaemon dnsDaemon; - @VisibleForTesting final AtomicReference> dnsPeers = new AtomicReference<>(); - /** * Creates a peer networking service for production purposes. * @@ -215,6 +212,8 @@ public class DefaultP2PNetwork implements P2PNetwork { Optional.ofNullable(config.getDiscovery().getDNSDiscoveryURL()) .ifPresent( disco -> { + // These lists are updated every 12h + // We retrieve the list every 30 minutes (1800000 msec) LOG.info("Starting DNS discovery with URL {}", disco); config .getDnsDiscoveryServerOverride() @@ -228,7 +227,7 @@ public class DefaultP2PNetwork implements P2PNetwork { disco, createDaemonListener(), 0L, - 60000L, + 1800000L, config.getDnsDiscoveryServerOverride().orElse(null)); dnsDaemon.start(); }); @@ -347,11 +346,16 @@ public class DefaultP2PNetwork implements P2PNetwork { .build(); final DiscoveryPeer peer = DiscoveryPeer.fromEnode(enodeURL); peers.add(peer); - rlpxAgent.connect(peer); } - // only replace dnsPeers if the lookup was successful: if (!peers.isEmpty()) { - dnsPeers.set(peers); + final Optional peerDiscoveryController = + peerDiscoveryAgent.getPeerDiscoveryController(); + if (peerDiscoveryController.isPresent()) { + final PeerDiscoveryController controller = peerDiscoveryController.get(); + LOG.debug("Adding {} DNS peers to PeerTable", peers.size()); + peers.forEach(controller::addToPeerTable); + peers.forEach(rlpxAgent::connect); + } } }; } @@ -383,11 +387,6 @@ public class DefaultP2PNetwork implements P2PNetwork { @Override public Stream streamDiscoveredPeers() { - final List peers = dnsPeers.get(); - if (peers != null) { - Collections.shuffle(peers); - return Stream.concat(peerDiscoveryAgent.streamDiscoveredPeers(), peers.stream()); - } return peerDiscoveryAgent.streamDiscoveredPeers(); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java index 5691c63bdb..38c85dbffb 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java @@ -222,8 +222,7 @@ public class RlpxAgent { // Check peer is valid final EnodeURL enode = peer.getEnodeURL(); if (!enode.isListening()) { - final String errorMsg = - "Attempt to connect to peer with no listening port: " + enode.toString(); + final String errorMsg = "Attempt to connect to peer with no listening port: " + enode; LOG.warn(errorMsg); return CompletableFuture.failedFuture((new IllegalArgumentException(errorMsg))); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/ApiHandler.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/ApiHandler.java index 39dbfb2d8b..c5793df074 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/ApiHandler.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/ApiHandler.java @@ -63,7 +63,7 @@ final class ApiHandler extends SimpleChannelInboundHandler { if (demultiplexed.getCapability() == null) { switch (message.getCode()) { case WireMessageCodes.PING: - LOG.debug("Received Wire PING"); + LOG.trace("Received Wire PING"); try { connection.send(null, PongMessage.get()); } catch (final PeerConnection.PeerNotConnected peerNotConnected) { @@ -71,7 +71,7 @@ final class ApiHandler extends SimpleChannelInboundHandler { } break; case WireMessageCodes.PONG: - LOG.debug("Received Wire PONG"); + LOG.trace("Received Wire PONG"); waitingForPong.set(false); break; case WireMessageCodes.DISCONNECT: diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java index f9847c816b..a100257791 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java @@ -53,7 +53,6 @@ import org.hyperledger.besu.nat.NatService; import org.hyperledger.besu.nat.core.domain.NetworkProtocol; import org.hyperledger.besu.nat.upnp.UpnpNatManager; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -65,8 +64,6 @@ import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.crypto.SECP256K1; -import org.apache.tuweni.devp2p.EthereumNodeRecord; -import org.apache.tuweni.discovery.DNSDaemonListener; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; @@ -410,35 +407,6 @@ public final class DefaultP2PNetworkTest { verify(dnsConfig, times(2)).getDnsDiscoveryServerOverride(); } - @Test - public void shouldNotDropDnsHostsOnEmptyLookup() { - DefaultP2PNetwork network = network(); - DNSDaemonListener listenerUnderTest = network.createDaemonListener(); - - // assert no entries prior to lookup - assertThat(network.dnsPeers.get()).isNull(); - - // simulate successful lookup of 1 peer - listenerUnderTest.newRecords( - 1, - List.of( - EthereumNodeRecord.create( - SECP256K1.KeyPair.fromSecretKey(mockKey), - 1L, - null, - null, - InetAddress.getLoopbackAddress(), - 30303, - 30303))); - assertThat(network.dnsPeers.get()).isNotEmpty(); - assertThat(network.dnsPeers.get().size()).isEqualTo(1); - - // simulate failed lookup empty list - listenerUnderTest.newRecords(2, Collections.emptyList()); - assertThat(network.dnsPeers.get()).isNotEmpty(); - assertThat(network.dnsPeers.get().size()).isEqualTo(1); - } - private DefaultP2PNetwork network() { return (DefaultP2PNetwork) builder().build(); } diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java index a9d7ea4acc..232b33c65f 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java @@ -167,26 +167,26 @@ public class DeFramerTest { @Test public void decode_handlesHello() throws ExecutionException, InterruptedException { - ChannelFuture future = NettyMocks.channelFuture(false); + final ChannelFuture future = NettyMocks.channelFuture(false); when(channel.closeFuture()).thenReturn(future); final Peer peer = createRemotePeer(); final PeerInfo remotePeerInfo = createPeerInfo(peer); - HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); - ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); + final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); when(framer.deframe(eq(data))) .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) .thenReturn(null); - List out = new ArrayList<>(); + final List out = new ArrayList<>(); deFramer.decode(ctx, data, out); assertThat(connectFuture).isDone(); assertThat(connectFuture).isNotCompletedExceptionally(); - PeerConnection peerConnection = connectFuture.get(); + final PeerConnection peerConnection = connectFuture.get(); assertThat(peerConnection.getPeerInfo()).isEqualTo(remotePeerInfo); - EnodeURL expectedEnode = + final EnodeURL expectedEnode = EnodeURLImpl.builder() .configureFromEnode(peer.getEnodeURL()) // Discovery information is not available from peer info @@ -199,8 +199,8 @@ public class DeFramerTest { verify(pipeline, times(1)).addLast(any()); // Next message should be pushed out - PingMessage nextMessage = PingMessage.get(); - ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray()); + final PingMessage nextMessage = PingMessage.get(); + final ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray()); when(framer.deframe(eq(nextData))) .thenReturn(new RawMessage(nextMessage.getCode(), nextMessage.getData())) .thenReturn(null); @@ -212,7 +212,7 @@ public class DeFramerTest { @Test public void decode_handlesHelloFromPeerWithAdvertisedPortOf0() throws ExecutionException, InterruptedException { - ChannelFuture future = NettyMocks.channelFuture(false); + final ChannelFuture future = NettyMocks.channelFuture(false); when(channel.closeFuture()).thenReturn(future); final Peer peer = createRemotePeer(); @@ -220,17 +220,17 @@ public class DeFramerTest { new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId()); final DeFramer deFramer = createDeFramer(null); - HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); - ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); + final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); when(framer.deframe(eq(data))) .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) .thenReturn(null); - List out = new ArrayList<>(); + final List out = new ArrayList<>(); deFramer.decode(ctx, data, out); assertThat(connectFuture).isDone(); assertThat(connectFuture).isNotCompletedExceptionally(); - PeerConnection peerConnection = connectFuture.get(); + final PeerConnection peerConnection = connectFuture.get(); assertThat(peerConnection.getPeerInfo()).isEqualTo(remotePeerInfo); assertThat(out).isEmpty(); @@ -249,8 +249,8 @@ public class DeFramerTest { verify(pipeline, times(1)).addLast(any()); // Next message should be pushed out - PingMessage nextMessage = PingMessage.get(); - ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray()); + final PingMessage nextMessage = PingMessage.get(); + final ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().toArray()); when(framer.deframe(eq(nextData))) .thenReturn(new RawMessage(nextMessage.getCode(), nextMessage.getData())) .thenReturn(null); @@ -261,7 +261,7 @@ public class DeFramerTest { @Test public void decode_handlesUnexpectedPeerId() { - ChannelFuture future = NettyMocks.channelFuture(false); + final ChannelFuture future = NettyMocks.channelFuture(false); when(channel.closeFuture()).thenReturn(future); final Peer peer = createRemotePeer(); @@ -275,12 +275,12 @@ public class DeFramerTest { mismatchedId); final DeFramer deFramer = createDeFramer(peer); - HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); - ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); + final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); when(framer.deframe(eq(data))) .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) .thenReturn(null); - List out = new ArrayList<>(); + final List out = new ArrayList<>(); deFramer.decode(ctx, data, out); assertThat(connectFuture).isDone(); @@ -297,22 +297,22 @@ public class DeFramerTest { @Test public void decode_handlesNoSharedCaps() { - ChannelFuture future = NettyMocks.channelFuture(false); + final ChannelFuture future = NettyMocks.channelFuture(false); when(channel.closeFuture()).thenReturn(future); - PeerInfo remotePeerInfo = + final PeerInfo remotePeerInfo = new PeerInfo( p2pVersion, "bla", Arrays.asList(Capability.create("eth", 254)), 30303, Peer.randomId()); - HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); - ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); + final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); when(framer.deframe(eq(data))) .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) .thenReturn(null); - List out = new ArrayList<>(); + final List out = new ArrayList<>(); deFramer.decode(ctx, data, out); assertThat(connectFuture).isDone(); @@ -326,12 +326,13 @@ public class DeFramerTest { @Test public void decode_shouldHandleImmediateDisconnectMessage() { - DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.TOO_MANY_PEERS); - ByteBuf disconnectData = Unpooled.wrappedBuffer(disconnectMessage.getData().toArray()); + final DisconnectMessage disconnectMessage = + DisconnectMessage.create(DisconnectReason.TOO_MANY_PEERS); + final ByteBuf disconnectData = Unpooled.wrappedBuffer(disconnectMessage.getData().toArray()); when(framer.deframe(eq(disconnectData))) .thenReturn(new RawMessage(disconnectMessage.getCode(), disconnectMessage.getData())) .thenReturn(null); - List out = new ArrayList<>(); + final List out = new ArrayList<>(); deFramer.decode(ctx, disconnectData, out); assertThat(connectFuture).isDone(); @@ -347,15 +348,15 @@ public class DeFramerTest { final Peer peer = createRemotePeer(); final PeerInfo remotePeerInfo = new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId()); - HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); - ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); + final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); when(framer.deframe(any())) .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) .thenReturn(null); when(ctx.channel().remoteAddress()).thenReturn(null); - ChannelFuture future = NettyMocks.channelFuture(true); + final ChannelFuture future = NettyMocks.channelFuture(true); when(ctx.writeAndFlush(any())).thenReturn(future); - List out = new ArrayList<>(); + final List out = new ArrayList<>(); deFramer.decode(ctx, data, out); assertThat(connectFuture).isDone(); @@ -367,22 +368,23 @@ public class DeFramerTest { @Test public void decode_shouldHandleInvalidMessage() { - MessageData messageData = PingMessage.get(); - ByteBuf data = Unpooled.wrappedBuffer(messageData.getData().toArray()); + final MessageData messageData = PingMessage.get(); + final ByteBuf data = Unpooled.wrappedBuffer(messageData.getData().toArray()); when(framer.deframe(eq(data))) .thenReturn(new RawMessage(messageData.getCode(), messageData.getData())) .thenReturn(null); - ChannelFuture future = NettyMocks.channelFuture(true); + final ChannelFuture future = NettyMocks.channelFuture(true); when(ctx.writeAndFlush(any())).thenReturn(future); - List out = new ArrayList<>(); + final List out = new ArrayList<>(); deFramer.decode(ctx, data, out); - ArgumentCaptor outboundMessageArgumentCaptor = + final ArgumentCaptor outboundMessageArgumentCaptor = ArgumentCaptor.forClass(OutboundMessage.class); verify(ctx, times(1)).writeAndFlush(outboundMessageArgumentCaptor.capture()); - OutboundMessage outboundMessage = (OutboundMessage) outboundMessageArgumentCaptor.getValue(); + final OutboundMessage outboundMessage = + (OutboundMessage) outboundMessageArgumentCaptor.getValue(); assertThat(outboundMessage.getCapability()).isNull(); - MessageData outboundMessageData = outboundMessage.getData(); + final MessageData outboundMessageData = outboundMessage.getData(); assertThat(outboundMessageData.getCode()).isEqualTo(WireMessageCodes.DISCONNECT); assertThat(DisconnectMessage.readFrom(outboundMessageData).getReason()) .isEqualTo(DisconnectReason.BREACH_OF_PROTOCOL);