Merge pull request #3927 from rlan35/rate_limiter

Add individual rate limit on costly rpcs
pull/3968/head
Soph 3 years ago committed by GitHub
commit 231729f116
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      rpc/blockchain.go
  2. 53
      rpc/staking.go

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"math/big" "math/big"
"reflect"
"time" "time"
"encoding/hex" "encoding/hex"
@ -42,6 +43,10 @@ type PublicBlockchainService struct {
limiter *rate.Limiter limiter *rate.Limiter
rpcBlockFactory rpc_common.BlockFactory rpcBlockFactory rpc_common.BlockFactory
helper *bcServiceHelper helper *bcServiceHelper
// TEMP SOLUTION to rpc node spamming issue
limiterGetStakingNetworkInfo *rate.Limiter
limiterGetSuperCommittees *rate.Limiter
limiterGetCurrentUtilityMetrics *rate.Limiter
} }
const ( const (
@ -53,7 +58,7 @@ const (
func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API { func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
var limiter *rate.Limiter var limiter *rate.Limiter
if limiterEnable { if limiterEnable {
limiter := rate.NewLimiter(rate.Limit(limit), 1) limiter = rate.NewLimiter(rate.Limit(limit), limit)
strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
rpcRateLimitCounterVec.With(prometheus.Labels{ rpcRateLimitCounterVec.With(prometheus.Labels{
"rate_limit": strLimit, "rate_limit": strLimit,
@ -61,9 +66,12 @@ func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable boo
} }
s := &PublicBlockchainService{ s := &PublicBlockchainService{
hmy: hmy, hmy: hmy,
version: version, version: version,
limiter: limiter, limiter: limiter,
limiterGetStakingNetworkInfo: rate.NewLimiter(5, 10),
limiterGetSuperCommittees: rate.NewLimiter(5, 10),
limiterGetCurrentUtilityMetrics: rate.NewLimiter(5, 10),
} }
s.helper = s.newHelper() s.helper = s.newHelper()
@ -144,18 +152,19 @@ func (s *PublicBlockchainService) BlockNumber(ctx context.Context) (interface{},
} }
} }
func (s *PublicBlockchainService) wait(ctx context.Context) error { func (s *PublicBlockchainService) wait(limiter *rate.Limiter, ctx context.Context) error {
if s.limiter != nil { if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout) deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel() defer cancel()
if !s.limiter.Allow() { if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(s.limiter.Limit())) strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{ rpcRateLimitCounterVec.With(prometheus.Labels{
"rate_limit": strLimit, name: strLimit,
}).Inc() }).Inc()
} }
return s.limiter.Wait(deadlineCtx) return limiter.Wait(deadlineCtx)
} }
return nil return nil
} }
@ -169,7 +178,7 @@ func (s *PublicBlockchainService) GetBlockByNumber(
timer := DoMetricRPCRequest(GetBlockByNumber) timer := DoMetricRPCRequest(GetBlockByNumber)
defer DoRPCRequestDuration(GetBlockByNumber, timer) defer DoRPCRequestDuration(GetBlockByNumber, timer)
err = s.wait(ctx) err = s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber) DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber)
return nil, err return nil, err
@ -222,7 +231,7 @@ func (s *PublicBlockchainService) GetBlockByHash(
timer := DoMetricRPCRequest(GetBlockByHash) timer := DoMetricRPCRequest(GetBlockByHash)
defer DoRPCRequestDuration(GetBlockByHash, timer) defer DoRPCRequestDuration(GetBlockByHash, timer)
err = s.wait(ctx) err = s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber) DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber)
return nil, err return nil, err
@ -512,7 +521,7 @@ func (s *PublicBlockchainService) GetShardingStructure(
timer := DoMetricRPCRequest(GetShardingStructure) timer := DoMetricRPCRequest(GetShardingStructure)
defer DoRPCRequestDuration(GetShardingStructure, timer) defer DoRPCRequestDuration(GetShardingStructure, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber) DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber)
return nil, err return nil, err
@ -569,7 +578,7 @@ func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredR
timer := DoMetricRPCRequest(LatestHeader) timer := DoMetricRPCRequest(LatestHeader)
defer DoRPCRequestDuration(LatestHeader, timer) defer DoRPCRequestDuration(LatestHeader, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(LatestHeader, FailedNumber) DoMetricRPCQueryInfo(LatestHeader, FailedNumber)
return nil, err return nil, err
@ -604,7 +613,7 @@ func (s *PublicBlockchainService) GetLastCrossLinks(
timer := DoMetricRPCRequest(GetLastCrossLinks) timer := DoMetricRPCRequest(GetLastCrossLinks)
defer DoRPCRequestDuration(GetLastCrossLinks, timer) defer DoRPCRequestDuration(GetLastCrossLinks, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber) DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber)
return nil, err return nil, err
@ -642,7 +651,7 @@ func (s *PublicBlockchainService) GetHeaderByNumber(
timer := DoMetricRPCRequest(GetHeaderByNumber) timer := DoMetricRPCRequest(GetHeaderByNumber)
defer DoRPCRequestDuration(GetHeaderByNumber, timer) defer DoRPCRequestDuration(GetHeaderByNumber, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber) DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber)
return nil, err return nil, err
@ -813,7 +822,7 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics(
timer := DoMetricRPCRequest(GetCurrentUtilityMetrics) timer := DoMetricRPCRequest(GetCurrentUtilityMetrics)
defer DoRPCRequestDuration(GetCurrentUtilityMetrics, timer) defer DoRPCRequestDuration(GetCurrentUtilityMetrics, timer)
err := s.wait(ctx) err := s.wait(s.limiterGetCurrentUtilityMetrics, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber) DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber)
return nil, err return nil, err
@ -842,7 +851,7 @@ func (s *PublicBlockchainService) GetSuperCommittees(
timer := DoMetricRPCRequest(GetSuperCommittees) timer := DoMetricRPCRequest(GetSuperCommittees)
defer DoRPCRequestDuration(GetSuperCommittees, timer) defer DoRPCRequestDuration(GetSuperCommittees, timer)
err := s.wait(ctx) err := s.wait(s.limiterGetSuperCommittees, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber) DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber)
return nil, err return nil, err
@ -871,7 +880,7 @@ func (s *PublicBlockchainService) GetCurrentBadBlocks(
timer := DoMetricRPCRequest(GetCurrentBadBlocks) timer := DoMetricRPCRequest(GetCurrentBadBlocks)
defer DoRPCRequestDuration(GetCurrentBadBlocks, timer) defer DoRPCRequestDuration(GetCurrentBadBlocks, timer)
err := s.wait(ctx) err := s.wait(s.limiter, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber) DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber)
return nil, err return nil, err
@ -913,7 +922,7 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo(
timer := DoMetricRPCRequest(GetStakingNetworkInfo) timer := DoMetricRPCRequest(GetStakingNetworkInfo)
defer DoRPCRequestDuration(GetStakingNetworkInfo, timer) defer DoRPCRequestDuration(GetStakingNetworkInfo, timer)
err := s.wait(ctx) err := s.wait(s.limiterGetStakingNetworkInfo, ctx)
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber) DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber)
return nil, err return nil, err

