Add uptime stats

pull/1849/head
Rongjian Lan 5 years ago
parent a2bde5651b
commit f3927bd465
  1. 65
      core/blockchain.go
  2. 31
      core/rawdb/accessors_chain.go
  3. 6
      core/rawdb/schema.go
  4. 1
      core/state_transition.go
  5. 34
      crypto/bls/bls.go
  6. 6
      hmy/api_backend.go
  7. 1
      internal/hmyapi/backend.go
  8. 12
      internal/hmyapi/blockchain.go
  9. 2
      internal/hmyapi/types.go
  10. 14
      node/node.go
  11. 19
      node/node_handler.go
  12. 2
      node/node_newblock.go
  13. 8
      staking/types/transaction.go
  14. 10
      staking/types/validator.go

@ -27,6 +27,8 @@ import (
"sync/atomic"
"time"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
@ -71,6 +73,7 @@ const (
epochCacheLimit = 10
randomnessCacheLimit = 10
validatorCacheLimit = 1024
validatorStatsCacheLimit = 1024
validatorListCacheLimit = 10
validatorListByDelegatorCacheLimit = 1024
@ -135,7 +138,8 @@ type BlockChain struct {
lastCommitsCache *lru.Cache
epochCache *lru.Cache // Cache epoch number → first block number
randomnessCache *lru.Cache // Cache for vrf/vdf
validatorCache *lru.Cache // Cache for staking validator
validatorCache *lru.Cache // Cache for validator info
validatorStatsCache *lru.Cache // Cache for validator stats
validatorListCache *lru.Cache // Cache of validator list
validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator
@ -174,7 +178,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
commitsCache, _ := lru.New(commitsCacheLimit)
epochCache, _ := lru.New(epochCacheLimit)
randomnessCache, _ := lru.New(randomnessCacheLimit)
stakingCache, _ := lru.New(validatorCacheLimit)
validatorCache, _ := lru.New(validatorCacheLimit)
validatorStatsCache, _ := lru.New(validatorStatsCacheLimit)
validatorListCache, _ := lru.New(validatorListCacheLimit)
validatorListByDelegatorCache, _ := lru.New(validatorListByDelegatorCacheLimit)
@ -195,7 +200,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
lastCommitsCache: commitsCache,
epochCache: epochCache,
randomnessCache: randomnessCache,
validatorCache: stakingCache,
validatorCache: validatorCache,
validatorStatsCache: validatorStatsCache,
validatorListCache: validatorListCache,
validatorListByDelegatorCache: validatorListByDelegatorCache,
engine: engine,
@ -2374,6 +2380,59 @@ func (bc *BlockChain) WriteValidatorSnapshots(addrs []common.Address) error {
return nil
}
// ReadValidatorStats reads the stats of a validator
func (bc *BlockChain) ReadValidatorStats(addr common.Address) (*staking.ValidatorStats, error) {
return rawdb.ReadValidatorStats(bc.db, addr)
}
// WriteValidatorStats writes the stats for the committee
func (bc *BlockChain) WriteValidatorStats(slots shard.SlotList, mask *bls.Mask) error {
blsToAddress := make(map[shard.BlsPublicKey]common.Address)
for _, slot := range slots {
blsToAddress[slot.BlsPublicKey] = slot.EcdsaAddress
}
batch := bc.db.NewBatch()
blsKeyBytes := shard.BlsPublicKey{}
for i, blsPubKey := range mask.Publics {
err := blsKeyBytes.FromLibBLSPublicKey(blsPubKey)
if err != nil {
return err
}
if addr, ok := blsToAddress[blsKeyBytes]; ok {
stats, err := rawdb.ReadValidatorStats(bc.db, addr)
if stats == nil {
stats = &staking.ValidatorStats{big.NewInt(0), big.NewInt(0), big.NewInt(0)}
}
stats.NumBlocksToSign.Add(stats.NumBlocksToSign, big.NewInt(1))
enabled, err := mask.IndexEnabled(i)
if err != nil {
return err
}
if enabled {
stats.NumBlocksSigned.Add(stats.NumBlocksSigned, big.NewInt(1))
}
// TODO: record time being jailed.
fmt.Println(stats)
err = rawdb.WriteValidatorStats(batch, addr, stats)
fmt.Println(err)
if err != nil {
return err
}
} else {
return fmt.Errorf("Bls public key not found in committee: %x", blsKeyBytes)
}
}
if err := batch.Write(); err != nil {
return err
}
// TODO: Update cache
return nil
}
// DeleteValidatorSnapshots deletes the snapshot staking information of given validator address
func (bc *BlockChain) DeleteValidatorSnapshots(addrs []common.Address) error {
batch := bc.db.NewBatch()

@ -652,7 +652,7 @@ func ReadValidatorSnapshot(db DatabaseReader, addr common.Address) (*staking.Val
}
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(data, &v); err != nil {
utils.Logger().Error().Err(err).Str("address", addr.Hex()).Msg("Unable to Decode staking validator from database")
utils.Logger().Error().Err(err).Str("address", addr.Hex()).Msg("Unable to decode validator snapshot from database")
return nil, err
}
return &v, nil
@ -672,6 +672,35 @@ func WriteValidatorSnapshot(db DatabaseWriter, v *staking.ValidatorWrapper) erro
return err
}
// ReadValidatorStats retrieves validator's stats by its address
func ReadValidatorStats(db DatabaseReader, addr common.Address) (*staking.ValidatorStats, error) {
data, err := db.Get(validatorStatsKey(addr))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Msg("ReadValidatorStats")
return nil, err
}
stats := staking.ValidatorStats{}
if err := rlp.DecodeBytes(data, &stats); err != nil {
utils.Logger().Error().Err(err).Str("address", addr.Hex()).Msg("Unable to decode validator stats from database")
return nil, err
}
return &stats, nil
}
// WriteValidatorStats stores validator's stats by its address
func WriteValidatorStats(db DatabaseWriter, addr common.Address, stats *staking.ValidatorStats) error {
bytes, err := rlp.EncodeToBytes(stats)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorStats] Failed to encode")
return err
}
if err := db.Put(validatorStatsKey(addr), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorStats] Failed to store to database")
return err
}
return err
}
// DeleteValidatorSnapshot removes the validator's snapshot by its address
func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address) {
if err := db.Delete(validatorSnapshotKey(addr)); err != nil {

@ -76,6 +76,7 @@ var (
validatorPrefix = []byte("validator-") // prefix for staking validator information
validatorSnapshotPrefix = []byte("validator-snapshot-") // prefix for staking validator's snapshot information
validatorStatsPrefix = []byte("validator-stats-") // prefix for staking validator's stats information
validatorListKey = []byte("validator-list") // key for all validators list
activeValidatorListKey = []byte("active-validator-list") // key for active validators list
@ -249,3 +250,8 @@ func validatorSnapshotKey(addr common.Address) []byte {
prefix := validatorSnapshotPrefix
return append(prefix, addr.Bytes()...)
}
func validatorStatsKey(addr common.Address) []byte {
prefix := validatorStatsPrefix
return append(prefix, addr.Bytes()...)
}

@ -487,7 +487,6 @@ func (st *StateTransition) applyUndelegateTx(undelegate *staking.Undelegate) err
if !delegatorExist {
return errNoDelegationToUndelegate
}
// TODO: do undelegated token distribution after locking period. (in leader block proposal phase)
return nil
}

@ -41,7 +41,7 @@ func AggregateSig(sigs []*bls.Sign) *bls.Sign {
// Mask represents a cosigning participation bitmask.
type Mask struct {
Bitmap []byte
publics []*bls.PublicKey
Publics []*bls.PublicKey
AggregatePublic *bls.PublicKey
}
@ -51,7 +51,7 @@ type Mask struct {
// bitmask to 1 (enabled).
func NewMask(publics []*bls.PublicKey, myKey *bls.PublicKey) (*Mask, error) {
m := &Mask{
publics: publics,
Publics: publics,
}
m.Bitmap = make([]byte, m.Len())
m.AggregatePublic = &bls.PublicKey{}
@ -80,7 +80,7 @@ func (m *Mask) Mask() []byte {
// Len returns the Bitmap length in bytes.
func (m *Mask) Len() int {
return (len(m.publics) + 7) >> 3
return (len(m.Publics) + 7) >> 3
}
// SetMask sets the participation bitmask according to the given byte slice
@ -92,16 +92,16 @@ func (m *Mask) SetMask(mask []byte) error {
"expectedBitmapLength", m.Len(),
"providedBitmapLength", len(mask))
}
for i := range m.publics {
for i := range m.Publics {
byt := i >> 3
msk := byte(1) << uint(i&7)
if ((m.Bitmap[byt] & msk) == 0) && ((mask[byt] & msk) != 0) {
m.Bitmap[byt] ^= msk // flip bit in Bitmap from 0 to 1
m.AggregatePublic.Add(m.publics[i])
m.AggregatePublic.Add(m.Publics[i])
}
if ((m.Bitmap[byt] & msk) != 0) && ((mask[byt] & msk) == 0) {
m.Bitmap[byt] ^= msk // flip bit in Bitmap from 1 to 0
m.AggregatePublic.Sub(m.publics[i])
m.AggregatePublic.Sub(m.Publics[i])
}
}
return nil
@ -110,18 +110,18 @@ func (m *Mask) SetMask(mask []byte) error {
// SetBit enables (enable: true) or disables (enable: false) the bit
// in the participation Bitmap of the given cosigner.
func (m *Mask) SetBit(i int, enable bool) error {
if i >= len(m.publics) {
if i >= len(m.Publics) {
return errors.New("index out of range")
}
byt := i >> 3
msk := byte(1) << uint(i&7)
if ((m.Bitmap[byt] & msk) == 0) && enable {
m.Bitmap[byt] ^= msk // flip bit in Bitmap from 0 to 1
m.AggregatePublic.Add(m.publics[i])
m.AggregatePublic.Add(m.Publics[i])
}
if ((m.Bitmap[byt] & msk) != 0) && !enable {
m.Bitmap[byt] ^= msk // flip bit in Bitmap from 1 to 0
m.AggregatePublic.Sub(m.publics[i])
m.AggregatePublic.Sub(m.Publics[i])
}
return nil
}
@ -130,16 +130,16 @@ func (m *Mask) SetBit(i int, enable bool) error {
// it is used to show which signers are signed or not in the cosign message
func (m *Mask) GetPubKeyFromMask(flag bool) []*bls.PublicKey {
pubKeys := []*bls.PublicKey{}
for i := range m.publics {
for i := range m.Publics {
byt := i >> 3
msk := byte(1) << uint(i&7)
if flag == true {
if (m.Bitmap[byt] & msk) != 0 {
pubKeys = append(pubKeys, m.publics[i])
pubKeys = append(pubKeys, m.Publics[i])
}
} else {
if (m.Bitmap[byt] & msk) == 0 {
pubKeys = append(pubKeys, m.publics[i])
pubKeys = append(pubKeys, m.Publics[i])
}
}
}
@ -148,7 +148,7 @@ func (m *Mask) GetPubKeyFromMask(flag bool) []*bls.PublicKey {
// IndexEnabled checks whether the given index is enabled in the Bitmap or not.
func (m *Mask) IndexEnabled(i int) (bool, error) {
if i >= len(m.publics) {
if i >= len(m.Publics) {
return false, errors.New("index out of range")
}
byt := i >> 3
@ -159,7 +159,7 @@ func (m *Mask) IndexEnabled(i int) (bool, error) {
// KeyEnabled checks whether the index, corresponding to the given key, is
// enabled in the Bitmap or not.
func (m *Mask) KeyEnabled(public *bls.PublicKey) (bool, error) {
for i, key := range m.publics {
for i, key := range m.Publics {
if key.IsEqual(public) {
return m.IndexEnabled(i)
}
@ -169,7 +169,7 @@ func (m *Mask) KeyEnabled(public *bls.PublicKey) (bool, error) {
// SetKey set the bit in the Bitmap for the given cosigner
func (m *Mask) SetKey(public *bls.PublicKey, enable bool) error {
for i, key := range m.publics {
for i, key := range m.Publics {
if key.IsEqual(public) {
return m.SetBit(i, enable)
}
@ -182,7 +182,7 @@ func (m *Mask) SetKey(public *bls.PublicKey, enable bool) error {
func (m *Mask) CountEnabled() int {
// hw is hamming weight
hw := 0
for i := range m.publics {
for i := range m.Publics {
byt := i >> 3
msk := byte(1) << uint(i&7)
if (m.Bitmap[byt] & msk) != 0 {
@ -194,7 +194,7 @@ func (m *Mask) CountEnabled() int {
// CountTotal returns the total number of nodes this CoSi instance knows.
func (m *Mask) CountTotal() int {
return len(m.publics)
return len(m.Publics)
}
// AggregateMasks computes the bitwise OR of the two given participation masks.

@ -307,6 +307,12 @@ func (b *APIBackend) GetValidatorInformation(addr common.Address) *staking.Valid
return &val.Validator
}
// GetValidatorStats returns the stats of validator
func (b *APIBackend) GetValidatorStats(addr common.Address) *staking.ValidatorStats {
val, _ := b.hmy.BlockChain().ReadValidatorStats(addr)
return val
}
// GetDelegationsByValidator returns all delegation information of a validator
func (b *APIBackend) GetDelegationsByValidator(validator common.Address) []*staking.Delegation {
wrapper, err := b.hmy.BlockChain().ReadValidatorData(validator)

@ -76,6 +76,7 @@ type Backend interface {
GetActiveValidatorAddresses() []common.Address
GetAllValidatorAddresses() []common.Address
GetValidatorInformation(addr common.Address) *staking.Validator
GetValidatorStats(addr common.Address) *staking.ValidatorStats
GetDelegationsByValidator(validator common.Address) []*staking.Delegation
GetDelegationsByDelegator(delegator common.Address) ([]common.Address, []*staking.Delegation)
GetValidatorStakingWithDelegation(addr common.Address) *big.Int

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"github.com/harmony-one/harmony/numeric"
"math/big"
"time"
@ -505,7 +507,15 @@ func (s *PublicBlockChainAPI) GetValidatorInfo(ctx context.Context, address comm
if validator == nil {
return nil, fmt.Errorf("validator not found: %s", address.Hex())
}
return newRPCValidator(validator), nil
rpcValidator := newRPCValidator(validator)
stats := s.b.GetValidatorStats(address)
if stats != nil {
rpcValidator.Uptime = numeric.NewDecFromBigInt(stats.NumBlocksSigned).Quo(numeric.NewDecFromBigInt(stats.NumBlocksToSign)).String()
}
return rpcValidator, nil
}
// GetDelegationsByDelegator returns information about a validator.

@ -76,6 +76,7 @@ type RPCValidator struct {
Commission types2.Commission `json:"commission"`
Description types2.Description `json:"description"`
CreationHeight *big.Int `json:"creation_height"`
Uptime string `json:"uptime"`
}
// RPCDelegation represents a particular delegation to a validator
@ -163,6 +164,7 @@ func newRPCValidator(validator *types2.Validator) *RPCValidator {
validator.Commission,
validator.Description,
validator.CreationHeight,
"",
}
}

@ -129,14 +129,10 @@ type Node struct {
// BeaconNeighbors store only neighbor nodes in the beacon chain shard
BeaconNeighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below
TxPool *core.TxPool
CxPool *core.CxPool // pool for missing cross shard receipts resend
pendingTransactions map[common.Hash]*types.Transaction // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
recentTxsStats types.RecentTxsStats
pendingStakingTransactions map[common.Hash]*staking.StakingTransaction // All the staking transactions received but not yet processed for Consensus
pendingStakingTxMutex sync.Mutex
@ -272,12 +268,8 @@ func (node *Node) tryBroadcast(tx *types.Transaction) {
// Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
node.pendingTxMutex.Lock()
node.TxPool.AddRemotes(newTxs)
node.pendingTxMutex.Unlock()
pendingCount, queueCount := node.TxPool.Stats()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", pendingCount).Int("totalQueued", queueCount).Msg("Got more transactions")
}
@ -294,8 +286,8 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra
break
}
}
node.pendingStakingTxMutex.Unlock()
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions")
node.pendingStakingTxMutex.Unlock()
}
// AddPendingStakingTransaction staking transactions
@ -427,7 +419,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.BlockChannel = make(chan *types.Block)
node.ConfirmedBlockChannel = make(chan *types.Block)
node.BeaconBlockChannel = make(chan *types.Block)
node.recentTxsStats = make(types.RecentTxsStats)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain)
node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine)
@ -437,7 +428,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
}
node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction)
node.Consensus.VerifiedNewBlock = make(chan *types.Block)
chain.Engine.SetRewarder(node.Consensus.Decider.(reward.Distributor))

@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"
bls2 "github.com/harmony-one/harmony/crypto/bls"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
@ -359,6 +361,23 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
// Broadcast client requested missing cross shard receipts if there is any
node.BroadcastMissingCXReceipts()
// Writing validator stats (for uptime recording)
// TODO: only record for open staking validators
prevBlock := node.Blockchain().GetBlockByHash(newBlock.ParentHash())
if prevBlock != nil {
shardState, err := node.Blockchain().ReadShardState(prevBlock.Epoch())
if err == nil {
members := node.Consensus.Decider.Participants()
mask, _ := bls2.NewMask(members, nil)
mask.SetMask(commitSigAndBitmap[96:])
err = node.Blockchain().WriteValidatorStats(shardState.FindCommitteeByID(newBlock.ShardID()).Slots, mask)
if err != nil {
utils.Logger().Err(err)
}
}
}
// Update consensus keys at last so the change of leader status doesn't mess up normal flow
if shard.Schedule.IsLastBlock(newBlock.Number().Uint64()) {
node.Consensus.UpdateConsensusInformation()

@ -88,10 +88,12 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
// TODO: integrate staking transaction into tx pool
pendingStakingTransactions := types2.StakingTransactions{}
node.pendingStakingTxMutex.Lock()
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction)
node.pendingStakingTxMutex.Unlock()
node.Worker.UpdateCurrent(coinbase)
if err := node.Worker.CommitTransactions(pending, pendingStakingTransactions, coinbase); err != nil {

@ -33,8 +33,12 @@ func (d *txdata) CopyFrom(d2 *txdata) {
d.AccountNonce = d2.AccountNonce
d.Price = new(big.Int).Set(d2.Price)
d.GasLimit = d2.GasLimit
// TODO: add code to protect crashing
d.StakeMsg = d2.StakeMsg.(StakeMsg).Copy()
// This is workaround, direct RLP encoding/decoding not work
payload, _ := rlp.EncodeToBytes(d2.StakeMsg)
restored, _ := RLPDecodeStakeMsg(
payload, d2.Directive,
)
d.StakeMsg = restored.(StakeMsg).Copy()
d.V = new(big.Int).Set(d2.V)
d.R = new(big.Int).Set(d2.R)
d.S = new(big.Int).Set(d2.S)

@ -40,6 +40,16 @@ type ValidatorWrapper struct {
Delegations []Delegation `json:"delegations" yaml:"delegations" rlp:"nil"`
}
// ValidatorStats to record validator's performance and history records
type ValidatorStats struct {
// The number of blocks the validator should've signed when in active mode (selected in committee)
NumBlocksToSign *big.Int
// The number of blocks the validator actually signed
NumBlocksSigned *big.Int
// The number of times they validator is jailed due to extensive downtime
NumJailed *big.Int
}
// Validator - data fields for a validator
type Validator struct {
// ECDSA address of the validator

Loading…
Cancel
Save