[rpc] Add Rate Limiter and cache for RPCs (#3711)

* Fix 1.0.4 migration func for rate limiter
pull/3718/head
MathxH Chen 4 years ago committed by GitHub
parent ce6e4c1f4c
commit 5e09b89322
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/harmony/config.go
  2. 8
      cmd/harmony/config_migrations.go
  3. 1
      cmd/harmony/config_migrations_test.go
  4. 2
      cmd/harmony/default.go
  5. 21
      cmd/harmony/flags.go
  6. 34
      cmd/harmony/flags_test.go
  7. 2
      cmd/harmony/main.go
  8. 1
      go.mod
  9. 3
      internal/configs/node/config.go
  10. 5
      internal/configs/node/network.go
  11. 110
      rpc/blockchain.go
  12. 10
      rpc/rpc.go

@ -132,6 +132,8 @@ type wsConfig struct {
type rpcOptConfig struct { type rpcOptConfig struct {
DebugEnabled bool // Enables PrivateDebugService APIs, including the EVM tracer DebugEnabled bool // Enables PrivateDebugService APIs, including the EVM tracer
RateLimterEnabled bool // Enable Rate limiter for RPC
RequestsPerSecond int // for RPC rate limiter
} }
type devnetConfig struct { type devnetConfig struct {

@ -90,6 +90,14 @@ func init() {
confTree.Set("HTTP.RosettaPort", defaultConfig.HTTP.RosettaPort) confTree.Set("HTTP.RosettaPort", defaultConfig.HTTP.RosettaPort)
} }
if confTree.Get("RPCOpt.RateLimterEnabled") == nil {
confTree.Set("RPCOpt.RateLimterEnabled", defaultConfig.RPCOpt.RateLimterEnabled)
}
if confTree.Get("RPCOpt.RequestsPerSecond") == nil {
confTree.Set("RPCOpt.RequestsPerSecond", defaultConfig.RPCOpt.RequestsPerSecond)
}
if confTree.Get("P2P.IP") == nil { if confTree.Get("P2P.IP") == nil {
confTree.Set("P2P.IP", defaultConfig.P2P.IP) confTree.Set("P2P.IP", defaultConfig.P2P.IP)
} }

@ -135,7 +135,6 @@ Version = "1.0.3"
Enabled = true Enabled = true
IP = "127.0.0.1" IP = "127.0.0.1"
Port = 9800 Port = 9800
`) `)
V1_0_4ConfigDefault = []byte(` V1_0_4ConfigDefault = []byte(`

@ -39,6 +39,8 @@ var defaultConfig = harmonyConfig{
}, },
RPCOpt: rpcOptConfig{ RPCOpt: rpcOptConfig{
DebugEnabled: false, DebugEnabled: false,
RateLimterEnabled: true,
RequestsPerSecond: nodeconfig.DefaultRPCRateLimit,
}, },
BLSKeys: blsConfig{ BLSKeys: blsConfig{
KeyDir: "./.hmy/blskeys", KeyDir: "./.hmy/blskeys",

@ -74,6 +74,8 @@ var (
rpcOptFlags = []cli.Flag{ rpcOptFlags = []cli.Flag{
rpcDebugEnabledFlag, rpcDebugEnabledFlag,
rpcRateLimiterEnabledFlag,
rpcRateLimitFlag,
} }
blsFlags = append(newBLSFlags, legacyBLSFlags...) blsFlags = append(newBLSFlags, legacyBLSFlags...)
@ -628,12 +630,31 @@ var (
DefValue: defaultConfig.RPCOpt.DebugEnabled, DefValue: defaultConfig.RPCOpt.DebugEnabled,
Hidden: true, Hidden: true,
} }
rpcRateLimiterEnabledFlag = cli.BoolFlag{
Name: "rpc.ratelimiter",
Usage: "enable rate limiter for RPCs",
DefValue: defaultConfig.RPCOpt.RateLimterEnabled,
}
rpcRateLimitFlag = cli.IntFlag{
Name: "rpc.ratelimit",
Usage: "the number of requests per second for RPCs",
DefValue: defaultConfig.RPCOpt.RequestsPerSecond,
}
) )
func applyRPCOptFlags(cmd *cobra.Command, config *harmonyConfig) { func applyRPCOptFlags(cmd *cobra.Command, config *harmonyConfig) {
if cli.IsFlagChanged(cmd, rpcDebugEnabledFlag) { if cli.IsFlagChanged(cmd, rpcDebugEnabledFlag) {
config.RPCOpt.DebugEnabled = cli.GetBoolFlagValue(cmd, rpcDebugEnabledFlag) config.RPCOpt.DebugEnabled = cli.GetBoolFlagValue(cmd, rpcDebugEnabledFlag)
} }
if cli.IsFlagChanged(cmd, rpcRateLimiterEnabledFlag) {
config.RPCOpt.RateLimterEnabled = cli.GetBoolFlagValue(cmd, rpcRateLimiterEnabledFlag)
}
if cli.IsFlagChanged(cmd, rpcRateLimitFlag) {
config.RPCOpt.RequestsPerSecond = cli.GetIntFlagValue(cmd, rpcRateLimitFlag)
}
} }
// bls flags // bls flags

@ -66,6 +66,11 @@ func TestHarmonyFlags(t *testing.T) {
RosettaEnabled: false, RosettaEnabled: false,
RosettaPort: 9700, RosettaPort: 9700,
}, },
RPCOpt: rpcOptConfig{
DebugEnabled: false,
RateLimterEnabled: true,
RequestsPerSecond: 300,
},
WS: wsConfig{ WS: wsConfig{
Enabled: true, Enabled: true,
IP: "127.0.0.1", IP: "127.0.0.1",
@ -536,6 +541,35 @@ func TestRPCOptFlags(t *testing.T) {
args: []string{"--rpc.debug"}, args: []string{"--rpc.debug"},
expConfig: rpcOptConfig{ expConfig: rpcOptConfig{
DebugEnabled: true, DebugEnabled: true,
RateLimterEnabled: true,
RequestsPerSecond: 300,
},
},
{
args: []string{},
expConfig: rpcOptConfig{
DebugEnabled: false,
RateLimterEnabled: true,
RequestsPerSecond: 300,
},
},
{
args: []string{"--rpc.ratelimiter", "--rpc.ratelimit", "1000"},
expConfig: rpcOptConfig{
DebugEnabled: false,
RateLimterEnabled: true,
RequestsPerSecond: 1000,
},
},
{
args: []string{"--rpc.ratelimiter=false", "--rpc.ratelimit", "1000"},
expConfig: rpcOptConfig{
DebugEnabled: false,
RateLimterEnabled: false,
RequestsPerSecond: 1000,
}, },
}, },
} }

