From 42e1476e49006f15de010399020659a5b205ff3e Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Thu, 11 Nov 2021 17:37:06 -0800 Subject: [PATCH] Add rate limit on costly rpcs --- rpc/blockchain.go | 49 +++++++++++++++++++++++++------------------ rpc/staking.go | 53 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 78 insertions(+), 24 deletions(-) diff --git a/rpc/blockchain.go b/rpc/blockchain.go index eb9d5cbe2..7827726e3 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "reflect" "time" "github.com/ethereum/go-ethereum/common" @@ -38,6 +39,10 @@ type PublicBlockchainService struct { limiter *rate.Limiter rpcBlockFactory rpc_common.BlockFactory helper *bcServiceHelper + // TEMP SOLUTION to rpc node spamming issue + limiterGetStakingNetworkInfo *rate.Limiter + limiterGetSuperCommittees *rate.Limiter + limiterGetCurrentUtilityMetrics *rate.Limiter } const ( @@ -49,7 +54,7 @@ const ( func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API { var limiter *rate.Limiter if limiterEnable { - limiter := rate.NewLimiter(rate.Limit(limit), 1) + limiter = rate.NewLimiter(rate.Limit(limit), limit) strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) rpcRateLimitCounterVec.With(prometheus.Labels{ "rate_limit": strLimit, @@ -57,9 +62,12 @@ func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable boo } s := &PublicBlockchainService{ - hmy: hmy, - version: version, - limiter: limiter, + hmy: hmy, + version: version, + limiter: limiter, + limiterGetStakingNetworkInfo: rate.NewLimiter(5, 10), + limiterGetSuperCommittees: rate.NewLimiter(5, 10), + limiterGetCurrentUtilityMetrics: rate.NewLimiter(5, 10), } s.helper = s.newHelper() @@ -140,18 +148,19 @@ func (s *PublicBlockchainService) BlockNumber(ctx context.Context) (interface{}, } } -func (s *PublicBlockchainService) wait(ctx context.Context) error { - if s.limiter != nil { +func (s *PublicBlockchainService) wait(limiter *rate.Limiter, ctx context.Context) error { + if limiter != nil { deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout) defer cancel() - if !s.limiter.Allow() { - strLimit := fmt.Sprintf("%d", int64(s.limiter.Limit())) + if !limiter.Allow() { + strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) + name := reflect.TypeOf(limiter).Elem().Name() rpcRateLimitCounterVec.With(prometheus.Labels{ - "rate_limit": strLimit, + name: strLimit, }).Inc() } - return s.limiter.Wait(deadlineCtx) + return limiter.Wait(deadlineCtx) } return nil } @@ -165,7 +174,7 @@ func (s *PublicBlockchainService) GetBlockByNumber( timer := DoMetricRPCRequest(GetBlockByNumber) defer DoRPCRequestDuration(GetBlockByNumber, timer) - err = s.wait(ctx) + err = s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber) return nil, err @@ -218,7 +227,7 @@ func (s *PublicBlockchainService) GetBlockByHash( timer := DoMetricRPCRequest(GetBlockByHash) defer DoRPCRequestDuration(GetBlockByHash, timer) - err = s.wait(ctx) + err = s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber) return nil, err @@ -508,7 +517,7 @@ func (s *PublicBlockchainService) GetShardingStructure( timer := DoMetricRPCRequest(GetShardingStructure) defer DoRPCRequestDuration(GetShardingStructure, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber) return nil, err @@ -565,7 +574,7 @@ func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredR timer := DoMetricRPCRequest(LatestHeader) defer DoRPCRequestDuration(LatestHeader, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(LatestHeader, FailedNumber) return nil, err @@ -600,7 +609,7 @@ func (s *PublicBlockchainService) GetLastCrossLinks( timer := DoMetricRPCRequest(GetLastCrossLinks) defer DoRPCRequestDuration(GetLastCrossLinks, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber) return nil, err @@ -638,7 +647,7 @@ func (s *PublicBlockchainService) GetHeaderByNumber( timer := DoMetricRPCRequest(GetHeaderByNumber) defer DoRPCRequestDuration(GetHeaderByNumber, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber) return nil, err @@ -670,7 +679,7 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics( timer := DoMetricRPCRequest(GetCurrentUtilityMetrics) defer DoRPCRequestDuration(GetCurrentUtilityMetrics, timer) - err := s.wait(ctx) + err := s.wait(s.limiterGetCurrentUtilityMetrics, ctx) if err != nil { DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber) return nil, err @@ -699,7 +708,7 @@ func (s *PublicBlockchainService) GetSuperCommittees( timer := DoMetricRPCRequest(GetSuperCommittees) defer DoRPCRequestDuration(GetSuperCommittees, timer) - err := s.wait(ctx) + err := s.wait(s.limiterGetSuperCommittees, ctx) if err != nil { DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber) return nil, err @@ -728,7 +737,7 @@ func (s *PublicBlockchainService) GetCurrentBadBlocks( timer := DoMetricRPCRequest(GetCurrentBadBlocks) defer DoRPCRequestDuration(GetCurrentBadBlocks, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber) return nil, err @@ -770,7 +779,7 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo( timer := DoMetricRPCRequest(GetStakingNetworkInfo) defer DoRPCRequestDuration(GetStakingNetworkInfo, timer) - err := s.wait(ctx) + err := s.wait(s.limiterGetStakingNetworkInfo, ctx) if err != nil { DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber) return nil, err diff --git a/rpc/staking.go b/rpc/staking.go index 0be522a4a..41aae8fc4 100644 --- a/rpc/staking.go +++ b/rpc/staking.go @@ -2,7 +2,12 @@ package rpc import ( "context" + "fmt" "math/big" + "reflect" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" lru "github.com/hashicorp/golang-lru" @@ -29,6 +34,9 @@ type PublicStakingService struct { version Version validatorInfoCache *lru.Cache // cache for detailed validator information per page and block + // TEMP SOLUTION to rpc node spamming issue + limiterGetAllValidatorInformation *rate.Limiter + limiterGetAllDelegationInformation *rate.Limiter } // NewPublicStakingAPI creates a new API for the RPC interface @@ -38,14 +46,34 @@ func NewPublicStakingAPI(hmy *hmy.Harmony, version Version) rpc.API { Namespace: version.Namespace(), Version: APIVersion, Service: &PublicStakingService{ - hmy: hmy, - version: version, - validatorInfoCache: viCache, + hmy: hmy, + version: version, + validatorInfoCache: viCache, + limiterGetAllValidatorInformation: rate.NewLimiter(1, 3), + limiterGetAllDelegationInformation: rate.NewLimiter(1, 3), }, Public: true, } } +func (s *PublicStakingService) wait(limiter *rate.Limiter, ctx context.Context) error { + if limiter != nil { + deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout) + defer cancel() + if !limiter.Allow() { + strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) + + name := reflect.TypeOf(limiter).Elem().Name() + rpcRateLimitCounterVec.With(prometheus.Labels{ + name: strLimit, + }).Inc() + } + + return limiter.Wait(deadlineCtx) + } + return nil +} + // getBalanceByBlockNumber returns balance by block number at given eth blockNum without checks func (s *PublicStakingService) getBalanceByBlockNumber( ctx context.Context, address string, blockNum rpc.BlockNumber, @@ -208,6 +236,12 @@ func (s *PublicStakingService) GetAllValidatorInformation( timer := DoMetricRPCRequest(GetAllValidatorInformation) defer DoRPCRequestDuration(GetAllValidatorInformation, timer) + err := s.wait(s.limiterGetAllValidatorInformation, ctx) + if err != nil { + DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber) + return nil, err + } + if !isBeaconShard(s.hmy) { DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber) return nil, ErrNotBeaconShard @@ -231,6 +265,12 @@ func (s *PublicStakingService) GetAllValidatorInformationByBlockNumber( timer := DoMetricRPCRequest(GetAllValidatorInformationByBlockNumber) defer DoRPCRequestDuration(GetAllValidatorInformationByBlockNumber, timer) + err := s.wait(s.limiterGetAllValidatorInformation, ctx) + if err != nil { + DoMetricRPCQueryInfo(GetAllValidatorInformationByBlockNumber, FailedNumber) + return nil, err + } + // Process number based on version blockNum := blockNumber.EthBlockNumber() @@ -466,6 +506,12 @@ func (s *PublicStakingService) GetAllDelegationInformation( timer := DoMetricRPCRequest(GetAllDelegationInformation) defer DoRPCRequestDuration(GetAllDelegationInformation, timer) + err := s.wait(s.limiterGetAllDelegationInformation, ctx) + if err != nil { + DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber) + return nil, err + } + if !isBeaconShard(s.hmy) { DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber) return nil, ErrNotBeaconShard @@ -495,7 +541,6 @@ func (s *PublicStakingService) GetAllDelegationInformation( // Fetch all delegations validators := make([][]StructuredResponse, validatorsNum) - var err error for i := start; i < start+validatorsNum; i++ { validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String()) if err != nil {