Add rate limit on costly rpcs

pull/3927/head
Rongjian Lan 3 years ago
parent 42c88b552e
commit 42e1476e49
  1. 49
      rpc/blockchain.go
  2. 53
      rpc/staking.go

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"reflect"
"time"
"github.com/ethereum/go-ethereum/common"
@ -38,6 +39,10 @@ type PublicBlockchainService struct {
limiter *rate.Limiter
rpcBlockFactory rpc_common.BlockFactory
helper *bcServiceHelper
// TEMP SOLUTION to rpc node spamming issue
limiterGetStakingNetworkInfo *rate.Limiter
limiterGetSuperCommittees *rate.Limiter
limiterGetCurrentUtilityMetrics *rate.Limiter
}
const (
@ -49,7 +54,7 @@ const (
func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
var limiter *rate.Limiter
if limiterEnable {
limiter := rate.NewLimiter(rate.Limit(limit), 1)
limiter = rate.NewLimiter(rate.Limit(limit), limit)
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
rpcRateLimitCounterVec.With(prometheus.Labels{
"rate_limit": strLimit,
@ -57,9 +62,12 @@ func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable boo
}
s := &PublicBlockchainService{
hmy: hmy,
version: version,
limiter: limiter,
hmy: hmy,
version: version,
limiter: limiter,
limiterGetStakingNetworkInfo: rate.NewLimiter(5, 10),
limiterGetSuperCommittees: rate.NewLimiter(5, 10),
limiterGetCurrentUtilityMetrics: rate.NewLimiter(5, 10),
}
s.helper = s.newHelper()
@ -140,18 +148,19 @@ func (s *PublicBlockchainService) BlockNumber(ctx context.Context) (interface{},
}
}
func (s *PublicBlockchainService) wait(ctx context.Context) error {
if s.limiter != nil {
func (s *PublicBlockchainService) wait(limiter *rate.Limiter, ctx context.Context) error {
if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !s.limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(s.limiter.Limit()))
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
"rate_limit": strLimit,
name: strLimit,
}).Inc()
}
return s.limiter.Wait(deadlineCtx)
return limiter.Wait(deadlineCtx)
}
return nil
}
@ -165,7 +174,7 @@ func (s *PublicBlockchainService) GetBlockByNumber(
timer := DoMetricRPCRequest(GetBlockByNumber)
defer DoRPCRequestDuration(GetBlockByNumber, timer)
err = s.wait(ctx)
err = s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber)
return nil, err
@ -218,7 +227,7 @@ func (s *PublicBlockchainService) GetBlockByHash(
timer := DoMetricRPCRequest(GetBlockByHash)
defer DoRPCRequestDuration(GetBlockByHash, timer)
err = s.wait(ctx)
err = s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber)
return nil, err
@ -508,7 +517,7 @@ func (s *PublicBlockchainService) GetShardingStructure(
timer := DoMetricRPCRequest(GetShardingStructure)
defer DoRPCRequestDuration(GetShardingStructure, timer)
err := s.wait(ctx)
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber)
return nil, err
@ -565,7 +574,7 @@ func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredR
timer := DoMetricRPCRequest(LatestHeader)
defer DoRPCRequestDuration(LatestHeader, timer)
err := s.wait(ctx)
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(LatestHeader, FailedNumber)
return nil, err
@ -600,7 +609,7 @@ func (s *PublicBlockchainService) GetLastCrossLinks(
timer := DoMetricRPCRequest(GetLastCrossLinks)
defer DoRPCRequestDuration(GetLastCrossLinks, timer)
err := s.wait(ctx)
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber)
return nil, err
@ -638,7 +647,7 @@ func (s *PublicBlockchainService) GetHeaderByNumber(
timer := DoMetricRPCRequest(GetHeaderByNumber)
defer DoRPCRequestDuration(GetHeaderByNumber, timer)
err := s.wait(ctx)
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber)
return nil, err
@ -670,7 +679,7 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics(
timer := DoMetricRPCRequest(GetCurrentUtilityMetrics)
defer DoRPCRequestDuration(GetCurrentUtilityMetrics, timer)
err := s.wait(ctx)
err := s.wait(s.limiterGetCurrentUtilityMetrics, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber)
return nil, err
@ -699,7 +708,7 @@ func (s *PublicBlockchainService) GetSuperCommittees(
timer := DoMetricRPCRequest(GetSuperCommittees)
defer DoRPCRequestDuration(GetSuperCommittees, timer)
err := s.wait(ctx)
err := s.wait(s.limiterGetSuperCommittees, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber)
return nil, err
@ -728,7 +737,7 @@ func (s *PublicBlockchainService) GetCurrentBadBlocks(
timer := DoMetricRPCRequest(GetCurrentBadBlocks)
defer DoRPCRequestDuration(GetCurrentBadBlocks, timer)
err := s.wait(ctx)
err := s.wait(s.limiter, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber)
return nil, err
@ -770,7 +779,7 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo(
timer := DoMetricRPCRequest(GetStakingNetworkInfo)
defer DoRPCRequestDuration(GetStakingNetworkInfo, timer)
err := s.wait(ctx)
err := s.wait(s.limiterGetStakingNetworkInfo, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber)
return nil, err

@ -2,7 +2,12 @@ package rpc
import (
"context"
"fmt"
"math/big"
"reflect"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
lru "github.com/hashicorp/golang-lru"
@ -29,6 +34,9 @@ type PublicStakingService struct {
version Version
validatorInfoCache *lru.Cache // cache for detailed validator information per page and block
// TEMP SOLUTION to rpc node spamming issue
limiterGetAllValidatorInformation *rate.Limiter
limiterGetAllDelegationInformation *rate.Limiter
}
// NewPublicStakingAPI creates a new API for the RPC interface
@ -38,14 +46,34 @@ func NewPublicStakingAPI(hmy *hmy.Harmony, version Version) rpc.API {
Namespace: version.Namespace(),
Version: APIVersion,
Service: &PublicStakingService{
hmy: hmy,
version: version,
validatorInfoCache: viCache,
hmy: hmy,
version: version,
validatorInfoCache: viCache,
limiterGetAllValidatorInformation: rate.NewLimiter(1, 3),
limiterGetAllDelegationInformation: rate.NewLimiter(1, 3),
},
Public: true,
}
}
func (s *PublicStakingService) wait(limiter *rate.Limiter, ctx context.Context) error {
if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
name: strLimit,
}).Inc()
}
return limiter.Wait(deadlineCtx)
}
return nil
}
// getBalanceByBlockNumber returns balance by block number at given eth blockNum without checks
func (s *PublicStakingService) getBalanceByBlockNumber(
ctx context.Context, address string, blockNum rpc.BlockNumber,
@ -208,6 +236,12 @@ func (s *PublicStakingService) GetAllValidatorInformation(
timer := DoMetricRPCRequest(GetAllValidatorInformation)
defer DoRPCRequestDuration(GetAllValidatorInformation, timer)
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber)
return nil, err
}
if !isBeaconShard(s.hmy) {
DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber)
return nil, ErrNotBeaconShard
@ -231,6 +265,12 @@ func (s *PublicStakingService) GetAllValidatorInformationByBlockNumber(
timer := DoMetricRPCRequest(GetAllValidatorInformationByBlockNumber)
defer DoRPCRequestDuration(GetAllValidatorInformationByBlockNumber, timer)
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformationByBlockNumber, FailedNumber)
return nil, err
}
// Process number based on version
blockNum := blockNumber.EthBlockNumber()
@ -466,6 +506,12 @@ func (s *PublicStakingService) GetAllDelegationInformation(
timer := DoMetricRPCRequest(GetAllDelegationInformation)
defer DoRPCRequestDuration(GetAllDelegationInformation, timer)
err := s.wait(s.limiterGetAllDelegationInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
return nil, err
}
if !isBeaconShard(s.hmy) {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
return nil, ErrNotBeaconShard
@ -495,7 +541,6 @@ func (s *PublicStakingService) GetAllDelegationInformation(
// Fetch all delegations
validators := make([][]StructuredResponse, validatorsNum)
var err error
for i := start; i < start+validatorsNum; i++ {
validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String())
if err != nil {

Loading…
Cancel
Save