@ -328,6 +328,8 @@ func setupNodeAndRun(hc harmonyConfig) {
WSIp: hc.WS.IP, WSIp: hc.WS.IP,
WSPort: hc.WS.Port, WSPort: hc.WS.Port,
DebugEnabled: hc.RPCOpt.DebugEnabled, DebugEnabled: hc.RPCOpt.DebugEnabled,
RateLimiterEnabled: hc.RPCOpt.RateLimterEnabled,
RequestsPerSecond: hc.RPCOpt.RequestsPerSecond,
} }
// Parse rosetta config // Parse rosetta config

@ -56,6 +56,7 @@ require (
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/lint v0.0.0-20200302205851-738671d3881b golang.org/x/lint v0.0.0-20200302205851-738671d3881b
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.0.0-20210106214847-113979e3529a golang.org/x/tools v0.0.0-20210106214847-113979e3529a
google.golang.org/grpc v1.33.2 google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.25.0 google.golang.org/protobuf v1.25.0

@ -107,6 +107,9 @@ type RPCServerConfig struct {
WSPort int WSPort int
DebugEnabled bool DebugEnabled bool
RateLimiterEnabled bool
RequestsPerSecond int
} }
// RosettaServerConfig is the config for the rosetta server // RosettaServerConfig is the config for the rosetta server

