Fix rate limit metrics; add rate limit on Call method

pull/3971/head
Rongjian Lan 3 years ago
parent bd9e27a6ce
commit 5ac1f4c684
  1. 32
      rpc/blockchain.go
  2. 30
      rpc/contract.go
  3. 8
      rpc/metrics.go
  4. 6
      rpc/pool.go
  5. 13
      rpc/staking.go

@ -50,7 +50,7 @@ type PublicBlockchainService struct {
}
const (
DefaultRateLimiterWaitTimeout = 5 * time.Second
DefaultRateLimiterWaitTimeout = 2 * time.Second
rpcGetBlocksLimit = 1024
)
@ -59,9 +59,9 @@ func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable boo
var limiter *rate.Limiter
if limiterEnable {
limiter = rate.NewLimiter(rate.Limit(limit), limit)
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
"rate_limit": strLimit,
"limiter_name": name,
}).Add(float64(0))
}
@ -157,10 +157,9 @@ func (s *PublicBlockchainService) wait(limiter *rate.Limiter, ctx context.Contex
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
name: strLimit,
"limiter_name": name,
}).Inc()
}
@ -180,7 +179,7 @@ func (s *PublicBlockchainService) GetBlockByNumber(
err = s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber)
DoMetricRPCQueryInfo(GetBlockByNumber, RateLimitedNumber)
return nil, err
}
@ -233,7 +232,7 @@ func (s *PublicBlockchainService) GetBlockByHash(
err = s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber)
DoMetricRPCQueryInfo(GetBlockByHash, RateLimitedNumber)
return nil, err
}
@ -523,7 +522,7 @@ func (s *PublicBlockchainService) GetShardingStructure(
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber)
DoMetricRPCQueryInfo(GetShardingStructure, RateLimitedNumber)
return nil, err
}
@ -580,7 +579,7 @@ func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredR
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(LatestHeader, FailedNumber)
DoMetricRPCQueryInfo(LatestHeader, RateLimitedNumber)
return nil, err
}
@ -615,7 +614,7 @@ func (s *PublicBlockchainService) GetLastCrossLinks(
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber)
DoMetricRPCQueryInfo(GetLastCrossLinks, RateLimitedNumber)
return nil, err
}
@ -653,7 +652,7 @@ func (s *PublicBlockchainService) GetHeaderByNumber(
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber)
DoMetricRPCQueryInfo(GetHeaderByNumber, RateLimitedNumber)
return nil, err
}
@ -707,6 +706,7 @@ func (s *PublicBlockchainService) GetProof(
err = s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetProof, RateLimitedNumber)
return
}
@ -792,7 +792,7 @@ func (s *PublicBlockchainService) GetHeaderByNumberRLPHex(
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetHeaderByNumberRLPHex, FailedNumber)
DoMetricRPCQueryInfo(GetHeaderByNumberRLPHex, RateLimitedNumber)
return "", err
}
@ -824,7 +824,7 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics(
err := s.wait(s.limiterGetCurrentUtilityMetrics, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber)
DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, RateLimitedNumber)
return nil, err
}
@ -853,7 +853,7 @@ func (s *PublicBlockchainService) GetSuperCommittees(
err := s.wait(s.limiterGetSuperCommittees, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber)
DoMetricRPCQueryInfo(GetSuperCommittees, RateLimitedNumber)
return nil, err
}
@ -882,7 +882,7 @@ func (s *PublicBlockchainService) GetCurrentBadBlocks(
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber)
DoMetricRPCQueryInfo(GetCurrentBadBlocks, RateLimitedNumber)
return nil, err
}
@ -924,7 +924,7 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo(
err := s.wait(s.limiterGetStakingNetworkInfo, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber)
DoMetricRPCQueryInfo(GetStakingNetworkInfo, RateLimitedNumber)
return nil, err
}

