Metrics for messages (#369)

Metrics for Inbound and outbound messages, with capability, name, and message code.
Danno Ferrin 6 years ago committed by GitHub
parent ddca7c4f52
commit fd8201b4b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftSubProtocol.java
  2. 34
      consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64Protocol.java
  3. 28
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/EthProtocol.java
  4. 1
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
  5. 31
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkRunner.java
  6. 11
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/AbstractHandshakeHandler.java
  7. 10
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java
  8. 13
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/HandshakeHandlerInbound.java
  9. 13
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/HandshakeHandlerOutbound.java
  10. 26
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyP2PNetwork.java
  11. 25
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java
  12. 13
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/SubProtocol.java
  13. 15
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/WireMessageCodes.java
  14. 10
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/NettyP2PNetworkTest.java
  15. 5
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexerTest.java
  16. 4
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java
  17. 5
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnectionTest.java
  18. 3
      metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java
  19. 1
      metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java
  20. 1
      pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java

@ -51,4 +51,22 @@ public class IbftSubProtocol implements SubProtocol {
return false; return false;
} }
} }
@Override
public String messageName(final int protocolVersion, final int code) {
switch (code) {
case IbftV2.PROPOSAL:
return "Proposal";
case IbftV2.PREPARE:
return "Prepare";
case IbftV2.COMMIT:
return "Commit";
case IbftV2.ROUND_CHANGE:
return "RoundChange";
case IbftV2.NEW_ROUND:
return "NewRound";
default:
return INVALID_MESSAGE_NAME;
}
}
} }

@ -69,6 +69,40 @@ public class Istanbul64Protocol implements SubProtocol {
return false; return false;
} }
@Override
public String messageName(final int protocolVersion, final int code) {
switch (code) {
case EthPV62.STATUS:
return "Status";
case EthPV62.NEW_BLOCK_HASHES:
return "NewBlockHashes";
case EthPV62.TRANSACTIONS:
return "Transactions";
case EthPV62.GET_BLOCK_HEADERS:
return "GetBlockHeaders";
case EthPV62.BLOCK_HEADERS:
return "BlockHeaders";
case EthPV62.GET_BLOCK_BODIES:
return "GetBlockBodies";
case EthPV62.BLOCK_BODIES:
return "BlockBodies";
case EthPV62.NEW_BLOCK:
return "NewBlock";
case EthPV63.GET_NODE_DATA:
return "GetNodeData";
case EthPV63.NODE_DATA:
return "NodeData";
case EthPV63.GET_RECEIPTS:
return "GetReceipts";
case EthPV63.RECEIPTS:
return "Receipts";
case INSTANBUL_MSG:
return "InstanbulMsg";
default:
return INVALID_MESSAGE_NAME;
}
}
public static Istanbul64Protocol get() { public static Istanbul64Protocol get() {
return INSTANCE; return INSTANCE;
} }

@ -21,6 +21,10 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
/**
* Eth protocol messages as defined in
* https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol#new-model-syncing-pv62}
*/
public class EthProtocol implements SubProtocol { public class EthProtocol implements SubProtocol {
public static final String NAME = "eth"; public static final String NAME = "eth";
public static final Capability ETH62 = Capability.create(NAME, EthVersion.V62); public static final Capability ETH62 = Capability.create(NAME, EthVersion.V62);
@ -75,6 +79,30 @@ public class EthProtocol implements SubProtocol {
} }
} }
@Override
public String messageName(final int protocolVersion, final int code) {
switch (code) {
case EthPV62.STATUS:
return "Status";
case EthPV62.NEW_BLOCK_HASHES:
return "NewBlockHashes";
case EthPV62.TRANSACTIONS:
return "Transactions";
case EthPV62.GET_BLOCK_HEADERS:
return "GetBlockHeaders";
case EthPV62.BLOCK_HEADERS:
return "BlockHeaders";
case EthPV62.GET_BLOCK_BODIES:
return "GetBlockBodies";
case EthPV62.BLOCK_BODIES:
return "BlockBodies";
case EthPV62.NEW_BLOCK:
return "NewBlock";
default:
return INVALID_MESSAGE_NAME;
}
}
public static EthProtocol get() { public static EthProtocol get() {
return INSTANCE; return INSTANCE;
} }