@ -2,7 +2,12 @@ package rpc
import ( import (
"context" "context"
"fmt"
"math/big" "math/big"
"reflect"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
@ -29,6 +34,9 @@ type PublicStakingService struct {
version Version version Version
validatorInfoCache *lru.Cache // cache for detailed validator information per page and block validatorInfoCache *lru.Cache // cache for detailed validator information per page and block
// TEMP SOLUTION to rpc node spamming issue
limiterGetAllValidatorInformation *rate.Limiter
limiterGetAllDelegationInformation *rate.Limiter
} }
// NewPublicStakingAPI creates a new API for the RPC interface // NewPublicStakingAPI creates a new API for the RPC interface
@ -38,14 +46,34 @@ func NewPublicStakingAPI(hmy *hmy.Harmony, version Version) rpc.API {
Namespace: version.Namespace(), Namespace: version.Namespace(),
Version: APIVersion, Version: APIVersion,
Service: &PublicStakingService{ Service: &PublicStakingService{
hmy: hmy, hmy: hmy,
version: version, version: version,
validatorInfoCache: viCache, validatorInfoCache: viCache,
limiterGetAllValidatorInformation: rate.NewLimiter(1, 3),
limiterGetAllDelegationInformation: rate.NewLimiter(1, 3),
}, },
Public: true, Public: true,
} }
} }
func (s *PublicStakingService) wait(limiter *rate.Limiter, ctx context.Context) error {
if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
name: strLimit,
}).Inc()
}
return limiter.Wait(deadlineCtx)
}
return nil
}
// getBalanceByBlockNumber returns balance by block number at given eth blockNum without checks // getBalanceByBlockNumber returns balance by block number at given eth blockNum without checks
func (s *PublicStakingService) getBalanceByBlockNumber( func (s *PublicStakingService) getBalanceByBlockNumber(
ctx context.Context, address string, blockNum rpc.BlockNumber, ctx context.Context, address string, blockNum rpc.BlockNumber,
@ -208,6 +236,12 @@ func (s *PublicStakingService) GetAllValidatorInformation(
timer := DoMetricRPCRequest(GetAllValidatorInformation) timer := DoMetricRPCRequest(GetAllValidatorInformation)
defer DoRPCRequestDuration(GetAllValidatorInformation, timer) defer DoRPCRequestDuration(GetAllValidatorInformation, timer)
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber)
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber) DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber)
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
@ -231,6 +265,12 @@ func (s *PublicStakingService) GetAllValidatorInformationByBlockNumber(
timer := DoMetricRPCRequest(GetAllValidatorInformationByBlockNumber) timer := DoMetricRPCRequest(GetAllValidatorInformationByBlockNumber)
defer DoRPCRequestDuration(GetAllValidatorInformationByBlockNumber, timer) defer DoRPCRequestDuration(GetAllValidatorInformationByBlockNumber, timer)
err := s.wait(s.limiterGetAllValidatorInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllValidatorInformationByBlockNumber, FailedNumber)
return nil, err
}
// Process number based on version // Process number based on version
blockNum := blockNumber.EthBlockNumber() blockNum := blockNumber.EthBlockNumber()
@ -466,6 +506,12 @@ func (s *PublicStakingService) GetAllDelegationInformation(
timer := DoMetricRPCRequest(GetAllDelegationInformation) timer := DoMetricRPCRequest(GetAllDelegationInformation)
defer DoRPCRequestDuration(GetAllDelegationInformation, timer) defer DoRPCRequestDuration(GetAllDelegationInformation, timer)
err := s.wait(s.limiterGetAllDelegationInformation, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber) DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
@ -495,7 +541,6 @@ func (s *PublicStakingService) GetAllDelegationInformation(
// Fetch all delegations // Fetch all delegations
validators := make([][]StructuredResponse, validatorsNum) validators := make([][]StructuredResponse, validatorsNum)
var err error
for i := start; i < start+validatorsNum; i++ { for i := start; i < start+validatorsNum; i++ {
validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String()) validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String())
if err != nil { if err != nil {

Loading…
Cancel
Save