@ -56,6 +56,11 @@ const (
DefaultPrometheusPort = 9900 DefaultPrometheusPort = 9900
) )
const (
// DefaultRateLimit for RPC, the number of requests per second
DefaultRPCRateLimit = 300
)
const ( const (
// rpcHTTPPortOffset is the port offset for RPC HTTP requests // rpcHTTPPortOffset is the port offset for RPC HTTP requests
rpcHTTPPortOffset = 500 rpcHTTPPortOffset = 500

@ -4,10 +4,12 @@ import (
"context" "context"
"fmt" "fmt"
"math/big" "math/big"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"golang.org/x/time/rate"
"github.com/harmony-one/harmony/hmy" "github.com/harmony-one/harmony/hmy"
"github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/chain"
@ -21,6 +23,8 @@ import (
v2 "github.com/harmony-one/harmony/rpc/v2" v2 "github.com/harmony-one/harmony/rpc/v2"
"github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard"
stakingReward "github.com/harmony-one/harmony/staking/reward" stakingReward "github.com/harmony-one/harmony/staking/reward"
lru "github.com/hashicorp/golang-lru"
) )
// PublicBlockchainService provides an API to access the Harmony blockchain. // PublicBlockchainService provides an API to access the Harmony blockchain.
@ -28,18 +32,36 @@ import (
type PublicBlockchainService struct { type PublicBlockchainService struct {
hmy *hmy.Harmony hmy *hmy.Harmony
version Version version Version
limiter *rate.Limiter
blockCache *lru.Cache
} }
const (
DefaultRateLimiterWaitTimeout = 5 * time.Second
blockCacheLimit = 256
)
// NewPublicBlockchainAPI creates a new API for the RPC interface // NewPublicBlockchainAPI creates a new API for the RPC interface
func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version) rpc.API { func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
blockCache, _ := lru.New(blockCacheLimit)
if limiterEnable {
return rpc.API{
Namespace: version.Namespace(),
Version: APIVersion,
Service: &PublicBlockchainService{hmy, version, rate.NewLimiter(rate.Limit(limit), 1), blockCache},
Public: true,
}
} else {
return rpc.API{ return rpc.API{
Namespace: version.Namespace(), Namespace: version.Namespace(),
Version: APIVersion, Version: APIVersion,
Service: &PublicBlockchainService{hmy, version}, Service: &PublicBlockchainService{hmy, version, nil, blockCache},
Public: true, Public: true,
} }
} }
}
// ChainId returns the chain id of the chain - required by MetaMask // ChainId returns the chain id of the chain - required by MetaMask
func (s *PublicBlockchainService) ChainId(ctx context.Context) (interface{}, error) { func (s *PublicBlockchainService) ChainId(ctx context.Context) (interface{}, error) {
// Format return base on version // Format return base on version
@ -122,14 +144,33 @@ func (s *PublicBlockchainService) BlockNumber(ctx context.Context) (interface{},
} }
} }
func (s *PublicBlockchainService) wait(ctx context.Context) error {
if s.limiter != nil {
deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout)
defer cancel()
return s.limiter.Wait(deadlineCtx)
}
return nil
}
// GetBlockByNumber returns the requested block. When blockNum is -1 the chain head is returned. When fullTx is true all // GetBlockByNumber returns the requested block. When blockNum is -1 the chain head is returned. When fullTx is true all
// transactions in the block are returned in full detail, otherwise only the transaction hash is returned. // transactions in the block are returned in full detail, otherwise only the transaction hash is returned.
// When withSigners in BlocksArgs is true it shows block signers for this block in list of one addresses. // When withSigners in BlocksArgs is true it shows block signers for this block in list of one addresses.
func (s *PublicBlockchainService) GetBlockByNumber( func (s *PublicBlockchainService) GetBlockByNumber(
ctx context.Context, blockNumber BlockNumber, opts interface{}, ctx context.Context, blockNumber BlockNumber, opts interface{},
) (response StructuredResponse, err error) { ) (response StructuredResponse, err error) {
// Process arguments based on version
blockNum := blockNumber.EthBlockNumber() blockNum := blockNumber.EthBlockNumber()
if block, ok := s.blockCache.Get(uint64(blockNum)); ok {
return block.(StructuredResponse), nil
}
err = s.wait(ctx)
if err != nil {
return nil, err
}
// Process arguments based on version
var blockArgs *rpc_common.BlockArgs var blockArgs *rpc_common.BlockArgs
blockArgs, ok := opts.(*rpc_common.BlockArgs) blockArgs, ok := opts.(*rpc_common.BlockArgs)
if !ok { if !ok {
@ -186,6 +227,7 @@ func (s *PublicBlockchainService) GetBlockByNumber(
} }
} }
s.blockCache.Add(uint64(blockNum), response)
return response, err return response, err
} }
@ -198,6 +240,12 @@ func (s *PublicBlockchainService) GetBlockByNumber(
func (s *PublicBlockchainService) GetBlockByHash( func (s *PublicBlockchainService) GetBlockByHash(
ctx context.Context, blockHash common.Hash, opts interface{}, ctx context.Context, blockHash common.Hash, opts interface{},
) (response StructuredResponse, err error) { ) (response StructuredResponse, err error) {
err = s.wait(ctx)
if err != nil {
return nil, err
}
// Process arguments based on version // Process arguments based on version
var blockArgs *rpc_common.BlockArgs var blockArgs *rpc_common.BlockArgs
blockArgs, ok := opts.(*rpc_common.BlockArgs) blockArgs, ok := opts.(*rpc_common.BlockArgs)
@ -260,6 +308,12 @@ func (s *PublicBlockchainService) GetBlocks(
ctx context.Context, blockNumberStart BlockNumber, ctx context.Context, blockNumberStart BlockNumber,
blockNumberEnd BlockNumber, blockArgs *rpc_common.BlockArgs, blockNumberEnd BlockNumber, blockArgs *rpc_common.BlockArgs,
) ([]StructuredResponse, error) { ) ([]StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
blockStart := blockNumberStart.Int64() blockStart := blockNumberStart.Int64()
blockEnd := blockNumberEnd.Int64() blockEnd := blockNumberEnd.Int64()
@ -469,6 +523,11 @@ func (s *PublicBlockchainService) GetLeader(ctx context.Context) (string, error)
func (s *PublicBlockchainService) GetShardingStructure( func (s *PublicBlockchainService) GetShardingStructure(
ctx context.Context, ctx context.Context,
) ([]StructuredResponse, error) { ) ([]StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
// Get header and number of shards. // Get header and number of shards.
epoch := s.hmy.CurrentBlock().Epoch() epoch := s.hmy.CurrentBlock().Epoch()
numShard := shard.Schedule.InstanceForEpoch(epoch).NumShards() numShard := shard.Schedule.InstanceForEpoch(epoch).NumShards()
@ -487,6 +546,12 @@ func (s *PublicBlockchainService) GetShardID(ctx context.Context) (int, error) {
func (s *PublicBlockchainService) GetBalanceByBlockNumber( func (s *PublicBlockchainService) GetBalanceByBlockNumber(
ctx context.Context, address string, blockNumber BlockNumber, ctx context.Context, address string, blockNumber BlockNumber,
) (interface{}, error) { ) (interface{}, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
// Process number based on version // Process number based on version
blockNum := blockNumber.EthBlockNumber() blockNum := blockNumber.EthBlockNumber()
@ -512,6 +577,12 @@ func (s *PublicBlockchainService) GetBalanceByBlockNumber(
// LatestHeader returns the latest header information // LatestHeader returns the latest header information
func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredResponse, error) { func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
// Fetch Header // Fetch Header
header, err := s.hmy.HeaderByNumber(ctx, rpc.LatestBlockNumber) header, err := s.hmy.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if err != nil { if err != nil {
@ -535,6 +606,12 @@ func (s *PublicBlockchainService) GetLatestChainHeaders(
func (s *PublicBlockchainService) GetLastCrossLinks( func (s *PublicBlockchainService) GetLastCrossLinks(
ctx context.Context, ctx context.Context,
) ([]StructuredResponse, error) { ) ([]StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
} }
@ -561,6 +638,11 @@ func (s *PublicBlockchainService) GetLastCrossLinks(
func (s *PublicBlockchainService) GetHeaderByNumber( func (s *PublicBlockchainService) GetHeaderByNumber(
ctx context.Context, blockNumber BlockNumber, ctx context.Context, blockNumber BlockNumber,
) (StructuredResponse, error) { ) (StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
// Process number based on version // Process number based on version
blockNum := blockNumber.EthBlockNumber() blockNum := blockNumber.EthBlockNumber()
@ -583,6 +665,11 @@ func (s *PublicBlockchainService) GetHeaderByNumber(
func (s *PublicBlockchainService) GetCurrentUtilityMetrics( func (s *PublicBlockchainService) GetCurrentUtilityMetrics(
ctx context.Context, ctx context.Context,
) (StructuredResponse, error) { ) (StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
} }
@ -601,6 +688,11 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics(
func (s *PublicBlockchainService) GetSuperCommittees( func (s *PublicBlockchainService) GetSuperCommittees(
ctx context.Context, ctx context.Context,
) (StructuredResponse, error) { ) (StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
} }
@ -619,6 +711,12 @@ func (s *PublicBlockchainService) GetSuperCommittees(
func (s *PublicBlockchainService) GetCurrentBadBlocks( func (s *PublicBlockchainService) GetCurrentBadBlocks(
ctx context.Context, ctx context.Context,
) ([]StructuredResponse, error) { ) ([]StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
// Fetch bad blocks and format // Fetch bad blocks and format
badBlocks := []StructuredResponse{} badBlocks := []StructuredResponse{}
for _, blk := range s.hmy.GetCurrentBadBlocks() { for _, blk := range s.hmy.GetCurrentBadBlocks() {
@ -651,6 +749,12 @@ func (s *PublicBlockchainService) GetCirculatingSupply(
func (s *PublicBlockchainService) GetStakingNetworkInfo( func (s *PublicBlockchainService) GetStakingNetworkInfo(
ctx context.Context, ctx context.Context,
) (StructuredResponse, error) { ) (StructuredResponse, error) {
err := s.wait(ctx)
if err != nil {
return nil, err
}
if !isBeaconShard(s.hmy) { if !isBeaconShard(s.hmy) {
return nil, ErrNotBeaconShard return nil, ErrNotBeaconShard
} }

@ -70,7 +70,7 @@ func (n Version) Namespace() string {
// StartServers starts the http & ws servers // StartServers starts the http & ws servers
func StartServers(hmy *hmy.Harmony, apis []rpc.API, config nodeconfig.RPCServerConfig) error { func StartServers(hmy *hmy.Harmony, apis []rpc.API, config nodeconfig.RPCServerConfig) error {
apis = append(apis, getAPIs(hmy, config.DebugEnabled)...) apis = append(apis, getAPIs(hmy, config.DebugEnabled, config.RateLimiterEnabled, config.RequestsPerSecond)...)
if config.HTTPEnabled { if config.HTTPEnabled {
httpEndpoint = fmt.Sprintf("%v:%v", config.HTTPIp, config.HTTPPort) httpEndpoint = fmt.Sprintf("%v:%v", config.HTTPIp, config.HTTPPort)
@ -121,15 +121,15 @@ func StopServers() error {
} }
// getAPIs returns all the API methods for the RPC interface // getAPIs returns all the API methods for the RPC interface
func getAPIs(hmy *hmy.Harmony, debugEnable bool) []rpc.API { func getAPIs(hmy *hmy.Harmony, debugEnable bool, rateLimiterEnable bool, ratelimit int) []rpc.API {
publicAPIs := []rpc.API{ publicAPIs := []rpc.API{
// Public methods // Public methods
NewPublicHarmonyAPI(hmy, V1), NewPublicHarmonyAPI(hmy, V1),
NewPublicHarmonyAPI(hmy, V2), NewPublicHarmonyAPI(hmy, V2),
NewPublicHarmonyAPI(hmy, Eth), NewPublicHarmonyAPI(hmy, Eth),
NewPublicBlockchainAPI(hmy, V1), NewPublicBlockchainAPI(hmy, V1, rateLimiterEnable, ratelimit),
NewPublicBlockchainAPI(hmy, V2), NewPublicBlockchainAPI(hmy, V2, rateLimiterEnable, ratelimit),
NewPublicBlockchainAPI(hmy, Eth), NewPublicBlockchainAPI(hmy, Eth, rateLimiterEnable, ratelimit),
NewPublicContractAPI(hmy, V1), NewPublicContractAPI(hmy, V1),
NewPublicContractAPI(hmy, V2), NewPublicContractAPI(hmy, V2),
NewPublicContractAPI(hmy, Eth), NewPublicContractAPI(hmy, Eth),

Loading…
Cancel
Save