@ -119,6 +119,7 @@ public class TestNode implements Closeable {
new PeerBlacklist(), new PeerBlacklist(),
new NoOpMetricsSystem(), new NoOpMetricsSystem(),
new NodeWhitelistController(PermissioningConfiguration.createDefault()))) new NodeWhitelistController(PermissioningConfiguration.createDefault())))
.metricsSystem(new NoOpMetricsSystem())
.build(); .build();
network = networkRunner.getNetwork(); network = networkRunner.getNetwork();
this.port = network.getSelf().getPort(); this.port = network.getSelf().getPort();

@ -17,6 +17,10 @@ import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -50,14 +54,24 @@ public class NetworkRunner implements AutoCloseable {
private final P2PNetwork network; private final P2PNetwork network;
private final Map<String, SubProtocol> subProtocols; private final Map<String, SubProtocol> subProtocols;
private final List<ProtocolManager> protocolManagers; private final List<ProtocolManager> protocolManagers;
private final LabelledMetric<Counter> inboundMessageCounter;
private NetworkRunner( private NetworkRunner(
final P2PNetwork network, final P2PNetwork network,
final Map<String, SubProtocol> subProtocols, final Map<String, SubProtocol> subProtocols,
final List<ProtocolManager> protocolManagers) { final List<ProtocolManager> protocolManagers,
final MetricsSystem metricsSystem) {
this.network = network; this.network = network;
this.protocolManagers = protocolManagers; this.protocolManagers = protocolManagers;
this.subProtocols = subProtocols; this.subProtocols = subProtocols;
inboundMessageCounter =
metricsSystem.createLabelledCounter(
MetricCategory.NETWORK,
"p2p_messages_inbound",
"Count of each P2P message received inbound.",
"protocol",
"name",
"code");
} }
public P2PNetwork getNetwork() { public P2PNetwork getNetwork() {
@ -116,6 +130,7 @@ public class NetworkRunner implements AutoCloseable {
message -> { message -> {
final int code = message.getData().getCode(); final int code = message.getData().getCode();
if (!protocol.isValidMessageCode(cap.getVersion(), code)) { if (!protocol.isValidMessageCode(cap.getVersion(), code)) {
inboundMessageCounter.labels(cap.toString(), "Invalid", "").inc();
// Handle invalid messages by disconnecting // Handle invalid messages by disconnecting
LOG.debug( LOG.debug(
"Invalid message code ({}-{}, {}) received from peer, disconnecting from:", "Invalid message code ({}-{}, {}) received from peer, disconnecting from:",
@ -126,6 +141,12 @@ public class NetworkRunner implements AutoCloseable {
message.getConnection().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); message.getConnection().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
return; return;
} }
inboundMessageCounter
.labels(
cap.toString(),
protocol.messageName(cap.getVersion(), code),
Integer.toString(code))
.inc();
protocolManager.processMessage(cap, message); protocolManager.processMessage(cap, message);
}); });
} }
@ -162,6 +183,7 @@ public class NetworkRunner implements AutoCloseable {
private Function<List<Capability>, P2PNetwork> networkProvider; private Function<List<Capability>, P2PNetwork> networkProvider;
List<ProtocolManager> protocolManagers = new ArrayList<>(); List<ProtocolManager> protocolManagers = new ArrayList<>();
List<SubProtocol> subProtocols = new ArrayList<>(); List<SubProtocol> subProtocols = new ArrayList<>();
MetricsSystem metricsSystem;
public NetworkRunner build() { public NetworkRunner build() {
final Map<String, SubProtocol> subProtocolMap = new HashMap<>(); final Map<String, SubProtocol> subProtocolMap = new HashMap<>();
@ -180,7 +202,7 @@ public class NetworkRunner implements AutoCloseable {
} }
} }
final P2PNetwork network = networkProvider.apply(caps); final P2PNetwork network = networkProvider.apply(caps);
return new NetworkRunner(network, subProtocolMap, protocolManagers); return new NetworkRunner(network, subProtocolMap, protocolManagers, metricsSystem);
} }
public Builder protocolManagers(final List<ProtocolManager> protocolManagers) { public Builder protocolManagers(final List<ProtocolManager> protocolManagers) {
@ -202,5 +224,10 @@ public class NetworkRunner implements AutoCloseable {
this.subProtocols.addAll(subProtocols); this.subProtocols.addAll(subProtocols);
return this; return this;
} }
public Builder metricsSystem(final MetricsSystem metricsSystem) {
this.metricsSystem = metricsSystem;
return this;
}
} }
} }

