[project] Remove unused internal memprofiling, profiling, metrics service (#2679)

* [project] Remove unused internal memprofiling, profiling

* [internal] Unused attack package

* [internal] More dead metrics related code
pull/2685/head
Edgar Aroutiounian 5 years ago committed by GitHub
parent 7da8ec0a02
commit 45135b21d6
Changed files (lines changed in parentheses):
  1. api/service/config.go (17)
  2. api/service/manager.go (3)
  3. api/service/metrics/service.go (236)
  4. api/service/metrics/storage.go (102)
  5. cmd/harmony/main.go (57)
  6. consensus/consensus.go (2)
  7. consensus/consensus_service.go (39)
  8. consensus/consensus_v2.go (2)
  9. internal/attack/attack.go (118)
  10. internal/attack/attack_test.go (26)
  11. internal/configs/node/config.go (35)
  12. internal/configs/node/group.go (2)
  13. internal/memprofiling/lib.go (136)
  14. internal/memprofiling/singleton.go (14)
  15. internal/profiler/profiler.go (96)
  16. internal/utils/metrics.go (6)
  17. internal/utils/metrics_test.go (19)
  18. node/node.go (10)
  19. node/node_metrics.go (92)
  20. node/service_setup.go (12)

@ -10,16 +10,13 @@ import (
// cyclic imports
type NodeConfig struct {
// The three groupID design, please refer to https://github.com/harmony-one/harmony/blob/master/node/node.md#libp2p-integration
Beacon nodeconfig.GroupID // the beacon group ID
ShardGroupID nodeconfig.GroupID // the group ID of the shard
Client nodeconfig.GroupID // the client group ID of the shard
IsClient bool // whether this node is a client node
IsBeacon bool // whether this node is a beacon node or not
ShardID uint32 // shardID of this node
Actions map[nodeconfig.GroupID]nodeconfig.ActionType // actions on the groups
PushgatewayIP string // prometheus pushgateway ip
PushgatewayPort string // prometheus pushgateway port
MetricsFlag bool // flag to collect metrics or not
Beacon nodeconfig.GroupID // the beacon group ID
ShardGroupID nodeconfig.GroupID // the group ID of the shard
Client nodeconfig.GroupID // the client group ID of the shard
IsClient bool // whether this node is a client node
IsBeacon bool // whether this node is a beacon node or not
ShardID uint32 // shardID of this node
Actions map[nodeconfig.GroupID]nodeconfig.ActionType // actions on the groups
}
// GroupIDShards is a map of ShardGroupID

@ -26,7 +26,6 @@ const (
ClientSupport Type = iota
SupportExplorer
Consensus
Metrics
BlockProposal
NetworkInfo
PeerDiscovery
@ -38,8 +37,6 @@ func (t Type) String() string {
return "SupportExplorer"
case ClientSupport:
return "ClientSupport"
case Metrics:
return "Metrics"
case Consensus:
return "Consensus"
case BlockProposal:

@ -1,236 +0,0 @@
package metrics
import (
"fmt"
"math"
"math/big"
"strconv"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)
// Constants for metrics service.
const (
BalanceScale int = 18
BalancePrecision int = 13
ConnectionsNumberPush int = 0
BlockHeightPush int = 1
NodeBalancePush int = 2
LastConsensusPush int = 3
BlockRewardPush int = 4
TxPoolPush int = 5
IsLeaderPush int = 6
metricsServicePortDifference = 2000
)
// Service is the struct for metrics service.
type Service struct {
BLSPublicKey string
IP string
Port string
PushgatewayIP string
PushgatewayPort string
storage *Storage
pusher *push.Pusher
messageChan chan *msg_pb.Message
}
// init vars for prometheus
var (
curTxPoolSize = uint64(0)
curBlockHeight = uint64(0)
curBlocks = uint64(0)
curBalance = big.NewInt(0)
curConnectionsNumber = 0
curIsLeader = false
lastBlockReward = big.NewInt(0)
lastConsensusTime = int64(0)
metricsPush = make(chan int)
blockHeightGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "block_height",
Help: "Get current block height.",
})
txPoolGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tx_pool_size",
Help: "Get current tx pool size.",
})
isLeaderGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "is_leader",
Help: "Is node a leader now.",
})
blocksAcceptedGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "blocks_accepted",
Help: "Get accepted blocks.",
})
connectionsNumberGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "connections_number",
Help: "Get current connections number for a node.",
})
nodeBalanceGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "node_balance",
Help: "Get current node balance.",
})
lastConsensusGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "last_consensus",
Help: "Get last consensus time.",
})
blockRewardGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "block_reward",
Help: "Get last block reward.",
})
)
// New returns metrics service.
func New(selfPeer *p2p.Peer, blsPublicKey, pushgatewayIP, pushgatewayPort string) *Service {
return &Service{
BLSPublicKey: blsPublicKey,
IP: selfPeer.IP,
Port: selfPeer.Port,
PushgatewayIP: pushgatewayIP,
PushgatewayPort: pushgatewayPort,
}
}
// StartService starts metrics service.
func (s *Service) StartService() {
utils.Logger().Info().Msg("Starting metrics service.")
s.Run()
}
// StopService shuts down the metrics service.
func (s *Service) StopService() {
utils.Logger().Info().Msg("Shutting down metrics service.")
metricsPush <- -1
}
// GetMetricsServicePort returns the port serving metrics service dashboard. This port is metricsServicePortDifference less than the node port.
func GetMetricsServicePort(nodePort string) string {
if port, err := strconv.Atoi(nodePort); err == nil {
return fmt.Sprintf("%d", port-metricsServicePortDifference)
}
utils.Logger().Error().Msg("Error on parsing.")
return ""
}
// Run sets up the metrics registry and pusher and starts pushing metrics.
func (s *Service) Run() {
// Init local storage for metrics.
s.storage = GetStorageInstance(s.IP, s.Port, true)
registry := prometheus.NewRegistry()
registry.MustRegister(blockHeightGauge, connectionsNumberGauge, nodeBalanceGauge, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge, txPoolGauge, isLeaderGauge)
s.pusher = push.New("http://"+s.PushgatewayIP+":"+s.PushgatewayPort, "node_metrics").Gatherer(registry).Grouping("instance", s.IP+":"+s.Port).Grouping("bls_key", s.BLSPublicKey)
go s.PushMetrics()
}
// FormatBalance formats big.Int balance with precision.
func FormatBalance(balance *big.Int) float64 {
scaledBalance := new(big.Float).Quo(new(big.Float).SetInt(balance), new(big.Float).SetFloat64(math.Pow10(BalanceScale)))
floatBalance, _ := scaledBalance.Float64()
return floatBalance
}
// UpdateBlockHeight updates block height.
func UpdateBlockHeight(blockHeight uint64) {
blockHeightGauge.Set(float64(blockHeight))
blocksAcceptedGauge.Set(float64(blockHeight) - float64(curBlockHeight))
curBlockHeight = blockHeight
metricsPush <- BlockHeightPush
}
// UpdateNodeBalance updates node balance.
func UpdateNodeBalance(balance *big.Int) {
nodeBalanceGauge.Set(FormatBalance(balance))
curBalance = balance
metricsPush <- NodeBalancePush
}
// UpdateTxPoolSize updates tx pool size.
func UpdateTxPoolSize(txPoolSize uint64) {
txPoolGauge.Set(float64(txPoolSize))
curTxPoolSize = txPoolSize
metricsPush <- TxPoolPush
}
// UpdateBlockReward updates block reward.
func UpdateBlockReward(blockReward *big.Int) {
blockRewardGauge.Set(FormatBalance(blockReward))
lastBlockReward = blockReward
metricsPush <- BlockRewardPush
}
// UpdateLastConsensus updates last consensus time.
func UpdateLastConsensus(consensusTime int64) {
lastConsensusGauge.Set(float64(consensusTime))
lastConsensusTime = consensusTime
metricsPush <- LastConsensusPush
}
// UpdateConnectionsNumber updates connections number.
func UpdateConnectionsNumber(connectionsNumber int) {
connectionsNumberGauge.Set(float64(connectionsNumber))
curConnectionsNumber = connectionsNumber
metricsPush <- ConnectionsNumberPush
}
// UpdateIsLeader updates if node is a leader.
func UpdateIsLeader(isLeader bool) {
if isLeader {
isLeaderGauge.Set(1.0)
} else {
isLeaderGauge.Set(0.0)
}
curIsLeader = isLeader
metricsPush <- IsLeaderPush
}
// PushMetrics pushes metrics updates to prometheus pushgateway.
func (s *Service) PushMetrics() {
for metricType := range metricsPush {
if metricType == -1 {
break
}
if err := s.pusher.Add(); err != nil {
utils.Logger().Error().Err(err).Msg("Could not push to a prometheus pushgateway.")
// No dump for now: it is not strictly needed for metrics, consumes memory, and is never restored from the db anyway.
/* switch metricType {
case ConnectionsNumberPush:
s.storage.Dump(curConnectionsNumber, ConnectionsNumberPrefix)
case BlockHeightPush:
s.storage.Dump(curBlockHeight, BlockHeightPrefix)
s.storage.Dump(curBlocks, BlocksPrefix)
case BlockRewardPush:
s.storage.Dump(lastBlockReward, BlockHeightPrefix)
case NodeBalancePush:
s.storage.Dump(curBalance, BalancePrefix)
case LastConsensusPush:
s.storage.Dump(lastConsensusTime, ConsensusTimePrefix)
case TxPoolPush:
s.storage.Dump(curTxPoolSize, TxPoolPrefix)
case IsLeaderPush:
s.storage.Dump(curIsLeader, IsLeaderPrefix)
}*/
}
}
return
}
// NotifyService notifies the service.
func (s *Service) NotifyService(params map[string]interface{}) {
return
}
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}
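For reference, the removed service follows the standard Prometheus Pushgateway pattern: register gauges in a private registry, then push the whole registry with grouping labels. Below is a minimal, self-contained sketch of that pattern using prometheus/client_golang; the pushgateway address, job name, and grouping values are illustrative and not taken from this repo.

```go
package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/push"
)

func main() {
	// Gauge mirroring the removed block_height metric.
	blockHeight := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "block_height",
		Help: "Current block height.",
	})
	registry := prometheus.NewRegistry()
	registry.MustRegister(blockHeight)

	blockHeight.Set(42)

	// Push the registry to a pushgateway; address and labels are illustrative.
	pusher := push.New("http://localhost:9091", "node_metrics").
		Gatherer(registry).
		Grouping("instance", "127.0.0.1:9000")
	if err := pusher.Add(); err != nil {
		log.Printf("could not push to pushgateway: %v", err)
	}
}
```

Note that Add() only replaces metrics with the same names within the grouping, while Push() replaces the whole group, which is presumably why the removed PushMetrics loop called Add() on every update.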

@ -1,102 +0,0 @@
package metrics
import (
"fmt"
"os"
"sync"
"time"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/internal/utils"
)
// Constants for storage.
const (
BalancePrefix = "bap"
BlockHeightPrefix = "bhp"
BlocksPrefix = "bp"
BlockRewardPrefix = "brp"
ConnectionsNumberPrefix = "cnp"
ConsensusTimePrefix = "ltp"
IsLeaderPrefix = "ilp"
TxPoolPrefix = "tpp"
)
// GetKey returns the key by prefix and push time moment.
func GetKey(prefix string, moment int64) string {
return fmt.Sprintf("%s_%d", prefix, moment)
}
// storage instance
var storage *Storage
var onceMetrics sync.Once
// Storage dumps the collected metrics into leveldb.
type Storage struct {
db *ethdb.LDBDatabase
}
// GetStorageInstance returns the storage instance by using the singleton pattern.
func GetStorageInstance(ip, port string, remove bool) *Storage {
onceMetrics.Do(func() {
storage = &Storage{}
storage.Init(ip, port, remove)
})
return storage
}
// Init initializes storage.
func (storage *Storage) Init(ip, port string, remove bool) {
dbFileName := "/tmp/db_metrics_" + ip + "_" + port
var err error
if remove {
var err = os.RemoveAll(dbFileName)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to remove existing database files.")
}
}
if storage.db, err = ethdb.NewLDBDatabase(dbFileName, 0, 0); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to create new database.")
}
}
// GetDB returns the LDBDatabase of the storage.
func (storage *Storage) GetDB() *ethdb.LDBDatabase {
return storage.db
}
// Dump data into lvdb by value and prefix.
func (storage *Storage) Dump(value interface{}, prefix string) error {
currentTime := time.Now().UnixNano()
utils.Logger().Info().Msgf("Store %s %v at time %d", prefix, value, currentTime)
batch := storage.db.NewBatch()
// Update database.
if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value.(interface{})))); err != nil {
utils.Logger().Warn().Err(err).Msgf("Cannot batch %s.", prefix)
return err
}
if err := batch.Write(); err != nil {
utils.Logger().Warn().Err(err).Msg("Cannot write batch.")
return err
}
return nil
}
// Read returns data list of a particular metric by since, until, prefix, interface.
func (storage *Storage) Read(since, until int64, prefix string, varType interface{}) []interface{} {
dataList := make([]interface{}, 0)
for i := since; i <= until; i++ {
data, err := storage.db.Get([]byte(GetKey(prefix, i)))
if err != nil {
continue
}
decodedData := varType
if rlp.DecodeBytes(data, decodedData) != nil {
utils.Logger().Error().Msg("Error on getting data from db.")
os.Exit(1)
}
dataList = append(dataList, decodedData)
}
return dataList
}
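The removed Storage type is a thin wrapper over LevelDB keyed by metric prefix plus push timestamp. A rough, hedged equivalent using goleveldb directly is sketched below; the removed code went through the older go-ethereum ethdb.NewLDBDatabase wrapper instead, and the path and value here are made up for illustration.

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/syndtr/goleveldb/leveldb"
)

// getKey mirrors the removed GetKey: metric prefix plus push timestamp.
func getKey(prefix string, moment int64) []byte {
	return []byte(fmt.Sprintf("%s_%d", prefix, moment))
}

func main() {
	// Path is illustrative; the removed storage used /tmp/db_metrics_<ip>_<port>.
	db, err := leveldb.OpenFile("/tmp/db_metrics_example", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Dump one block-height sample under the "bhp" prefix, keyed by time.
	now := time.Now().UnixNano()
	if err := db.Put(getKey("bhp", now), []byte("12345"), nil); err != nil {
		log.Fatal(err)
	}
	val, err := db.Get(getKey("bhp", now), nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("stored block height sample: %s\n", val)
}
```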

@ -34,7 +34,6 @@ import (
viperconfig "github.com/harmony-one/harmony/internal/configs/viper"
"github.com/harmony-one/harmony/internal/genesis"
hmykey "github.com/harmony-one/harmony/internal/keystore"
"github.com/harmony-one/harmony/internal/memprofiling"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
@ -105,8 +104,6 @@ var (
stakingFlag = flag.Bool("staking", false, "whether the node should operate in staking mode")
// shardID indicates the shard ID of this node
shardID = flag.Int("shard_id", -1, "the shard ID of this node")
enableMemProfiling = flag.Bool("enableMemProfiling", false, "Enable memsize logging.")
enableGC = flag.Bool("enableGC", true, "Enable calling garbage collector manually .")
cmkEncryptedBLSKey = flag.String("aws_blskey", "", "The aws CMK encrypted bls private key file.")
blsKeyFile = flag.String("blskey_file", "", "The encrypted file of bls serialized private key by passphrase.")
blsFolder = flag.String("blsfolder", ".hmy/blskeys", "The folder that stores the bls keys and corresponding passphrases; e.g. <blskey>.key and <blskey>.pass; all bls keys mapped to same shard")
@ -131,11 +128,7 @@ var (
dbDir = flag.String("db_dir", "", "blockchain database directory")
// Disable view change.
disableViewChange = flag.Bool("disable_view_change", false, "Do not propose view change (testing only)")
// metrics flag to collect metrics or not, pushgateway ip and port for metrics
metricsFlag = flag.Bool("metrics", false, "Collect and upload node metrics")
pushgatewayIP = flag.String("pushgateway_ip", "grafana.harmony.one", "Metrics view ip")
pushgatewayPort = flag.String("pushgateway_port", "9091", "Metrics view port")
publicRPC = flag.Bool("public_rpc", false, "Enable Public RPC Access (default: false)")
publicRPC = flag.Bool("public_rpc", false, "Enable Public RPC Access (default: false)")
// Bad block revert
doRevertBefore = flag.Int("do_revert_before", 0, "If the current block is less than do_revert_before, revert all blocks until (including) revert_to block")
revertTo = flag.Int("revert_to", 0, "The revert will rollback all blocks until and including block number revert_to")
@ -181,9 +174,6 @@ func initSetup() {
// Set sharding schedule
nodeconfig.SetShardingSchedule(shard.Schedule)
// Setup mem profiling.
memprofiling.GetMemProfiling().Config()
// Set default keystore Dir
hmykey.DefaultKeyStoreDir = *keystoreDir
@ -419,13 +409,10 @@ func createGlobalConfig() (*nodeconfig.ConfigType, error) {
// Set network type
netType := nodeconfig.NetworkType(*networkType)
nodeconfig.SetNetworkType(netType) // sets for both global and shard configs
nodeConfig.SetPushgatewayIP(*pushgatewayIP)
nodeConfig.SetPushgatewayPort(*pushgatewayPort)
nodeConfig.SetMetricsFlag(*metricsFlag)
nodeConfig.SetArchival(*isArchival)
// P2p private key is used for secure message transfer between p2p nodes.
nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile)
// P2P private key is used for secure message transfer between p2p nodes.
nodeConfig.P2PPriKey, _, err = utils.LoadKeyFromFile(*keyFile)
if err != nil {
return nil, errors.Wrapf(err, "cannot load or create P2P key at %#v",
*keyFile)
@ -437,7 +424,7 @@ func createGlobalConfig() (*nodeconfig.ConfigType, error) {
ConsensusPubKey: nodeConfig.ConsensusPubKey.PublicKey[0],
}
myHost, err = p2pimpl.NewHost(&selfPeer, nodeConfig.P2pPriKey)
myHost, err = p2pimpl.NewHost(&selfPeer, nodeConfig.P2PPriKey)
if err != nil {
return nil, errors.Wrap(err, "cannot create P2P network host")
}
@ -530,10 +517,6 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// TODO: refactor the creation of blockchain out of node.New()
currentConsensus.ChainReader = currentNode.Blockchain()
currentNode.NodeConfig.DNSZone = *dnsZone
// Set up prometheus pushgateway for metrics monitoring service.
currentNode.NodeConfig.SetPushgatewayIP(nodeConfig.PushgatewayIP)
currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort)
currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag)
currentNode.NodeConfig.SetBeaconGroupID(
nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID),
@ -595,10 +578,6 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())
// Watching currentNode and currentConsensus.
memprofiling.GetMemProfiling().Add("currentNode", currentNode)
memprofiling.GetMemProfiling().Add("currentConsensus", currentConsensus)
return currentNode
}
@ -623,13 +602,10 @@ func setupBlacklist() (map[ethCommon.Address]struct{}, error) {
}
func setupViperConfig() {
// read from environment
envViper := viperconfig.CreateEnvViper()
//read from config file
configFileViper := viperconfig.CreateConfFileViper("./.hmy", "nodeconfig", "json")
viperconfig.ResetConfString(ip, envViper, configFileViper, "", "ip")
viperconfig.ResetConfString(port, envViper, configFileViper, "", "port")
viperconfig.ResetConfString(logFolder, envViper, configFileViper, "", "log_folder")
@ -638,7 +614,6 @@ func setupViperConfig() {
viperconfig.ResetConfBool(profile, envViper, configFileViper, "", "profile")
viperconfig.ResetConfString(metricsReportURL, envViper, configFileViper, "", "metrics_report_url")
viperconfig.ResetConfString(pprof, envViper, configFileViper, "", "pprof")
viperconfig.ResetConfBool(versionFlag, envViper, configFileViper, "", "version")
viperconfig.ResetConfBool(onlyLogTps, envViper, configFileViper, "", "only_log_tps")
viperconfig.ResetConfString(dnsZone, envViper, configFileViper, "", "dns_zone")
@ -650,38 +625,30 @@ func setupViperConfig() {
viperconfig.ResetConfString(delayCommit, envViper, configFileViper, "", "delay_commit")
viperconfig.ResetConfString(nodeType, envViper, configFileViper, "", "node_type")
viperconfig.ResetConfString(networkType, envViper, configFileViper, "", "network_type")
viperconfig.ResetConfInt(syncFreq, envViper, configFileViper, "", "sync_freq")
viperconfig.ResetConfInt(beaconSyncFreq, envViper, configFileViper, "", "beacon_sync_freq")
viperconfig.ResetConfInt(blockPeriod, envViper, configFileViper, "", "block_period")
viperconfig.ResetConfBool(leaderOverride, envViper, configFileViper, "", "leader_override")
viperconfig.ResetConfBool(stakingFlag, envViper, configFileViper, "", "staking")
viperconfig.ResetConfInt(shardID, envViper, configFileViper, "", "shard_id")
viperconfig.ResetConfBool(enableMemProfiling, envViper, configFileViper, "", "enableMemProfiling")
viperconfig.ResetConfBool(enableGC, envViper, configFileViper, "", "enableGC")
viperconfig.ResetConfString(blsKeyFile, envViper, configFileViper, "", "blskey_file")
viperconfig.ResetConfString(blsFolder, envViper, configFileViper, "", "blsfolder")
viperconfig.ResetConfString(blsPass, envViper, configFileViper, "", "blsPass")
viperconfig.ResetConfUInt(devnetNumShards, envViper, configFileViper, "", "dn_num_shards")
viperconfig.ResetConfInt(devnetShardSize, envViper, configFileViper, "", "dn_shard_size")
viperconfig.ResetConfInt(devnetHarmonySize, envViper, configFileViper, "", "dn_hmy_size")
viperconfig.ResetConfBool(logConn, envViper, configFileViper, "", "log_conn")
viperconfig.ResetConfString(keystoreDir, envViper, configFileViper, "", "keystore")
viperconfig.ResetConfBool(logP2P, envViper, configFileViper, "", "log_p2p")
viperconfig.ResetConfInt(verbosity, envViper, configFileViper, "", "verbosity")
viperconfig.ResetConfString(dbDir, envViper, configFileViper, "", "db_dir")
viperconfig.ResetConfBool(disableViewChange, envViper, configFileViper, "", "disable_view_change")
viperconfig.ResetConfBool(metricsFlag, envViper, configFileViper, "", "metrics")
viperconfig.ResetConfString(pushgatewayIP, envViper, configFileViper, "", "pushgateway_ip")
viperconfig.ResetConfString(pushgatewayPort, envViper, configFileViper, "", "pushgateway_port")
viperconfig.ResetConfBool(publicRPC, envViper, configFileViper, "", "public_rpc")
viperconfig.ResetConfInt(doRevertBefore, envViper, configFileViper, "", "do_revert_before")
viperconfig.ResetConfInt(revertTo, envViper, configFileViper, "", "revert_to")
viperconfig.ResetConfBool(revertBeacon, envViper, configFileViper, "", "revert_beacon")
viperconfig.ResetConfString(blacklistPath, envViper, configFileViper, "", "blacklist")
viperconfig.ResetConfString(webHookYamlPath, envViper, configFileViper, "", "webhook_yaml")
}
func main() {
@ -749,11 +716,6 @@ func main() {
initSetup()
// Set up manual call for garbage collection.
if *enableGC {
memprofiling.MaybeCallGCPeriodically()
}
if *nodeType == "validator" {
var err error
if *stakingFlag {
@ -852,10 +814,6 @@ func main() {
Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, myHost.GetID().Pretty())).
Msg(startMsg)
if *enableMemProfiling {
memprofiling.GetMemProfiling().Start()
}
if *logP2P {
f, err := os.OpenFile(path.Join(*logFolder, "libp2p.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
@ -870,7 +828,6 @@ func main() {
go currentNode.SupportSyncing()
currentNode.ServiceManagerSetup()
currentNode.RunServices()
// RPC for SDK not supported for mainnet.
if err := currentNode.StartRPC(*port); err != nil {
@ -879,11 +836,5 @@ func main() {
Msg("StartRPC failed")
}
// Run additional node collectors
// Collect node metrics if metrics flag is set
if currentNode.NodeConfig.GetMetricsFlag() {
go currentNode.CollectMetrics()
}
currentNode.StartServer()
}
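The deleted flags follow main.go's usual pattern: a flag default that setupViperConfig can override from the environment or from ./.hmy/nodeconfig.json. A hedged sketch of that flag-plus-viper override using plain spf13/viper (rather than the repo's viperconfig helpers) is shown below; the config path and key name come from the code above, the rest is illustrative.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// Flag default, as for the removed -metrics flag.
	metricsFlag := flag.Bool("metrics", false, "Collect and upload node metrics")
	flag.Parse()

	v := viper.New()
	v.SetConfigName("nodeconfig") // ./.hmy/nodeconfig.json, per the removed setupViperConfig
	v.SetConfigType("json")
	v.AddConfigPath("./.hmy")
	_ = v.ReadInConfig() // a missing config file just leaves the flag default in place

	// Let a config-file value override the flag, roughly what ResetConfBool did.
	if v.IsSet("metrics") {
		*metricsFlag = v.GetBool("metrics")
	}
	fmt.Println("metrics enabled:", *metricsFlag)
}
```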

@ -12,7 +12,6 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/memprofiling"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/p2p"
@ -233,6 +232,5 @@ func New(
consensus.lastBlockReward = common.Big0
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
memprofiling.GetMemProfiling().Add("consensus.FBFTLog", consensus.FBFTLog)
return &consensus, nil
}

@ -4,7 +4,6 @@ import (
"encoding/hex"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
protobuf "github.com/golang/protobuf/proto"
@ -13,12 +12,10 @@ import (
"github.com/harmony-one/harmony/block"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/profiler"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/p2p"
@ -357,42 +354,6 @@ func (consensus *Consensus) ReadSignatureBitmapPayload(
)
}
func (consensus *Consensus) reportMetrics(block types.Block) {
endTime := time.Now()
timeElapsed := endTime.Sub(startTime)
numOfTxs := len(block.Transactions())
tps := float64(numOfTxs) / timeElapsed.Seconds()
consensus.getLogger().Info().
Int("numOfTXs", numOfTxs).
Time("startTime", startTime).
Time("endTime", endTime).
Dur("timeElapsed", endTime.Sub(startTime)).
Float64("TPS", tps).
Msg("TPS Report")
// Post metrics
profiler := profiler.GetProfiler()
if profiler.MetricsReportURL == "" {
return
}
txHashes := []string{}
for i, end := 0, len(block.Transactions()); i < 3 && i < end; i++ {
txHash := block.Transactions()[end-1-i].Hash()
txHashes = append(txHashes, hex.EncodeToString(txHash[:]))
}
metrics := map[string]interface{}{
"key": hex.EncodeToString(consensus.LeaderPubKey.Serialize()),
"tps": tps,
"txCount": numOfTxs,
"nodeCount": consensus.Decider.ParticipantsCount() + 1,
"latestBlockHash": hex.EncodeToString(consensus.blockHash[:]),
"latestTxHashes": txHashes,
"blockLatency": int(timeElapsed / time.Millisecond),
}
profiler.LogMetrics(metrics)
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().

@ -157,8 +157,6 @@ func (consensus *Consensus) finalizeCommits() {
Msg("[Finalizing] Sent Committed Message")
}
consensus.reportMetrics(*block)
// Dump new block into level db
// In the current code, we add signatures to the block in tryCatchup; the block dumped to the explorer does not contain signatures,
// but since explorer doesn't need signatures, it should be fine
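The reportMetrics call removed here was the only caller of the deleted function above. That function boiled down to computing TPS from the block's transaction count and elapsed consensus time, then POSTing a JSON document to MetricsReportURL. A small stand-alone sketch of that calculation and report follows; the URL and numbers are invented for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Pretend a block with 150 transactions took 2 seconds to reach consensus.
	numTxs := 150
	timeElapsed := 2 * time.Second
	tps := float64(numTxs) / timeElapsed.Seconds()

	report := map[string]interface{}{
		"tps":          tps,
		"txCount":      numTxs,
		"blockLatency": int(timeElapsed / time.Millisecond),
	}
	body, _ := json.Marshal(report)

	// URL is illustrative; the removed code read it from profiler.MetricsReportURL.
	resp, err := http.Post("http://localhost:8080/report", "application/json", bytes.NewBuffer(body))
	if err == nil {
		resp.Body.Close()
	}
	fmt.Printf("reported TPS %.1f\n", tps)
}
```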

@ -1,118 +0,0 @@
package attack
import (
"math/rand"
"os"
"sync"
"time"
"github.com/harmony-one/harmony/internal/utils"
)
// Constants used for attack model.
const (
DroppingTickDuration = 2 * time.Second
HitRate = 10
DelayResponseDuration = 10 * time.Second
ViewIDThresholdMin = 10
ViewIDThresholdMax = 100
)
// Type is the type of attack model.
type Type byte
// Constants of different attack models.
const (
KilledItself Type = iota
DelayResponse
IncorrectResponse
)
// Model contains different models of attacking.
type Model struct {
AttackEnabled bool
attackType Type
ViewIDThreshold uint64
readyByConsensusThreshold bool
}
var attackModel *Model
var once sync.Once
// GetInstance returns attack model by using singleton pattern.
func GetInstance() *Model {
once.Do(func() {
attackModel = &Model{}
attackModel.Init()
})
return attackModel
}
// Init initializes attack model.
func (attack *Model) Init() {
attack.AttackEnabled = false
attack.readyByConsensusThreshold = false
}
// SetAttackEnabled sets attack model enabled.
func (attack *Model) SetAttackEnabled(AttackEnabled bool) {
attack.AttackEnabled = AttackEnabled
if AttackEnabled {
attack.attackType = Type(rand.Intn(3))
attack.ViewIDThreshold = uint64(ViewIDThresholdMin + rand.Intn(ViewIDThresholdMax-ViewIDThresholdMin))
}
}
// Run runs enabled attacks.
func (attack *Model) Run() {
attack.NodeKilledByItSelf()
attack.DelayResponse()
}
// NodeKilledByItSelf runs the self-kill attack.
func (attack *Model) NodeKilledByItSelf() {
if !attack.AttackEnabled || attack.attackType != KilledItself || !attack.readyByConsensusThreshold {
return
}
if rand.Intn(HitRate) == 0 {
utils.Logger().Debug().
Int("PID", os.Getpid()).
Msg("******************Killing myself******************")
os.Exit(1)
}
}
// DelayResponse does attack by delaying response.
func (attack *Model) DelayResponse() {
if !attack.AttackEnabled || attack.attackType != DelayResponse || !attack.readyByConsensusThreshold {
return
}
if rand.Intn(HitRate) == 0 {
utils.Logger().Debug().
Int("PID", os.Getpid()).
Msg("******************Model: DelayResponse******************")
time.Sleep(DelayResponseDuration)
}
}
// IncorrectResponse returns whether the attack model enables incorrect responses.
func (attack *Model) IncorrectResponse() bool {
if !attack.AttackEnabled || attack.attackType != IncorrectResponse || !attack.readyByConsensusThreshold {
return false
}
if rand.Intn(HitRate) == 0 {
utils.Logger().Debug().
Int("PID", os.Getpid()).
Msg("******************Model: IncorrectResponse******************")
return true
}
return false
}
// UpdateConsensusReady enables an attack type given the current viewID.
func (attack *Model) UpdateConsensusReady(viewID uint64) {
if viewID > attack.ViewIDThreshold {
attack.readyByConsensusThreshold = true
}
}
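Each attack in the removed package fires probabilistically: a 1-in-HitRate roll per call, gated by a consensus view ID threshold. A minimal sketch of that probabilistic gating is below; the constants and the printed expectation are illustrative only.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const hitRate = 10 // fire roughly 1 time in 10, matching the removed HitRate

// maybeDelay injects a delay with probability 1/hitRate, mirroring the gating
// used by the removed DelayResponse attack.
func maybeDelay(d time.Duration) bool {
	if rand.Intn(hitRate) == 0 {
		time.Sleep(d)
		return true
	}
	return false
}

func main() {
	rand.Seed(time.Now().UnixNano())
	fired := 0
	for i := 0; i < 100; i++ {
		if maybeDelay(time.Millisecond) {
			fired++
		}
	}
	fmt.Printf("delay injected %d/100 times (expected around 10)\n", fired)
}
```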

@ -1,26 +0,0 @@
package attack
import (
"testing"
"github.com/stretchr/testify/assert"
)
// Simple test for IncorrectResponse
func TestIncorrectResponse(t *testing.T) {
GetInstance().SetAttackEnabled(false)
assert.False(t, GetInstance().IncorrectResponse(), "error")
GetInstance().SetAttackEnabled(true)
}
// Simple test for UpdateConsensusReady
func TestUpdateConsensusReady(t *testing.T) {
model := GetInstance()
model.NodeKilledByItSelf()
model.UpdateConsensusReady(model.ViewIDThreshold - 1)
model.DelayResponse()
model.UpdateConsensusReady(model.ViewIDThreshold + 1)
model.DelayResponse()
}

@ -74,11 +74,8 @@ type ConfigType struct {
role Role // Role of the node
Port string // Port of the node.
IP string // IP of the node.
MetricsFlag bool // collect and upload metrics flag
PushgatewayIP string // metrics pushgateway prometheus ip
PushgatewayPort string // metrics pushgateway prometheus port
StringRole string
P2pPriKey p2p_crypto.PrivKey
P2PPriKey p2p_crypto.PrivKey
ConsensusPriKey *multibls.PrivateKey
ConsensusPubKey *multibls.PublicKey
// Database directory
@ -161,36 +158,6 @@ func (conf *ConfigType) SetRole(r Role) {
conf.role = r
}
// SetPushgatewayIP sets the pushgateway ip
func (conf *ConfigType) SetPushgatewayIP(ip string) {
conf.PushgatewayIP = ip
}
// SetPushgatewayPort sets the pushgateway port
func (conf *ConfigType) SetPushgatewayPort(port string) {
conf.PushgatewayPort = port
}
// SetMetricsFlag sets the metrics flag
func (conf *ConfigType) SetMetricsFlag(flag bool) {
conf.MetricsFlag = flag
}
// GetMetricsFlag gets the metrics flag
func (conf *ConfigType) GetMetricsFlag() bool {
return conf.MetricsFlag
}
// GetPushgatewayIP gets the pushgateway ip
func (conf *ConfigType) GetPushgatewayIP() string {
return conf.PushgatewayIP
}
// GetPushgatewayPort gets the pushgateway port
func (conf *ConfigType) GetPushgatewayPort() string {
return conf.PushgatewayPort
}
// GetBeaconGroupID returns the groupID for beacon group
func (conf *ConfigType) GetBeaconGroupID() GroupID {
return conf.beacon

@ -16,7 +16,7 @@ import (
type GroupID string
func (id GroupID) String() string {
return fmt.Sprintf("%s", string(id))
return string(id)
}
// Const of group ID

@ -1,136 +0,0 @@
package memprofiling
import (
"fmt"
"net/http"
"reflect"
"runtime"
"sync"
"time"
"github.com/fjl/memsize"
"github.com/fjl/memsize/memsizeui"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
)
// Constants for mem profiling.
const (
MemProfilingPortDiff = 1000
// Interval for scanning mem size.
memSizeScanTime = 30 * time.Second
// Run garbage collector every gcTime (10 minutes).
gcTime = 10 * time.Minute
// Print out memstat every memStatTime.
memStatTime = 300 * time.Second
)
// MemProfiling is the struct to watch objects for memprofiling.
type MemProfiling struct {
h *memsizeui.Handler
s *http.Server
observedObject map[string]interface{}
mu sync.Mutex
}
// New returns MemProfiling object.
func New() *MemProfiling {
return &MemProfiling{
observedObject: map[string]interface{}{},
h: new(memsizeui.Handler),
}
}
// Config configures mem profiling.
func (m *MemProfiling) Config() {
m.s = &http.Server{
Addr: fmt.Sprintf("%s:%s", nodeconfig.GetDefaultConfig().IP, utils.GetPortFromDiff(nodeconfig.GetDefaultConfig().Port, MemProfilingPortDiff)),
Handler: m.h,
}
utils.Logger().Info().
Str("port", utils.GetPortFromDiff(nodeconfig.GetDefaultConfig().Port, MemProfilingPortDiff)).
Msgf("running mem profiling")
}
// Add adds variables to watch for profiling.
func (m *MemProfiling) Add(name string, v interface{}) {
m.mu.Lock()
defer m.mu.Unlock()
if v != nil {
rv := reflect.ValueOf(v)
if !(rv.Kind() != reflect.Ptr || rv.IsNil()) {
m.h.Add(name, v)
m.observedObject[name] = v
}
}
}
// Start starts profiling server.
func (m *MemProfiling) Start() {
go m.s.ListenAndServe()
m.PeriodicallyScanMemSize()
}
// Stop stops mem profiling.
func (m *MemProfiling) Stop() {
m.s.Shutdown(nil)
}
// PeriodicallyScanMemSize scans memsize of the observed objects every 30 seconds.
func (m *MemProfiling) PeriodicallyScanMemSize() {
go func() {
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
select {
case <-time.After(memSizeScanTime):
m.mu.Lock()
m := GetMemProfiling()
for k, v := range m.observedObject {
s := memsize.Scan(v)
r := s.Report()
utils.Logger().Info().Msgf("memsize report for %s:\n %s", k, r)
}
m.mu.Unlock()
}
}
}()
}
// MaybeCallGCPeriodically runs GC manually every gcTime. This is one of the options to mitigate the OOM issue.
func MaybeCallGCPeriodically() {
go func() {
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
select {
case <-time.After(gcTime):
PrintMemUsage("Memory stats before GC")
runtime.GC()
PrintMemUsage("Memory stats after GC")
}
}
}()
go func() {
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
select {
case <-time.After(memStatTime):
PrintMemUsage("Memory stats")
}
}
}()
}
// PrintMemUsage prints memory usage.
func PrintMemUsage(msg string) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
utils.Logger().Info().
Uint64("alloc", bToMb(m.Alloc)).
Uint64("totalalloc", bToMb(m.TotalAlloc)).
Uint64("sys", bToMb(m.Sys)).
Uint32("numgc", m.NumGC).
Msg(msg)
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}
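Most of the removed memprofiling code reduces to two standard-library loops: periodically read runtime.MemStats and periodically force a GC. A compact sketch using only the runtime package is below; intervals are shortened for illustration (the removed code used 10-minute GC and 5-minute memstat timers).

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

func bToMb(b uint64) uint64 { return b / 1024 / 1024 }

func main() {
	// Intervals shortened for illustration; the removed code used 10m GC and 5m memstat timers.
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for i := 0; i < 3; i++ {
		<-ticker.C
		var m runtime.MemStats
		runtime.ReadMemStats(&m)
		fmt.Printf("alloc=%dMiB totalalloc=%dMiB sys=%dMiB numgc=%d\n",
			bToMb(m.Alloc), bToMb(m.TotalAlloc), bToMb(m.Sys), m.NumGC)
		runtime.GC() // manual GC, as in the removed MaybeCallGCPeriodically
	}
}
```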

@ -1,14 +0,0 @@
package memprofiling
import "sync"
var singleton *MemProfiling
var once sync.Once
// GetMemProfiling returns a pointer of MemProfiling.
func GetMemProfiling() *MemProfiling {
once.Do(func() {
singleton = New()
})
return singleton
}
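The removed packages (memprofiling, profiler, metrics storage, attack) all share this same sync.Once singleton accessor. A generic sketch of the pattern is below, with a hypothetical Profiling type standing in for the concrete ones.

```go
package main

import (
	"fmt"
	"sync"
)

// Profiling stands in for any of the removed singleton types
// (MemProfiling, Profiler, metrics Storage, attack Model).
type Profiling struct{ started bool }

var (
	instance *Profiling
	once     sync.Once
)

// GetProfiling builds the shared instance exactly once, even when called
// concurrently from many goroutines.
func GetProfiling() *Profiling {
	once.Do(func() {
		instance = &Profiling{}
	})
	return instance
}

func main() {
	fmt.Println(GetProfiling() == GetProfiling()) // true: same pointer
}
```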

@ -1,96 +0,0 @@
package profiler
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"os"
"sync"
"time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/shirou/gopsutil/process"
)
// Profiler is the profiler data structure.
type Profiler struct {
pid int32
shardID uint32
MetricsReportURL string
// Internal
proc *process.Process
}
var singleton *Profiler
var once sync.Once
// GetProfiler returns a pointer of Profiler.
// TODO: This should be a New method.
func GetProfiler() *Profiler {
once.Do(func() {
singleton = &Profiler{}
})
return singleton
}
// Config configures the Profiler.
func (profiler *Profiler) Config(shardID uint32, metricsReportURL string) {
profiler.pid = int32(os.Getpid())
profiler.shardID = shardID
profiler.MetricsReportURL = metricsReportURL
}
// LogMemory logs memory.
func (profiler *Profiler) LogMemory() {
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
// log mem usage
info, _ := profiler.proc.MemoryInfo()
memMap, _ := profiler.proc.MemoryMaps(false)
loggedMemMap := ""
for _, mems := range *memMap {
loggedMemMap = fmt.Sprintf("%v; %v", loggedMemMap, mems)
}
utils.Logger().Info().
Str("info", info.String()).
Str("map", loggedMemMap).
Uint32("shardID", profiler.shardID).
Msg("Mem Report")
time.Sleep(3 * time.Second)
}
}
// LogCPU logs CPU metrics.
func (profiler *Profiler) LogCPU() {
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
// log cpu usage
percent, _ := profiler.proc.CPUPercent()
times, _ := profiler.proc.Times()
utils.Logger().Info().
Float64("percent", percent).
Str("times", times.String()).
Uint32("shardID", profiler.shardID).
Msg("CPU Report")
time.Sleep(3 * time.Second)
}
}
// LogMetrics logs metrics.
func (profiler *Profiler) LogMetrics(metrics map[string]interface{}) {
jsonValue, _ := json.Marshal(metrics)
rsp, err := http.Post(profiler.MetricsReportURL, "application/json", bytes.NewBuffer(jsonValue))
if err == nil {
defer rsp.Body.Close()
}
}
// Start starts profiling.
func (profiler *Profiler) Start() {
profiler.proc, _ = process.NewProcess(profiler.pid)
go profiler.LogCPU()
go profiler.LogMemory()
}
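The profiler sampled its own process with gopsutil. A one-shot version of what the removed LogCPU/LogMemory loops logged every 3 seconds is sketched below; the field selection is illustrative.

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/shirou/gopsutil/process"
)

func main() {
	proc, err := process.NewProcess(int32(os.Getpid()))
	if err != nil {
		log.Fatal(err)
	}
	// One-shot sample of what the removed LogCPU/LogMemory loops logged every 3 seconds.
	percent, _ := proc.CPUPercent()
	mem, _ := proc.MemoryInfo()
	fmt.Printf("cpu=%.2f%% rss=%d bytes\n", percent, mem.RSS)
}
```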

@ -1,6 +0,0 @@
package utils
// BToMb converts bytes to megabytes.
func BToMb(b uint64) uint64 {
return b / 1024 / 1024
}

@ -1,19 +0,0 @@
package utils
import (
"testing"
"github.com/stretchr/testify/assert"
)
// Test for BToMb.
func TestBToMb(t *testing.T) {
a := uint64(1024*1024 + 1)
assert.Equal(t, BToMb(a), uint64(1), "should be equal to 1")
a = uint64(1024*1024 - 1)
assert.Equal(t, BToMb(a), uint64(0), "should be equal to 0")
a = uint64(1024 * 1024)
assert.Equal(t, BToMb(a), uint64(1), "should be equal to 1")
}

@ -719,12 +719,10 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
PushgatewayIP: node.NodeConfig.GetPushgatewayIP(),
PushgatewayPort: node.NodeConfig.GetPushgatewayPort(),
IsClient: node.NodeConfig.IsClient(),
Beacon: nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID),
ShardGroupID: node.NodeConfig.GetShardGroupID(),
Actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType),
IsClient: node.NodeConfig.IsClient(),
Beacon: nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID),
ShardGroupID: node.NodeConfig.GetShardGroupID(),
Actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType),
}
if nodeConfig.IsClient {

@ -1,92 +0,0 @@
package node
import (
"time"
metrics "github.com/harmony-one/harmony/api/service/metrics"
"github.com/harmony-one/harmony/internal/utils"
)
// UpdateBlockHeightForMetrics updates block height for metrics service.
func (node *Node) UpdateBlockHeightForMetrics(prevBlockHeight uint64) uint64 {
curBlock := node.Blockchain().CurrentBlock()
curBlockHeight := curBlock.NumberU64()
if curBlockHeight == prevBlockHeight {
return prevBlockHeight
}
utils.Logger().Info().Msgf("Updating metrics block height %d", curBlockHeight)
metrics.UpdateBlockHeight(curBlockHeight)
blockReward := node.Consensus.GetBlockReward()
if blockReward != nil {
utils.Logger().Info().Msgf("Updating metrics block reward %d", blockReward.Uint64())
metrics.UpdateBlockReward(blockReward)
}
return curBlockHeight
}
// UpdateConnectionsNumberForMetrics updates connections number for metrics service.
func (node *Node) UpdateConnectionsNumberForMetrics(prevNumPeers int) int {
curNumPeers := node.numPeers
if curNumPeers == prevNumPeers {
return prevNumPeers
}
utils.Logger().Info().Msgf("Updating metrics connections number %d", curNumPeers)
metrics.UpdateConnectionsNumber(curNumPeers)
return curNumPeers
}
// UpdateTxPoolSizeForMetrics updates tx pool size for metrics service.
func (node *Node) UpdateTxPoolSizeForMetrics(txPoolSize uint64) {
utils.Logger().Info().Msgf("Updating metrics tx pool size %d", txPoolSize)
metrics.UpdateTxPoolSize(txPoolSize)
}
// UpdateBalanceForMetrics updates node balance for metrics service.
func (node *Node) UpdateBalanceForMetrics() {
for _, addr := range node.Consensus.SelfAddresses {
curBalance, err := node.GetBalanceOfAddress(addr)
if err != nil {
return
}
utils.Logger().Info().Msgf("Updating metrics node balance %d", curBalance.Uint64())
metrics.UpdateNodeBalance(curBalance)
}
}
// UpdateLastConsensusTimeForMetrics updates last consensus reached time for metrics service.
func (node *Node) UpdateLastConsensusTimeForMetrics(prevLastConsensusTime int64) int64 {
lastConsensusTime := node.lastConsensusTime
if lastConsensusTime == prevLastConsensusTime {
return prevLastConsensusTime
}
utils.Logger().Info().Msgf("Updating metrics last consensus time reached %d", lastConsensusTime)
metrics.UpdateLastConsensus(lastConsensusTime)
return lastConsensusTime
}
// UpdateIsLeaderForMetrics updates whether the node is a leader now for the metrics service.
func (node *Node) UpdateIsLeaderForMetrics() {
if node.Consensus.LeaderPubKey.SerializeToHexStr() == node.Consensus.PubKey.SerializeToHexStr() {
utils.Logger().Info().Msgf("Node %s is a leader now", node.Consensus.PubKey.SerializeToHexStr())
metrics.UpdateIsLeader(true)
} else {
utils.Logger().Info().Msgf("Node %s is not a leader now", node.Consensus.PubKey.SerializeToHexStr())
metrics.UpdateIsLeader(false)
}
}
// CollectMetrics collects metrics: block height, connections number, node balance, block reward, last consensus, accepted blocks.
func (node *Node) CollectMetrics() {
utils.Logger().Info().Msg("[Metrics Service] Update metrics")
prevNumPeers := 0
prevBlockHeight := uint64(0)
prevLastConsensusTime := int64(0)
for range time.Tick(100 * time.Millisecond) {
prevBlockHeight = node.UpdateBlockHeightForMetrics(prevBlockHeight)
prevNumPeers = node.UpdateConnectionsNumberForMetrics(prevNumPeers)
prevLastConsensusTime = node.UpdateLastConsensusTimeForMetrics(prevLastConsensusTime)
node.UpdateBalanceForMetrics()
node.UpdateTxPoolSizeForMetrics(node.TxPool.GetTxPoolSize())
node.UpdateIsLeaderForMetrics()
}
}
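CollectMetrics was a 100ms polling loop that reported a value only when it had changed since the previous sample. A stand-alone sketch of that report-on-change loop is below, with a stop channel added so the ticker can actually be cleaned up (the removed code's time.Tick ticker is never stopped); the sampler and values are fake.

```go
package main

import (
	"fmt"
	"time"
)

// collect polls a sampler on a ticker and reports only when the value changed,
// the same report-on-change pattern the removed CollectMetrics loop used.
func collect(sample func() uint64, report func(uint64), stop <-chan struct{}) {
	var prev uint64
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if cur := sample(); cur != prev {
				report(cur)
				prev = cur
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	var tick uint64
	go collect(
		func() uint64 { tick++; return tick / 3 }, // fake, slowly rising block height
		func(v uint64) { fmt.Println("block height now", v) },
		stop,
	)
	time.Sleep(time.Second)
	close(stop)
}
```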

@ -10,7 +10,6 @@ import (
"github.com/harmony-one/harmony/api/service/consensus"
"github.com/harmony-one/harmony/api/service/discovery"
"github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/api/service/metrics"
"github.com/harmony-one/harmony/api/service/networkinfo"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
@ -51,17 +50,6 @@ func (node *Node) setupForValidator() {
),
)
}
// Register new metrics service
if node.NodeConfig.GetMetricsFlag() {
node.serviceManager.RegisterService(
service.Metrics,
metrics.New(
&node.SelfPeer,
node.NodeConfig.ConsensusPubKey.SerializeToHexStr(),
node.NodeConfig.GetPushgatewayIP(),
node.NodeConfig.GetPushgatewayPort()),
)
}
}
func (node *Node) setupForExplorerNode() {
