Merge branch 'main' of github.com:harmony-one/harmony into HEAD

pull/3969/head
Rongjian Lan 3 years ago
commit a42f6f9897
  1. 186
      rpc/blockchain.go
  2. 2
      rpc/metrics.go
  3. 47
      rpc/staking.go

@ -4,10 +4,15 @@ import (
"context" "context"
"fmt" "fmt"
"math/big" "math/big"
"reflect"
"time" "time"
"encoding/hex"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -38,6 +43,10 @@ type PublicBlockchainService struct {
limiter *rate.Limiter limiter *rate.Limiter
rpcBlockFactory rpc_common.BlockFactory rpcBlockFactory rpc_common.BlockFactory
helper *bcServiceHelper helper *bcServiceHelper
// TEMP SOLUTION to rpc node spamming issue
limiterGetStakingNetworkInfo *rate.Limiter
limiterGetSuperCommittees *rate.Limiter
limiterGetCurrentUtilityMetrics *rate.Limiter
} }
const ( const (
@ -49,7 +58,7 @@ const (
func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API { func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
var limiter *rate.Limiter var limiter *rate.Limiter
if limiterEnable { if limiterEnable {
limiter := rate.NewLimiter(rate.Limit(limit), 1) limiter = rate.NewLimiter(rate.Limit(limit), limit)
strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
rpcRateLimitCounterVec.With(prometheus.Labels{ rpcRateLimitCounterVec.With(prometheus.Labels{
"rate_limit": strLimit, "rate_limit": strLimit,
@ -60,6 +69,9 @@ func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable boo
hmy: hmy, hmy: hmy,
version: version, version: version,
limiter: limiter, limiter: limiter,
limiterGetStakingNetworkInfo: rate.NewLimiter(5, 10),
limiterGetSuperCommittees: rate.NewLimiter(5, 10),
limiterGetCurrentUtilityMetrics: rate.NewLimiter(5, 10),
} }
s.helper = s.newHelper() s.helper = s.newHelper()
@ -140,18 +152,19 @@ func (s *PublicBlockchainService) BlockNumber(ctx context.Context) (interface{},
} }
} }
func (s *PublicBlockchainService) wait(ctx context.Context) error { func (s *PublicBlockchainService) wait(limiter *rate.Limiter, ctx context.Context) error {
if s.limiter != nil { if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout) deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel() defer cancel()
if !s.limiter.Allow() { if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(s.limiter.Limit())) strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{ rpcRateLimitCounterVec.With(prometheus.Labels{
"rate_limit": strLimit, name: strLimit,
}).Inc() }).Inc()
} }
return s.limiter.Wait(deadlineCtx) return limiter.Wait(deadlineCtx)
} }
return nil return nil
} }
@ -165,7 +178,7 @@ func (s *PublicBlockchainService) GetBlockByNumber(
timer := DoMetricRPCRequest(GetBlockByNumber) timer := DoMetricRPCRequest(GetBlockByNumber)
defer DoRPCRequestDuration(GetBlockByNumber, timer) defer DoRPCRequestDuration(GetBlockByNumber, timer)
err = s.wait(ctx) err = s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber) DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber)
return nil, err return nil, err
@ -218,7 +231,7 @@ func (s *PublicBlockchainService) GetBlockByHash(
timer := DoMetricRPCRequest(GetBlockByHash) timer := DoMetricRPCRequest(GetBlockByHash)
defer DoRPCRequestDuration(GetBlockByHash, timer) defer DoRPCRequestDuration(GetBlockByHash, timer)
err = s.wait(ctx) err = s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber) DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber)
return nil, err return nil, err
@ -508,7 +521,7 @@ func (s *PublicBlockchainService) GetShardingStructure(
timer := DoMetricRPCRequest(GetShardingStructure) timer := DoMetricRPCRequest(GetShardingStructure)
defer DoRPCRequestDuration(GetShardingStructure, timer) defer DoRPCRequestDuration(GetShardingStructure, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber) DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber)
return nil, err return nil, err
@ -565,7 +578,7 @@ func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredR
timer := DoMetricRPCRequest(LatestHeader) timer := DoMetricRPCRequest(LatestHeader)
defer DoRPCRequestDuration(LatestHeader, timer) defer DoRPCRequestDuration(LatestHeader, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(LatestHeader, FailedNumber) DoMetricRPCQueryInfo(LatestHeader, FailedNumber)
return nil, err return nil, err
@ -600,7 +613,7 @@ func (s *PublicBlockchainService) GetLastCrossLinks(
timer := DoMetricRPCRequest(GetLastCrossLinks) timer := DoMetricRPCRequest(GetLastCrossLinks)
defer DoRPCRequestDuration(GetLastCrossLinks, timer) defer DoRPCRequestDuration(GetLastCrossLinks, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber) DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber)
return nil, err return nil, err
@ -638,7 +651,7 @@ func (s *PublicBlockchainService) GetHeaderByNumber(
timer := DoMetricRPCRequest(GetHeaderByNumber) timer := DoMetricRPCRequest(GetHeaderByNumber)
defer DoRPCRequestDuration(GetHeaderByNumber, timer) defer DoRPCRequestDuration(GetHeaderByNumber, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber) DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber)
return nil, err return nil, err
@ -663,6 +676,145 @@ func (s *PublicBlockchainService) GetHeaderByNumber(
return nil, err return nil, err
} }
// Result structs for GetProof
type AccountResult struct {
Address common.Address `json:"address"`
AccountProof []string `json:"accountProof"`
Balance *hexutil.Big `json:"balance"`
CodeHash common.Hash `json:"codeHash"`
Nonce hexutil.Uint64 `json:"nonce"`
StorageHash common.Hash `json:"storageHash"`
StorageProof []StorageResult `json:"storageProof"`
}
type StorageResult struct {
Key string `json:"key"`
Value *hexutil.Big `json:"value"`
Proof []string `json:"proof"`
}
// GetHeaderByNumberRLPHex returns block header at given number by `hex(rlp(header))`
func (s *PublicBlockchainService) GetProof(
ctx context.Context, address common.Address, storageKeys []string, blockNumber BlockNumber) (ret *AccountResult, err error) {
timer := DoMetricRPCRequest(GetProof)
defer DoRPCRequestDuration(GetProof, timer)
defer func() {
if ret == nil || err != nil {
DoMetricRPCQueryInfo(GetProof, FailedNumber)
}
}()
err = s.wait(s.limiter, ctx)
if err != nil {
return
}
// Process number based on version
blockNum := blockNumber.EthBlockNumber()
// Ensure valid block number
if s.version != Eth && isBlockGreaterThanLatest(s.hmy, blockNum) {
err = ErrRequestedBlockTooHigh
return
}
// Fetch Header
header, err := s.hmy.HeaderByNumber(ctx, blockNum)
if header == nil && err != nil {
return
}
state, err := s.hmy.BeaconChain.StateAt(header.Root())
if state == nil || err != nil {
return
}
storageTrie := state.StorageTrie(address)
storageHash := types.EmptyRootHash
codeHash := state.GetCodeHash(address)
storageProof := make([]StorageResult, len(storageKeys))
// if we have a storageTrie, (which means the account exists), we can update the storagehash
if storageTrie != nil {
storageHash = storageTrie.Hash()
} else {
// no storageTrie means the account does not exist, so the codeHash is the hash of an empty bytearray.
codeHash = crypto.Keccak256Hash(nil)
}
// create the proof for the storageKeys
for i, key := range storageKeys {
if storageTrie != nil {
proof, storageError := state.GetStorageProof(address, common.HexToHash(key))
if storageError != nil {
err = storageError
return
}
storageProof[i] = StorageResult{key, (*hexutil.Big)(state.GetState(address, common.HexToHash(key)).Big()), toHexSlice(proof)}
} else {
storageProof[i] = StorageResult{key, &hexutil.Big{}, []string{}}
}
}
// create the accountProof
accountProof, err := state.GetProof(address)
if err != nil {
return
}
ret, err = &AccountResult{
Address: address,
AccountProof: toHexSlice(accountProof),
Balance: (*hexutil.Big)(state.GetBalance(address)),
CodeHash: codeHash,
Nonce: hexutil.Uint64(state.GetNonce(address)),
StorageHash: storageHash,
StorageProof: storageProof,
}, state.Error()
return
}
// toHexSlice creates a slice of hex-strings based on []byte.
func toHexSlice(b [][]byte) []string {
r := make([]string, len(b))
for i := range b {
r[i] = hexutil.Encode(b[i])
}
return r
}
// GetHeaderByNumberRLPHex returns block header at given number by `hex(rlp(header))`
func (s *PublicBlockchainService) GetHeaderByNumberRLPHex(
ctx context.Context, blockNumber BlockNumber,
) (string, error) {
timer := DoMetricRPCRequest(GetHeaderByNumberRLPHex)
defer DoRPCRequestDuration(GetHeaderByNumberRLPHex, timer)
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetHeaderByNumberRLPHex, FailedNumber)
return "", err
}
// Process number based on version
blockNum := blockNumber.EthBlockNumber()
// Ensure valid block number
if s.version != Eth && isBlockGreaterThanLatest(s.hmy, blockNum) {
DoMetricRPCQueryInfo(GetHeaderByNumberRLPHex, FailedNumber)
return "", ErrRequestedBlockTooHigh
}
// Fetch Header
header, err := s.hmy.HeaderByNumber(ctx, blockNum)
if header != nil && err == nil {
// Response output is the same for all versions
val, _ := rlp.EncodeToBytes(header)
return hex.EncodeToString(val), nil
}
return "", err
}
// GetCurrentUtilityMetrics .. // GetCurrentUtilityMetrics ..
func (s *PublicBlockchainService) GetCurrentUtilityMetrics( func (s *PublicBlockchainService) GetCurrentUtilityMetrics(
ctx context.Context, ctx context.Context,
@ -670,7 +822,7 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics(
timer := DoMetricRPCRequest(GetCurrentUtilityMetrics) timer := DoMetricRPCRequest(GetCurrentUtilityMetrics)
defer DoRPCRequestDuration(GetCurrentUtilityMetrics, timer) defer DoRPCRequestDuration(GetCurrentUtilityMetrics, timer)
err := s.wait(ctx) err := s.wait(s.limiterGetCurrentUtilityMetrics, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber) DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber)
return nil, err return nil, err
@ -699,7 +851,7 @@ func (s *PublicBlockchainService) GetSuperCommittees(
timer := DoMetricRPCRequest(GetSuperCommittees) timer := DoMetricRPCRequest(GetSuperCommittees)
defer DoRPCRequestDuration(GetSuperCommittees, timer) defer DoRPCRequestDuration(GetSuperCommittees, timer)
err := s.wait(ctx) err := s.wait(s.limiterGetSuperCommittees, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber) DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber)
return nil, err return nil, err
@ -728,7 +880,7 @@ func (s *PublicBlockchainService) GetCurrentBadBlocks(
timer := DoMetricRPCRequest(GetCurrentBadBlocks) timer := DoMetricRPCRequest(GetCurrentBadBlocks)
defer DoRPCRequestDuration(GetCurrentBadBlocks, timer) defer DoRPCRequestDuration(GetCurrentBadBlocks, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber) DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber)
return nil, err return nil, err
@ -770,7 +922,7 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo(
timer := DoMetricRPCRequest(GetStakingNetworkInfo) timer := DoMetricRPCRequest(GetStakingNetworkInfo)
defer DoRPCRequestDuration(GetStakingNetworkInfo, timer) defer DoRPCRequestDuration(GetStakingNetworkInfo, timer)
err := s.wait(ctx) err := s.wait(s.limiterGetStakingNetworkInfo, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber) DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber)
return nil, err return nil, err

@ -19,6 +19,8 @@ const (
GetLatestChainHeaders = "GetLatestChainHeaders" GetLatestChainHeaders = "GetLatestChainHeaders"
GetLastCrossLinks = "GetLastCrossLinks" GetLastCrossLinks = "GetLastCrossLinks"
GetHeaderByNumber = "GetHeaderByNumber" GetHeaderByNumber = "GetHeaderByNumber"
GetHeaderByNumberRLPHex = "GetHeaderByNumberRLPHex"
GetProof = "GetProof"
GetCurrentUtilityMetrics = "GetCurrentUtilityMetrics" GetCurrentUtilityMetrics = "GetCurrentUtilityMetrics"
GetSuperCommittees = "GetSuperCommittees" GetSuperCommittees = "GetSuperCommittees"
GetCurrentBadBlocks = "GetCurrentBadBlocks" GetCurrentBadBlocks = "GetCurrentBadBlocks"

@ -2,7 +2,12 @@ package rpc
import ( import (
"context" "context"
"fmt"
"math/big" "math/big"
"reflect"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
@ -29,6 +34,9 @@ type PublicStakingService struct {
version Version version Version
validatorInfoCache *lru.Cache // cache for detailed validator information per page and block validatorInfoCache *lru.Cache // cache for detailed validator information per page and block
// TEMP SOLUTION to rpc node spamming issue
limiterGetAllValidatorInformation *rate.Limiter
limiterGetAllDelegationInformation *rate.Limiter
} }
// NewPublicStakingAPI creates a new API for the RPC interface // NewPublicStakingAPI creates a new API for the RPC interface
@ -41,11 +49,31 @@ func NewPublicStakingAPI(hmy *hmy.Harmony, version Version) rpc.API {
hmy: hmy, hmy: hmy,
version: version, version: version,
validatorInfoCache: viCache, validatorInfoCache: viCache,
limiterGetAllValidatorInformation: rate.NewLimiter(1, 3),
limiterGetAllDelegationInformation: rate.NewLimiter(1, 3),
}, },
Public: true, Public: true,
} }
} }
func (s *PublicStakingService) wait(limiter *rate.Limiter, ctx context.Context) error {
if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
name: strLimit,
}).Inc()
}
return limiter.Wait(deadlineCtx)
}
return nil
}
// getBalanceByBlockNumber returns balance by block number at given eth blockNum without checks // getBalanceByBlockNumber returns balance by block number at given eth blockNum without checks
func (s *PublicStakingService) getBalanceByBlockNumber( func (s *PublicStakingService) getBalanceByBlockNumber(
ctx context.Context, address string, blockNum rpc.BlockNumber, ctx context.Context, address string, blockNum rpc.BlockNumber,
@ -208,6 +236,12 @@ func (s *PublicStakingService) GetAllValidatorInformation(
timer := DoMetricRPCRequest(GetAllValidatorInformation) timer := DoMetricRPCRequest(GetAllValidatorInformation)
defer DoRPCRequestDuration(GetAllValidatorInformation, timer) defer DoRPCRequestDuration(GetAllValidatorInformation, timer)
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber)
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber) DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber)
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
@ -231,6 +265,12 @@ func (s *PublicStakingService) GetAllValidatorInformationByBlockNumber(
timer := DoMetricRPCRequest(GetAllValidatorInformationByBlockNumber) timer := DoMetricRPCRequest(GetAllValidatorInformationByBlockNumber)
defer DoRPCRequestDuration(GetAllValidatorInformationByBlockNumber, timer) defer DoRPCRequestDuration(GetAllValidatorInformationByBlockNumber, timer)
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformationByBlockNumber, FailedNumber)
return nil, err
}
// Process number based on version // Process number based on version
blockNum := blockNumber.EthBlockNumber() blockNum := blockNumber.EthBlockNumber()
@ -466,6 +506,12 @@ func (s *PublicStakingService) GetAllDelegationInformation(
timer := DoMetricRPCRequest(GetAllDelegationInformation) timer := DoMetricRPCRequest(GetAllDelegationInformation)
defer DoRPCRequestDuration(GetAllDelegationInformation, timer) defer DoRPCRequestDuration(GetAllDelegationInformation, timer)
err := s.wait(s.limiterGetAllDelegationInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber) DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
@ -495,7 +541,6 @@ func (s *PublicStakingService) GetAllDelegationInformation(
// Fetch all delegations // Fetch all delegations
validators := make([][]StructuredResponse, validatorsNum) validators := make([][]StructuredResponse, validatorsNum)
var err error
for i := start; i < start+validatorsNum; i++ { for i := start; i < start+validatorsNum; i++ {
validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String()) validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String())
if err != nil { if err != nil {

Loading…
Cancel
Save