@ -22,6 +22,8 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List; import java.util.List;
@ -50,17 +52,21 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
private final CompletableFuture<PeerConnection> connectionFuture; private final CompletableFuture<PeerConnection> connectionFuture;
private final List<SubProtocol> subProtocols; private final List<SubProtocol> subProtocols;
private final LabelledMetric<Counter> outboundMessagesCounter;
AbstractHandshakeHandler( AbstractHandshakeHandler(
final List<SubProtocol> subProtocols, final List<SubProtocol> subProtocols,
final PeerInfo ourInfo, final PeerInfo ourInfo,
final CompletableFuture<PeerConnection> connectionFuture, final CompletableFuture<PeerConnection> connectionFuture,
final Callbacks callbacks, final Callbacks callbacks,
final PeerConnectionRegistry peerConnectionRegistry) { final PeerConnectionRegistry peerConnectionRegistry,
final LabelledMetric<Counter> outboundMessagesCounter) {
this.subProtocols = subProtocols; this.subProtocols = subProtocols;
this.ourInfo = ourInfo; this.ourInfo = ourInfo;
this.connectionFuture = connectionFuture; this.connectionFuture = connectionFuture;
this.callbacks = callbacks; this.callbacks = callbacks;
this.peerConnectionRegistry = peerConnectionRegistry; this.peerConnectionRegistry = peerConnectionRegistry;
this.outboundMessagesCounter = outboundMessagesCounter;
} }
/** /**
@ -101,7 +107,8 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
final Framer framer = new Framer(handshaker.secrets()); final Framer framer = new Framer(handshaker.secrets());
final ByteToMessageDecoder deFramer = final ByteToMessageDecoder deFramer =
new DeFramer(framer, subProtocols, ourInfo, callbacks, connectionFuture); new DeFramer(
framer, subProtocols, ourInfo, callbacks, connectionFuture, outboundMessagesCounter);
ctx.channel() ctx.channel()
.pipeline() .pipeline()

@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.Discon
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes;
import tech.pegasys.pantheon.ethereum.rlp.RLPException; import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -49,18 +51,21 @@ final class DeFramer extends ByteToMessageDecoder {
private final PeerInfo ourInfo; private final PeerInfo ourInfo;
private final List<SubProtocol> subProtocols; private final List<SubProtocol> subProtocols;
private boolean hellosExchanged; private boolean hellosExchanged;
private final LabelledMetric<Counter> outboundMessagesCounter;
DeFramer( DeFramer(
final Framer framer, final Framer framer,
final List<SubProtocol> subProtocols, final List<SubProtocol> subProtocols,
final PeerInfo ourInfo, final PeerInfo ourInfo,
final Callbacks callbacks, final Callbacks callbacks,
final CompletableFuture<PeerConnection> connectFuture) { final CompletableFuture<PeerConnection> connectFuture,
final LabelledMetric<Counter> outboundMessagesCounter) {
this.framer = framer; this.framer = framer;
this.subProtocols = subProtocols; this.subProtocols = subProtocols;
this.ourInfo = ourInfo; this.ourInfo = ourInfo;
this.connectFuture = connectFuture; this.connectFuture = connectFuture;
this.callbacks = callbacks; this.callbacks = callbacks;
this.outboundMessagesCounter = outboundMessagesCounter;
} }
@Override @Override
@ -90,7 +95,8 @@ final class DeFramer extends ByteToMessageDecoder {
new CapabilityMultiplexer( new CapabilityMultiplexer(
subProtocols, ourInfo.getCapabilities(), peerInfo.getCapabilities()); subProtocols, ourInfo.getCapabilities(), peerInfo.getCapabilities());
final PeerConnection connection = final PeerConnection connection =
new NettyPeerConnection(ctx, peerInfo, capabilityMultiplexer, callbacks); new NettyPeerConnection(
ctx, peerInfo, capabilityMultiplexer, callbacks, outboundMessagesCounter);
if (capabilityMultiplexer.getAgreedCapabilities().size() == 0) { if (capabilityMultiplexer.getAgreedCapabilities().size() == 0) {
LOG.debug( LOG.debug(
"Disconnecting from {} because no capabilities are shared.", peerInfo.getClientId()); "Disconnecting from {} because no capabilities are shared.", peerInfo.getClientId());

@ -17,6 +17,8 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.Handshaker; import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.Handshaker;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -32,8 +34,15 @@ public final class HandshakeHandlerInbound extends AbstractHandshakeHandler {
final PeerInfo ourInfo, final PeerInfo ourInfo,
final CompletableFuture<PeerConnection> connectionFuture, final CompletableFuture<PeerConnection> connectionFuture,
final Callbacks callbacks, final Callbacks callbacks,
final PeerConnectionRegistry peerConnectionRegistry) { final PeerConnectionRegistry peerConnectionRegistry,
super(subProtocols, ourInfo, connectionFuture, callbacks, peerConnectionRegistry); final LabelledMetric<Counter> outboundMessagesCounter) {
super(
subProtocols,
ourInfo,
connectionFuture,
callbacks,
peerConnectionRegistry,
outboundMessagesCounter);
handshaker.prepareResponder(kp); handshaker.prepareResponder(kp);
} }

@ -17,6 +17,8 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.Handshaker; import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.Handshaker;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List; import java.util.List;
@ -41,8 +43,15 @@ public final class HandshakeHandlerOutbound extends AbstractHandshakeHandler {
final PeerInfo ourInfo, final PeerInfo ourInfo,
final CompletableFuture<PeerConnection> connectionFuture, final CompletableFuture<PeerConnection> connectionFuture,
final Callbacks callbacks, final Callbacks callbacks,
final PeerConnectionRegistry peerConnectionRegistry) { final PeerConnectionRegistry peerConnectionRegistry,
super(subProtocols, ourInfo, connectionFuture, callbacks, peerConnectionRegistry); final LabelledMetric<Counter> outboundMessagesCounter) {
super(
subProtocols,
ourInfo,
connectionFuture,
callbacks,
peerConnectionRegistry,
outboundMessagesCounter);
handshaker.prepareInitiator(kp, SECP256K1.PublicKey.create(peerId)); handshaker.prepareInitiator(kp, SECP256K1.PublicKey.create(peerId));
this.first = handshaker.firstMessage(); this.first = handshaker.firstMessage();
} }

@ -31,6 +31,9 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.Subscribers; import tech.pegasys.pantheon.util.Subscribers;
@ -137,6 +140,8 @@ public final class NettyP2PNetwork implements P2PNetwork {
private final List<SubProtocol> subProtocols; private final List<SubProtocol> subProtocols;
private final LabelledMetric<Counter> outboundMessagesCounter;
/** /**
* Creates a peer networking service for production purposes. * Creates a peer networking service for production purposes.
* *
@ -173,6 +178,16 @@ public final class NettyP2PNetwork implements P2PNetwork {
peerRequirement, peerRequirement,
peerBlacklist, peerBlacklist,
nodeWhitelistController); nodeWhitelistController);
outboundMessagesCounter =
metricsSystem.createLabelledCounter(
MetricCategory.NETWORK,
"p2p_messages_outbound",
"Count of each P2P message sent outbound.",
"protocol",
"name",
"code");
subscribeDisconnect(peerDiscoveryAgent); subscribeDisconnect(peerDiscoveryAgent);
subscribeDisconnect(peerBlacklist); subscribeDisconnect(peerBlacklist);
subscribeDisconnect(connections); subscribeDisconnect(connections);
@ -234,7 +249,13 @@ public final class NettyP2PNetwork implements P2PNetwork {
new TimeoutException( new TimeoutException(
"Timed out waiting to fully establish incoming connection"))), "Timed out waiting to fully establish incoming connection"))),
new HandshakeHandlerInbound( new HandshakeHandlerInbound(
keyPair, subProtocols, ourPeerInfo, connectionFuture, callbacks, connections)); keyPair,
subProtocols,
ourPeerInfo,
connectionFuture,
callbacks,
connections,
outboundMessagesCounter));
connectionFuture.thenAccept( connectionFuture.thenAccept(
connection -> { connection -> {
@ -305,7 +326,8 @@ public final class NettyP2PNetwork implements P2PNetwork {
ourPeerInfo, ourPeerInfo,
connectionFuture, connectionFuture,
callbacks, callbacks,
connections)); connections,
outboundMessagesCounter));
} }
}) })
.connect() .connect()

@ -22,6 +22,9 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.HashMap; import java.util.HashMap;
@ -49,12 +52,14 @@ final class NettyPeerConnection implements PeerConnection {
private final AtomicBoolean disconnected = new AtomicBoolean(false); private final AtomicBoolean disconnected = new AtomicBoolean(false);
private final Callbacks callbacks; private final Callbacks callbacks;
private final CapabilityMultiplexer multiplexer; private final CapabilityMultiplexer multiplexer;
private final LabelledMetric<Counter> outboundMessagesCounter;
public NettyPeerConnection( public NettyPeerConnection(
final ChannelHandlerContext ctx, final ChannelHandlerContext ctx,
final PeerInfo peerInfo, final PeerInfo peerInfo,
final CapabilityMultiplexer multiplexer, final CapabilityMultiplexer multiplexer,
final Callbacks callbacks) { final Callbacks callbacks,
final LabelledMetric<Counter> outboundMessagesCounter) {
this.ctx = ctx; this.ctx = ctx;
this.peerInfo = peerInfo; this.peerInfo = peerInfo;
this.multiplexer = multiplexer; this.multiplexer = multiplexer;
@ -63,6 +68,7 @@ final class NettyPeerConnection implements PeerConnection {
protocolToCapability.put(cap.getName(), cap); protocolToCapability.put(cap.getName(), cap);
} }
this.callbacks = callbacks; this.callbacks = callbacks;
this.outboundMessagesCounter = outboundMessagesCounter;
ctx.channel().closeFuture().addListener(f -> terminateConnection(TCP_SUBSYSTEM_ERROR, false)); ctx.channel().closeFuture().addListener(f -> terminateConnection(TCP_SUBSYSTEM_ERROR, false));
} }
@ -82,6 +88,19 @@ final class NettyPeerConnection implements PeerConnection {
+ ") via cap " + ") via cap "
+ capability); + capability);
} }
outboundMessagesCounter
.labels(
capability.toString(),
subProtocol.messageName(capability.getVersion(), message.getCode()),
Integer.toString(message.getCode()))
.inc();
} else {
outboundMessagesCounter
.labels(
"Wire",
WireMessageCodes.messageName(message.getCode()),
Integer.toString(message.getCode()))
.inc();
} }
LOG.trace("Writing {} to {} via protocol {}", message, peerInfo, capability); LOG.trace("Writing {} to {} via protocol {}", message, peerInfo, capability);
@ -152,9 +171,7 @@ final class NettyPeerConnection implements PeerConnection {
.add("nodeId", peerInfo.getNodeId()) .add("nodeId", peerInfo.getNodeId())
.add( .add(
"caps", "caps",
String.join( agreedCapabilities.stream().map(Capability::toString).collect(Collectors.joining(", ")))
", ",
agreedCapabilities.stream().map(Capability::toString).collect(Collectors.toList())))
.toString(); .toString();
} }
} }

@ -37,4 +37,17 @@ public interface SubProtocol {
* @return true if the given protocol version supports the given message code * @return true if the given protocol version supports the given message code
*/ */
boolean isValidMessageCode(int protocolVersion, int code); boolean isValidMessageCode(int protocolVersion, int code);
/** Message name for a message code not valid within this subprotocol. */
String INVALID_MESSAGE_NAME = "invalid";
/**
* Returns the name of the particular message for this protocol, suitable for human viewing.
*
* @param protocolVersion The version of the protocol for the message code.
* @param code The message code to be named.
* @return A string of the human readable name of the message, or {@link #INVALID_MESSAGE_NAME} if
* it is not a valid in the protocol.
*/
String messageName(int protocolVersion, int code);
} }