@ -4,8 +4,12 @@ import (
"context"
"fmt"
"math"
"reflect"
"time"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
@ -26,6 +30,8 @@ const (
type PublicContractService struct {
hmy *hmy.Harmony
version Version
// TEMP SOLUTION to rpc node spamming issue
limiterCall *rate.Limiter
}
// NewPublicContractAPI creates a new API for the RPC interface
@ -33,11 +39,27 @@ func NewPublicContractAPI(hmy *hmy.Harmony, version Version) rpc.API {
return rpc.API{
Namespace: version.Namespace(),
Version: APIVersion,
Service: &PublicContractService{hmy, version},
Service: &PublicContractService{hmy, version, rate.NewLimiter(100, 1000)},
Public: true,
}
}
func (s *PublicContractService) wait(limiter *rate.Limiter, ctx context.Context) error {
if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
"limiter_name": name,
}).Inc()
}
return limiter.Wait(deadlineCtx)
}
return nil
}
// Call executes the given transaction on the state for the given block number.
// It doesn't make and changes in the state/blockchain and is useful to execute and retrieve values.
func (s *PublicContractService) Call(
@ -46,6 +68,12 @@ func (s *PublicContractService) Call(
// Process number based on version
blockNum := blockNumber.EthBlockNumber()
err := s.wait(s.limiterCall, ctx)
if err != nil {
DoMetricRPCQueryInfo(Call, RateLimitedNumber)
return nil, err
}
// Execute call
result, err := DoEVMCall(ctx, s.hmy, args, blockNum, CallTimeout)
if err != nil {

@ -29,6 +29,7 @@ const (
// contract
GetCode = "GetCode"
GetStorageAt = "GetStorageAt"
Call = "Call"
DoEvmCall = "DoEVMCall"
// net
@ -75,8 +76,9 @@ const (
// info type const
const (
QueryNumber = "query_number"
FailedNumber = "failed_number"
QueryNumber = "query_number"
FailedNumber = "failed_number"
RateLimitedNumber = "rate_limited_number"
)
func init() {
@ -96,7 +98,7 @@ var (
Name: "over_ratelimit",
Help: "number of times triggered rpc rate limit",
},
[]string{"rate_limit"},
[]string{"limiter_name"},
)
rpcQueryInfoCounterVec = prometheus.NewCounterVec(

@ -2,7 +2,6 @@ package rpc
import (
"context"
"fmt"
"reflect"
"github.com/prometheus/client_golang/prometheus"
@ -52,10 +51,9 @@ func (s *PublicPoolService) wait(limiter *rate.Limiter, ctx context.Context) err
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
name: strLimit,
"limiter_name": name,
}).Inc()
}
@ -217,7 +215,7 @@ func (s *PublicPoolService) PendingTransactions(
err := s.wait(s.limiterPendingTransactions, ctx)
if err != nil {
DoMetricRPCQueryInfo(PendingTransactions, FailedNumber)
DoMetricRPCQueryInfo(PendingTransactions, RateLimitedNumber)
return nil, err
}

@ -2,7 +2,6 @@ package rpc
import (
"context"
"fmt"
"math/big"
"reflect"
@ -63,11 +62,9 @@ func (s *PublicStakingService) wait(limiter *rate.Limiter, ctx context.Context)
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
name: strLimit,
"limiter_name": name,
}).Inc()
}
@ -240,7 +237,7 @@ func (s *PublicStakingService) GetAllValidatorInformation(
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber)
DoMetricRPCQueryInfo(GetAllValidatorInformation, RateLimitedNumber)
return nil, err
}
@ -269,7 +266,7 @@ func (s *PublicStakingService) GetAllValidatorInformationByBlockNumber(
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformationByBlockNumber, FailedNumber)
DoMetricRPCQueryInfo(GetAllValidatorInformationByBlockNumber, RateLimitedNumber)
return nil, err
}
@ -510,7 +507,7 @@ func (s *PublicStakingService) GetAllDelegationInformation(
err := s.wait(s.limiterGetAllDelegationInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
DoMetricRPCQueryInfo(GetAllDelegationInformation, RateLimitedNumber)
return nil, err
}
@ -691,7 +688,7 @@ func (s *PublicStakingService) GetDelegationsByValidator(
err := s.wait(s.limiterGetDelegationsByValidator, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetDelegationsByValidator, FailedNumber)
DoMetricRPCQueryInfo(GetDelegationsByValidator, RateLimitedNumber)
return nil, err
}
return s.getDelegationByValidatorHelper(address)

Loading…
Cancel
Save