From 1885f1df747d52015c53332ffc559e1146080f8e Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Thu, 18 Mar 2021 05:20:49 -0700 Subject: [PATCH] [stream] added some prometheus metric --- p2p/stream/common/streammanager/metric.go | 81 +++++++++++++++++++ .../common/streammanager/streammanager.go | 13 +++ p2p/stream/types/metric.go | 75 +++++++++++++++++ p2p/stream/types/stream.go | 29 +++++-- 4 files changed, 193 insertions(+), 5 deletions(-) create mode 100644 p2p/stream/common/streammanager/metric.go create mode 100644 p2p/stream/types/metric.go diff --git a/p2p/stream/common/streammanager/metric.go b/p2p/stream/common/streammanager/metric.go new file mode 100644 index 000000000..521abf567 --- /dev/null +++ b/p2p/stream/common/streammanager/metric.go @@ -0,0 +1,81 @@ +package streammanager + +import ( + prom "github.com/harmony-one/harmony/api/service/prometheus" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + prom.PromRegistry().MustRegister( + discoverCounterVec, + discoveredPeersCounterVec, + addedStreamsCounterVec, + removedStreamsCounterVec, + setupStreamDuration, + numStreamsGaugeVec, + ) +} + +var ( + discoverCounterVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "discover", + Help: "number of intentions to actively discover peers", + }, + []string{"topic"}, + ) + + discoveredPeersCounterVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "discover_peers", + Help: "number of peers discovered and connect actively", + }, + []string{"topic"}, + ) + + addedStreamsCounterVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "added_streams", + Help: "number of streams added in stream manager", + }, + []string{"topic"}, + ) + + removedStreamsCounterVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: 
"removed_streams", + Help: "number of streams removed in stream manager", + }, + []string{"topic"}, + ) + + setupStreamDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "setup_stream_duration", + Help: "duration in seconds of setting up connection to a discovered peer", + // buckets: 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1280ms, 2560ms, +INF + Buckets: prometheus.ExponentialBuckets(0.02, 2, 8), + }, + []string{"topic"}, + ) + + numStreamsGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "num_streams", + Help: "number of connected streams", + }, + []string{"topic"}, + ) +) diff --git a/p2p/stream/common/streammanager/streammanager.go b/p2p/stream/common/streammanager/streammanager.go index f44a9c2ec..ddc6278dc 100644 --- a/p2p/stream/common/streammanager/streammanager.go +++ b/p2p/stream/common/streammanager/streammanager.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/ethereum/go-ethereum/event" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -229,6 +231,8 @@ func (sm *streamManager) handleAddStream(st sttypes.Stream) error { sm.streams.addStream(st) sm.addStreamFeed.Send(EvtStreamAdded{st}) + addedStreamsCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc() + numStreamsGaugeVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Set(float64(sm.streams.size())) return nil } @@ -246,7 +250,10 @@ func (sm *streamManager) handleRemoveStream(id sttypes.StreamID) error { default: } } + sm.removeStreamFeed.Send(EvtStreamRemoved{id}) + removedStreamsCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc() + numStreamsGaugeVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Set(float64(sm.streams.size())) return nil } @@ -275,10 +282,13 @@ func (sm *streamManager) discoverAndSetupStream(discCtx
context.Context) error { if err != nil { return errors.Wrap(err, "failed to discover") } + discoverCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc() + for peer := range peers { if peer.ID == sm.host.ID() { continue } + discoveredPeersCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc() go func(pid libp2p_peer.ID) { // The ctx here is using the module context instead of discover context err := sm.setupStreamWithPeer(sm.ctx, pid) @@ -306,6 +316,9 @@ func (sm *streamManager) discover(ctx context.Context) (<-chan libp2p_peer.AddrI } func (sm *streamManager) setupStreamWithPeer(ctx context.Context, pid libp2p_peer.ID) error { + timer := prometheus.NewTimer(setupStreamDuration.With(prometheus.Labels{"topic": string(sm.myProtoID)})) + defer timer.ObserveDuration() + nCtx, cancel := context.WithTimeout(ctx, connectTimeout) defer cancel() diff --git a/p2p/stream/types/metric.go b/p2p/stream/types/metric.go new file mode 100644 index 000000000..399531843 --- /dev/null +++ b/p2p/stream/types/metric.go @@ -0,0 +1,75 @@ +package sttypes + +import ( + prom "github.com/harmony-one/harmony/api/service/prometheus" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + prom.PromRegistry().MustRegister( + bytesReadCounter, + bytesWriteCounter, + msgReadCounter, + msgWriteCounter, + msgReadFailedCounterVec, + msgWriteFailedCounterVec, + ) +} + +var ( + bytesReadCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "bytes_read", + Help: "total bytes read from stream", + }, + ) + + bytesWriteCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "bytes_write", + Help: "total bytes write to stream", + }, + ) + + msgReadCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "msg_read", + Help: "number of messages read from stream", + }, + ) + + msgWriteCounter = 
prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "msg_write", + Help: "number of messages write to stream", + }, + ) + + msgReadFailedCounterVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "msg_read_failed", + Help: "number of messages failed reading from stream", + }, + []string{"error"}, + ) + + msgWriteFailedCounterVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "hmy", + Subsystem: "stream", + Name: "msg_write_failed", + Help: "number of messages failed writing to stream", + }, + []string{"error"}, + ) +) diff --git a/p2p/stream/types/stream.go b/p2p/stream/types/stream.go index 492a77166..59578fa3d 100644 --- a/p2p/stream/types/stream.go +++ b/p2p/stream/types/stream.go @@ -9,6 +9,7 @@ import ( libp2p_network "github.com/libp2p/go-libp2p-core/network" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" ) // Stream is the interface for streams implemented in each service. @@ -77,26 +78,43 @@ const ( // WriteBytes write the bytes to the stream. 
// First 4 bytes is used as the size bytes, and the rest is the content -func (st *BaseStream) WriteBytes(b []byte) error { +func (st *BaseStream) WriteBytes(b []byte) (err error) { + defer func() { + msgWriteCounter.Inc() + if err != nil { + msgWriteFailedCounterVec.With(prometheus.Labels{"error": err.Error()}).Inc() + } + }() + if len(b) > maxMsgBytes { return errors.New("message too long") } - if _, err := st.rw.Write(intToBytes(len(b))); err != nil { + if _, err = st.rw.Write(intToBytes(len(b))); err != nil { return errors.Wrap(err, "write size bytes") } - if _, err := st.rw.Write(b); err != nil { + bytesWriteCounter.Add(sizeBytes) + if _, err = st.rw.Write(b); err != nil { return errors.Wrap(err, "write content") } + bytesWriteCounter.Add(float64(len(b))) return st.rw.Flush() } // ReadMsg read the bytes from the stream -func (st *BaseStream) ReadBytes() ([]byte, error) { +func (st *BaseStream) ReadBytes() (b []byte, err error) { + defer func() { + msgReadCounter.Inc() + if err != nil { + msgReadFailedCounterVec.With(prometheus.Labels{"error": err.Error()}).Inc() + } + }() + sb := make([]byte, sizeBytes) - _, err := st.rw.Read(sb) + _, err = st.rw.Read(sb) if err != nil { return nil, errors.Wrap(err, "read size") } + bytesReadCounter.Add(sizeBytes) size := bytesToInt(sb) if size > maxMsgBytes { return nil, fmt.Errorf("message size exceed max: %v > %v", size, maxMsgBytes) @@ -107,6 +125,7 @@ func (st *BaseStream) ReadBytes() ([]byte, error) { if err != nil { return nil, errors.Wrap(err, "read content") } + bytesReadCounter.Add(float64(n)) if n != size { return nil, errors.New("ReadBytes sanity failed: byte size") }