@ -19,4 +19,19 @@ public final class WireMessageCodes {
public static final int PONG = 0x03; public static final int PONG = 0x03;
private WireMessageCodes() {} private WireMessageCodes() {}
public static String messageName(final int code) {
switch (code) {
case HELLO:
return "Hello";
case DISCONNECT:
return "Disconnect";
case PING:
return "Ping";
case PONG:
return "Pong";
default:
return "invalid";
}
}
} }

@ -411,6 +411,11 @@ public final class NettyP2PNetworkTest {
public boolean isValidMessageCode(final int protocolVersion, final int code) { public boolean isValidMessageCode(final int protocolVersion, final int code) {
return true; return true;
} }
@Override
public String messageName(final int protocolVersion, final int code) {
return INVALID_MESSAGE_NAME;
}
}; };
} }
@ -430,6 +435,11 @@ public final class NettyP2PNetworkTest {
public boolean isValidMessageCode(final int protocolVersion, final int code) { public boolean isValidMessageCode(final int protocolVersion, final int code) {
return true; return true;
} }
@Override
public String messageName(final int protocolVersion, final int code) {
return INVALID_MESSAGE_NAME;
}
}; };
} }

@ -105,6 +105,11 @@ public class CapabilityMultiplexerTest {
public boolean isValidMessageCode(final int protocolVersion, final int code) { public boolean isValidMessageCode(final int protocolVersion, final int code) {
return true; return true;
} }
@Override
public String messageName(final int protocolVersion, final int code) {
return INVALID_MESSAGE_NAME;
}
}; };
} }
} }

