add rate limit on more rpcs

pull/3971/head
Rongjian Lan 3 years ago
parent 7e3d507f1e
commit 1ad108dc68
  1. 32
      rpc/pool.go
  2. 13
      rpc/staking.go

@ -2,6 +2,10 @@ package rpc
import ( import (
"context" "context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"reflect"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -27,6 +31,9 @@ import (
type PublicPoolService struct { type PublicPoolService struct {
hmy *hmy.Harmony hmy *hmy.Harmony
version Version version Version
// TEMP SOLUTION to rpc node spamming issue
limiterPendingTransactions *rate.Limiter
} }
// NewPublicPoolAPI creates a new API for the RPC interface // NewPublicPoolAPI creates a new API for the RPC interface
@ -34,11 +41,28 @@ func NewPublicPoolAPI(hmy *hmy.Harmony, version Version) rpc.API {
return rpc.API{ return rpc.API{
Namespace: version.Namespace(), Namespace: version.Namespace(),
Version: APIVersion, Version: APIVersion,
Service: &PublicPoolService{hmy, version}, Service: &PublicPoolService{hmy, version, rate.NewLimiter(5, 10)},
Public: true, Public: true,
} }
} }
func (s *PublicPoolService) wait(limiter *rate.Limiter, ctx context.Context) error {
if limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
if !limiter.Allow() {
strLimit := fmt.Sprintf("%d", int64(limiter.Limit()))
name := reflect.TypeOf(limiter).Elem().Name()
rpcRateLimitCounterVec.With(prometheus.Labels{
name: strLimit,
}).Inc()
}
return limiter.Wait(deadlineCtx)
}
return nil
}
// SendRawTransaction will add the signed transaction to the transaction pool. // SendRawTransaction will add the signed transaction to the transaction pool.
// The sender is responsible for signing the transaction and using the correct nonce. // The sender is responsible for signing the transaction and using the correct nonce.
func (s *PublicPoolService) SendRawTransaction( func (s *PublicPoolService) SendRawTransaction(
@ -190,6 +214,12 @@ func (s *PublicPoolService) PendingTransactions(
timer := DoMetricRPCRequest(PendingTransactions) timer := DoMetricRPCRequest(PendingTransactions)
defer DoRPCRequestDuration(PendingTransactions, timer) defer DoRPCRequestDuration(PendingTransactions, timer)
err := s.wait(s.limiterPendingTransactions, ctx)
if err != nil {
DoMetricRPCQueryInfo(PendingTransactions, FailedNumber)
return nil, err
}
// Fetch all pending transactions (stx & plain tx) // Fetch all pending transactions (stx & plain tx)
pending, err := s.hmy.GetPoolTransactions() pending, err := s.hmy.GetPoolTransactions()
if err != nil { if err != nil {

@ -37,6 +37,7 @@ type PublicStakingService struct {
// TEMP SOLUTION to rpc node spamming issue // TEMP SOLUTION to rpc node spamming issue
limiterGetAllValidatorInformation *rate.Limiter limiterGetAllValidatorInformation *rate.Limiter
limiterGetAllDelegationInformation *rate.Limiter limiterGetAllDelegationInformation *rate.Limiter
limiterGetDelegationsByValidator *rate.Limiter
} }
// NewPublicStakingAPI creates a new API for the RPC interface // NewPublicStakingAPI creates a new API for the RPC interface
@ -51,6 +52,7 @@ func NewPublicStakingAPI(hmy *hmy.Harmony, version Version) rpc.API {
validatorInfoCache: viCache, validatorInfoCache: viCache,
limiterGetAllValidatorInformation: rate.NewLimiter(1, 3), limiterGetAllValidatorInformation: rate.NewLimiter(1, 3),
limiterGetAllDelegationInformation: rate.NewLimiter(1, 3), limiterGetAllDelegationInformation: rate.NewLimiter(1, 3),
limiterGetDelegationsByValidator: rate.NewLimiter(5, 20),
}, },
Public: true, Public: true,
} }
@ -542,7 +544,7 @@ func (s *PublicStakingService) GetAllDelegationInformation(
// Fetch all delegations // Fetch all delegations
validators := make([][]StructuredResponse, validatorsNum) validators := make([][]StructuredResponse, validatorsNum)
for i := start; i < start+validatorsNum; i++ { for i := start; i < start+validatorsNum; i++ {
validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String()) validators[i-start], err = s.getDelegationByValidatorHelper(addresses[i].String())
if err != nil { if err != nil {
DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber) DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber)
return nil, err return nil, err
@ -687,6 +689,15 @@ func (s *PublicStakingService) GetDelegationsByValidator(
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
} }
err := s.wait(s.limiterGetDelegationsByValidator, ctx)
if err != nil {
DoMetricRPCQueryInfo(GetDelegationsByValidator, FailedNumber)
return nil, err
}
return s.getDelegationByValidatorHelper(address)
}
func (s *PublicStakingService) getDelegationByValidatorHelper(address string) ([]StructuredResponse, error) {
// Fetch delegations // Fetch delegations
validatorAddress, err := internal_common.ParseAddr(address) validatorAddress, err := internal_common.ParseAddr(address)
if err != nil { if err != nil {

Loading…
Cancel
Save