Merge pull request #4042 from harjas27/goeth_metrics

Ethereum compatible metrics
pull/4075/head
Leo Chen 3 years ago committed by GitHub
commit 55c21649c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      api/service/prometheus/service.go
  2. 68
      core/blockchain.go
  3. 5
      core/headerchain.go
  4. 1
      p2p/host.go
  5. 38
      p2p/metrics.go

@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
eth_prometheus "github.com/ethereum/go-ethereum/metrics/prometheus"
"github.com/harmony-one/harmony/internal/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -104,6 +106,7 @@ func newService(cfg Config, additionalHandlers ...Handler) *Service {
mux := http.NewServeMux()
mux.Handle("/metrics", handler)
mux.Handle("/metrics/eth", eth_prometheus.Handler(metrics.DefaultRegistry))
mux.HandleFunc("/goroutinez", svc.goroutinezHandler)
// Register additional handlers.

@ -59,8 +59,25 @@ import (
)
var (
// blockInsertTimer
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
accountCommitTimer = metrics.NewRegisteredTimer("chain/account/commits", nil)
storageReadTimer = metrics.NewRegisteredTimer("chain/storage/reads", nil)
storageHashTimer = metrics.NewRegisteredTimer("chain/storage/hashes", nil)
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
// ErrNoGenesis is the error when there is no genesis.
ErrNoGenesis = errors.New("Genesis not found in chain")
// errExceedMaxPendingSlashes ..
@ -330,6 +347,7 @@ func (bc *BlockChain) loadLastState() error {
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))
// We don't need the following as we want the current header and block to be consistent
// Restore the last known head header
@ -346,9 +364,11 @@ func (bc *BlockChain) loadLastState() error {
// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
@ -410,24 +430,31 @@ func (bc *BlockChain) SetHead(head uint64) error {
// Rewind the block chain, ensuring we don't end up with a stateless head block
if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number().Uint64() < currentBlock.NumberU64() {
bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number().Uint64()))
newHeadBlock := bc.GetBlock(currentHeader.Hash(), currentHeader.Number().Uint64())
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
}
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number().Uint64() < currentFastBlock.NumberU64() {
bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number().Uint64()))
newHeadFastBlock := bc.GetBlock(currentHeader.Hash(), currentHeader.Number().Uint64())
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
// If either blocks reached nil, reset to the genesis state
if currentBlock := bc.CurrentBlock(); currentBlock == nil {
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
@ -532,8 +559,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
return err
}
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
return nil
}
@ -653,6 +681,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) error {
}
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
@ -664,6 +693,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) error {
}
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
return nil
}
@ -937,6 +967,7 @@ func (bc *BlockChain) Rollback(chain []common.Hash) error {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
if newFastBlock != nil {
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
}
}
@ -944,6 +975,7 @@ func (bc *BlockChain) Rollback(chain []common.Hash) error {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
if newBlock != nil {
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
if err := rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()); err != nil {
return err
}
@ -1108,6 +1140,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
headFastBlockGauge.Update(int64(head.NumberU64()))
}
}
bc.mu.Unlock()
@ -1467,6 +1500,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
}
// Process block using the parent state as reference point.
substart := time.Now()
receipts, cxReceipts, stakeMsgs, logs, usedGas, payout, newState, err := bc.processor.Process(
block, state, bc.vmConfig, true,
)
@ -1476,7 +1510,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
return i, events, coalescedLogs, err
}
// Update the metrics touched during block processing
accountReadTimer.Update(state.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(state.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(state.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(state.StorageUpdates) // Storage updates are complete, we can mark them
triehash := state.AccountHashes + state.StorageHashes // Save to not double count in validation
trieproc := state.AccountReads + state.AccountUpdates
trieproc += state.StorageReads + state.StorageUpdates
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
// Validate the state using the default validator
substart = time.Now()
if err := bc.Validator().ValidateState(
block, state, receipts, cxReceipts, usedGas,
); err != nil {
@ -1485,7 +1530,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
}
proctime := time.Since(bstart)
// Update the metrics touched during block validation
accountHashTimer.Update(state.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(state.StorageHashes) // Storage hashes are complete, we can mark them
blockValidationTimer.Update(time.Since(substart) - (state.AccountHashes + state.StorageHashes - triehash))
// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.WriteBlockWithState(
block, receipts, cxReceipts, stakeMsgs, payout, state,
)
@ -1502,6 +1553,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
Str("elapsed", common.PrettyDuration(time.Since(bstart)).String()).
Logger()
// Update the metrics touched during block commit
accountCommitTimer.Update(state.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(state.StorageCommits) // Storage commits are complete, we can mark them
blockWriteTimer.Update(time.Since(substart) - state.AccountCommits - state.StorageCommits)
blockInsertTimer.UpdateSince(bstart)
switch status {
case CanonStatTy:
logger.Info().Msg("Inserted new block")

@ -110,7 +110,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
}
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
headHeaderGauge.Update(hc.CurrentHeader().Number().Int64())
return hc, nil
}
@ -474,6 +474,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *block.Header) error {
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
headHeaderGauge.Update(head.Number().Int64())
return nil
}
@ -530,7 +531,7 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) error {
hc.currentHeader.Store(hc.genesisHeader)
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
headHeaderGauge.Update(hc.CurrentHeader().Number().Int64())
return nil
}

@ -106,6 +106,7 @@ func NewHost(cfg HostConfig) (Host, error) {
libp2p.Identity(key),
libp2p.EnableNATService(),
libp2p.ForceReachabilityPublic(),
libp2p.BandwidthReporter(newCounter()),
)
if err != nil {
return nil, errors.Wrapf(err, "cannot initialize libp2p host")

@ -0,0 +1,38 @@
//package p2p
package p2p
import (
eth_metrics "github.com/ethereum/go-ethereum/metrics"
"github.com/libp2p/go-libp2p-core/metrics"
)
const (
// ingressMeterName is the prefix of the per-packet inbound metrics.
ingressMeterName = "p2p/ingress"
// egressMeterName is the prefix of the per-packet outbound metrics.
egressMeterName = "p2p/egress"
)
var (
ingressTrafficMeter = eth_metrics.NewRegisteredMeter(ingressMeterName, nil)
egressTrafficMeter = eth_metrics.NewRegisteredMeter(egressMeterName, nil)
)
// Counter is a wrapper around a metrics.BandwidthCounter that meters both the
// inbound and outbound network traffic.
type Counter struct {
*metrics.BandwidthCounter
}
func newCounter() *Counter {
return &Counter{metrics.NewBandwidthCounter()}
}
func (c *Counter) LogRecvMessage(size int64) {
ingressTrafficMeter.Mark(size)
}
func (c *Counter) LogSentMessage(size int64) {
egressTrafficMeter.Mark(size)
}
Loading…
Cancel
Save