@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException; import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Collections; import java.util.Collections;
@ -42,7 +43,8 @@ public class DeFramerTest {
Collections.emptyList(), Collections.emptyList(),
new PeerInfo(5, "abc", Collections.emptyList(), 0, BytesValue.fromHexString("0x01")), new PeerInfo(5, "abc", Collections.emptyList(), 0, BytesValue.fromHexString("0x01")),
callbacks, callbacks,
connectFuture); connectFuture,
NoOpMetricsSystem.NO_OP_LABELLED_COUNTER);
@Test @Test
public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() throws Exception { public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() throws Exception {

@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -47,7 +48,9 @@ public class NettyPeerConnectionTest {
when(context.channel()).thenReturn(channel); when(context.channel()).thenReturn(channel);
when(channel.closeFuture()).thenReturn(closeFuture); when(channel.closeFuture()).thenReturn(closeFuture);
when(channel.eventLoop()).thenReturn(eventLoop); when(channel.eventLoop()).thenReturn(eventLoop);
connection = new NettyPeerConnection(context, peerInfo, multiplexer, callbacks); connection =
new NettyPeerConnection(
context, peerInfo, multiplexer, callbacks, NoOpMetricsSystem.NO_OP_LABELLED_COUNTER);
} }
@Test @Test

@ -18,7 +18,8 @@ public enum MetricCategory {
JVM("jvm", false), JVM("jvm", false),
PROCESS("process", false), PROCESS("process", false),
BLOCKCHAIN("blockchain"), BLOCKCHAIN("blockchain"),
SYNCHRONIZER("synchronizer"); SYNCHRONIZER("synchronizer"),
NETWORK("network");
private final String name; private final String name;
private final boolean pantheonSpecific; private final boolean pantheonSpecific;

@ -29,6 +29,7 @@ public class NoOpMetricsSystem implements MetricsSystem {
private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0; private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0;
private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT; private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT;
public static final LabelledMetric<OperationTimer> NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER; public static final LabelledMetric<OperationTimer> NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER;
public static final LabelledMetric<Counter> NO_OP_LABELLED_COUNTER = label -> NO_OP_COUNTER;
@Override @Override
public LabelledMetric<Counter> createLabelledCounter( public LabelledMetric<Counter> createLabelledCounter(

@ -214,6 +214,7 @@ public class RunnerBuilder {
peerBlacklist, peerBlacklist,
metricsSystem, metricsSystem,
nodeWhitelistController)) nodeWhitelistController))
.metricsSystem(metricsSystem)
.build(); .build();
final Synchronizer synchronizer = pantheonController.getSynchronizer(); final Synchronizer synchronizer = pantheonController.getSynchronizer();

Loading…
Cancel
Save