From 5e09b89322aa074e3872399bd2163480f19de62f Mon Sep 17 00:00:00 2001 From: MathxH Chen Date: Fri, 21 May 2021 10:51:05 +0800 Subject: [PATCH] [rpc] Add Rate Limiter and cache for RPCs (#3711) * Fix 1.0.4 migration func for rate limiter --- cmd/harmony/config.go | 4 +- cmd/harmony/config_migrations.go | 8 ++ cmd/harmony/config_migrations_test.go | 1 - cmd/harmony/default.go | 4 +- cmd/harmony/flags.go | 21 +++++ cmd/harmony/flags_test.go | 36 +++++++- cmd/harmony/main.go | 16 ++-- go.mod | 1 + internal/configs/node/config.go | 3 + internal/configs/node/network.go | 5 ++ rpc/blockchain.go | 122 ++++++++++++++++++++++++-- rpc/rpc.go | 10 +-- 12 files changed, 206 insertions(+), 25 deletions(-) diff --git a/cmd/harmony/config.go b/cmd/harmony/config.go index 92513fb21..eed778962 100644 --- a/cmd/harmony/config.go +++ b/cmd/harmony/config.go @@ -131,7 +131,9 @@ type wsConfig struct { } type rpcOptConfig struct { - DebugEnabled bool // Enables PrivateDebugService APIs, including the EVM tracer + DebugEnabled bool // Enables PrivateDebugService APIs, including the EVM tracer + RateLimterEnabled bool // Enable Rate limiter for RPC + RequestsPerSecond int // for RPC rate limiter } type devnetConfig struct { diff --git a/cmd/harmony/config_migrations.go b/cmd/harmony/config_migrations.go index d98b928c6..6800a6c66 100644 --- a/cmd/harmony/config_migrations.go +++ b/cmd/harmony/config_migrations.go @@ -90,6 +90,14 @@ func init() { confTree.Set("HTTP.RosettaPort", defaultConfig.HTTP.RosettaPort) } + if confTree.Get("RPCOpt.RateLimterEnabled") == nil { + confTree.Set("RPCOpt.RateLimterEnabled", defaultConfig.RPCOpt.RateLimterEnabled) + } + + if confTree.Get("RPCOpt.RequestsPerSecond") == nil { + confTree.Set("RPCOpt.RequestsPerSecond", defaultConfig.RPCOpt.RequestsPerSecond) + } + if confTree.Get("P2P.IP") == nil { confTree.Set("P2P.IP", defaultConfig.P2P.IP) } diff --git a/cmd/harmony/config_migrations_test.go b/cmd/harmony/config_migrations_test.go index 07763f0a4..81efd5105 100644 --- a/cmd/harmony/config_migrations_test.go +++ b/cmd/harmony/config_migrations_test.go @@ -135,7 +135,6 @@ Version = "1.0.3" Enabled = true IP = "127.0.0.1" Port = 9800 - `) V1_0_4ConfigDefault = []byte(` diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 829cc62ae..cb2f4f51d 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -38,7 +38,9 @@ var defaultConfig = harmonyConfig{ Port: nodeconfig.DefaultWSPort, }, RPCOpt: rpcOptConfig{ - DebugEnabled: false, + DebugEnabled: false, + RateLimterEnabled: true, + RequestsPerSecond: nodeconfig.DefaultRPCRateLimit, }, BLSKeys: blsConfig{ KeyDir: "./.hmy/blskeys", diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index 9e9ff27f9..22b6d2b17 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -74,6 +74,8 @@ var ( rpcOptFlags = []cli.Flag{ rpcDebugEnabledFlag, + rpcRateLimiterEnabledFlag, + rpcRateLimitFlag, } blsFlags = append(newBLSFlags, legacyBLSFlags...) @@ -628,12 +630,31 @@ var ( DefValue: defaultConfig.RPCOpt.DebugEnabled, Hidden: true, } + + rpcRateLimiterEnabledFlag = cli.BoolFlag{ + Name: "rpc.ratelimiter", + Usage: "enable rate limiter for RPCs", + DefValue: defaultConfig.RPCOpt.RateLimterEnabled, + } + + rpcRateLimitFlag = cli.IntFlag{ + Name: "rpc.ratelimit", + Usage: "the number of requests per second for RPCs", + DefValue: defaultConfig.RPCOpt.RequestsPerSecond, + } ) func applyRPCOptFlags(cmd *cobra.Command, config *harmonyConfig) { if cli.IsFlagChanged(cmd, rpcDebugEnabledFlag) { config.RPCOpt.DebugEnabled = cli.GetBoolFlagValue(cmd, rpcDebugEnabledFlag) } + if cli.IsFlagChanged(cmd, rpcRateLimiterEnabledFlag) { + config.RPCOpt.RateLimterEnabled = cli.GetBoolFlagValue(cmd, rpcRateLimiterEnabledFlag) + } + if cli.IsFlagChanged(cmd, rpcRateLimitFlag) { + config.RPCOpt.RequestsPerSecond = cli.GetIntFlagValue(cmd, rpcRateLimitFlag) + } + } // bls flags diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index e747aff60..ea55a0b15 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -66,6 +66,11 @@ func TestHarmonyFlags(t *testing.T) { RosettaEnabled: false, RosettaPort: 9700, }, + RPCOpt: rpcOptConfig{ + DebugEnabled: false, + RateLimterEnabled: true, + RequestsPerSecond: 300, + }, WS: wsConfig{ Enabled: true, IP: "127.0.0.1", @@ -535,7 +540,36 @@ func TestRPCOptFlags(t *testing.T) { { args: []string{"--rpc.debug"}, expConfig: rpcOptConfig{ - DebugEnabled: true, + DebugEnabled: true, + RateLimterEnabled: true, + RequestsPerSecond: 300, + }, + }, + + { + args: []string{}, + expConfig: rpcOptConfig{ + DebugEnabled: false, + RateLimterEnabled: true, + RequestsPerSecond: 300, + }, + }, + + { + args: []string{"--rpc.ratelimiter", "--rpc.ratelimit", "1000"}, + expConfig: rpcOptConfig{ + DebugEnabled: false, + RateLimterEnabled: true, + RequestsPerSecond: 1000, + }, + }, + + { + args: []string{"--rpc.ratelimiter=false", "--rpc.ratelimit", "1000"}, + expConfig: rpcOptConfig{ + DebugEnabled: false, + RateLimterEnabled: false, + RequestsPerSecond: 1000, }, }, } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 7f10b4c66..a72d24066 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -321,13 +321,15 @@ func setupNodeAndRun(hc harmonyConfig) { // Parse RPC config nodeConfig.RPCServer = nodeconfig.RPCServerConfig{ - HTTPEnabled: hc.HTTP.Enabled, - HTTPIp: hc.HTTP.IP, - HTTPPort: hc.HTTP.Port, - WSEnabled: hc.WS.Enabled, - WSIp: hc.WS.IP, - WSPort: hc.WS.Port, - DebugEnabled: hc.RPCOpt.DebugEnabled, + HTTPEnabled: hc.HTTP.Enabled, + HTTPIp: hc.HTTP.IP, + HTTPPort: hc.HTTP.Port, + WSEnabled: hc.WS.Enabled, + WSIp: hc.WS.IP, + WSPort: hc.WS.Port, + DebugEnabled: hc.RPCOpt.DebugEnabled, + RateLimiterEnabled: hc.RPCOpt.RateLimterEnabled, + RequestsPerSecond: hc.RPCOpt.RequestsPerSecond, } // Parse rosetta config diff --git a/go.mod b/go.mod index ac20a8918..78289cb00 100644 --- a/go.mod +++ b/go.mod @@ -56,6 +56,7 @@ require ( golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/lint v0.0.0-20200302205851-738671d3881b golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/tools v0.0.0-20210106214847-113979e3529a google.golang.org/grpc v1.33.2 google.golang.org/protobuf v1.25.0 diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index 4897c6e43..070abb9c8 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -107,6 +107,9 @@ type RPCServerConfig struct { WSPort int DebugEnabled bool + + RateLimiterEnabled bool + RequestsPerSecond int } // RosettaServerConfig is the config for the rosetta server diff --git a/internal/configs/node/network.go b/internal/configs/node/network.go index 154f31b94..8bc6577c8 100644 --- a/internal/configs/node/network.go +++ b/internal/configs/node/network.go @@ -56,6 +56,11 @@ const ( DefaultPrometheusPort = 9900 ) +const ( + // DefaultRateLimit for RPC, the number of requests per second + DefaultRPCRateLimit = 300 +) + const ( // rpcHTTPPortOffset is the port offset for RPC HTTP requests rpcHTTPPortOffset = 500 diff --git a/rpc/blockchain.go b/rpc/blockchain.go index c6cbfc839..e347b5e9a 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/time/rate" "github.com/harmony-one/harmony/hmy" "github.com/harmony-one/harmony/internal/chain" @@ -21,23 +23,43 @@ import ( v2 "github.com/harmony-one/harmony/rpc/v2" "github.com/harmony-one/harmony/shard" stakingReward "github.com/harmony-one/harmony/staking/reward" + + lru "github.com/hashicorp/golang-lru" ) // PublicBlockchainService provides an API to access the Harmony blockchain. // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockchainService struct { - hmy *hmy.Harmony - version Version + hmy *hmy.Harmony + version Version + limiter *rate.Limiter + blockCache *lru.Cache } +const ( + DefaultRateLimiterWaitTimeout = 5 * time.Second + blockCacheLimit = 256 +) + // NewPublicBlockchainAPI creates a new API for the RPC interface -func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version) rpc.API { - return rpc.API{ - Namespace: version.Namespace(), - Version: APIVersion, - Service: &PublicBlockchainService{hmy, version}, - Public: true, +func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API { + blockCache, _ := lru.New(blockCacheLimit) + if limiterEnable { + return rpc.API{ + Namespace: version.Namespace(), + Version: APIVersion, + Service: &PublicBlockchainService{hmy, version, rate.NewLimiter(rate.Limit(limit), 1), blockCache}, + Public: true, + } + } else { + return rpc.API{ + Namespace: version.Namespace(), + Version: APIVersion, + Service: &PublicBlockchainService{hmy, version, nil, blockCache}, + Public: true, + } } + } // ChainId returns the chain id of the chain - required by MetaMask @@ -122,14 +144,33 @@ func (s *PublicBlockchainService) BlockNumber(ctx context.Context) (interface{}, } } +func (s *PublicBlockchainService) wait(ctx context.Context) error { + if s.limiter != nil { + deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout) + defer cancel() + return s.limiter.Wait(deadlineCtx) + } + return nil +} + // GetBlockByNumber returns the requested block. When blockNum is -1 the chain head is returned. When fullTx is true all // transactions in the block are returned in full detail, otherwise only the transaction hash is returned. // When withSigners in BlocksArgs is true it shows block signers for this block in list of one addresses. func (s *PublicBlockchainService) GetBlockByNumber( ctx context.Context, blockNumber BlockNumber, opts interface{}, ) (response StructuredResponse, err error) { - // Process arguments based on version + blockNum := blockNumber.EthBlockNumber() + if block, ok := s.blockCache.Get(uint64(blockNum)); ok { + return block.(StructuredResponse), nil + } + + err = s.wait(ctx) + if err != nil { + return nil, err + } + + // Process arguments based on version var blockArgs *rpc_common.BlockArgs blockArgs, ok := opts.(*rpc_common.BlockArgs) if !ok { @@ -186,6 +227,7 @@ func (s *PublicBlockchainService) GetBlockByNumber( } } + s.blockCache.Add(uint64(blockNum), response) return response, err } @@ -198,6 +240,12 @@ func (s *PublicBlockchainService) GetBlockByNumber( func (s *PublicBlockchainService) GetBlockByHash( ctx context.Context, blockHash common.Hash, opts interface{}, ) (response StructuredResponse, err error) { + + err = s.wait(ctx) + if err != nil { + return nil, err + } + // Process arguments based on version var blockArgs *rpc_common.BlockArgs blockArgs, ok := opts.(*rpc_common.BlockArgs) @@ -260,6 +308,12 @@ func (s *PublicBlockchainService) GetBlocks( ctx context.Context, blockNumberStart BlockNumber, blockNumberEnd BlockNumber, blockArgs *rpc_common.BlockArgs, ) ([]StructuredResponse, error) { + + err := s.wait(ctx) + if err != nil { + return nil, err + } + blockStart := blockNumberStart.Int64() blockEnd := blockNumberEnd.Int64() @@ -469,6 +523,11 @@ func (s *PublicBlockchainService) GetLeader(ctx context.Context) (string, error) func (s *PublicBlockchainService) GetShardingStructure( ctx context.Context, ) ([]StructuredResponse, error) { + err := s.wait(ctx) + if err != nil { + return nil, err + } + // Get header and number of shards. epoch := s.hmy.CurrentBlock().Epoch() numShard := shard.Schedule.InstanceForEpoch(epoch).NumShards() @@ -487,6 +546,12 @@ func (s *PublicBlockchainService) GetShardID(ctx context.Context) (int, error) { func (s *PublicBlockchainService) GetBalanceByBlockNumber( ctx context.Context, address string, blockNumber BlockNumber, ) (interface{}, error) { + + err := s.wait(ctx) + if err != nil { + return nil, err + } + // Process number based on version blockNum := blockNumber.EthBlockNumber() @@ -512,6 +577,12 @@ func (s *PublicBlockchainService) GetBalanceByBlockNumber( // LatestHeader returns the latest header information func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredResponse, error) { + + err := s.wait(ctx) + if err != nil { + return nil, err + } + // Fetch Header header, err := s.hmy.HeaderByNumber(ctx, rpc.LatestBlockNumber) if err != nil { @@ -535,6 +606,12 @@ func (s *PublicBlockchainService) GetLatestChainHeaders( func (s *PublicBlockchainService) GetLastCrossLinks( ctx context.Context, ) ([]StructuredResponse, error) { + + err := s.wait(ctx) + if err != nil { + return nil, err + } + if !isBeaconShard(s.hmy) { return nil, ErrNotBeaconShard } @@ -561,6 +638,11 @@ func (s *PublicBlockchainService) GetLastCrossLinks( func (s *PublicBlockchainService) GetHeaderByNumber( ctx context.Context, blockNumber BlockNumber, ) (StructuredResponse, error) { + err := s.wait(ctx) + if err != nil { + return nil, err + } + // Process number based on version blockNum := blockNumber.EthBlockNumber() @@ -583,6 +665,11 @@ func (s *PublicBlockchainService) GetHeaderByNumber( func (s *PublicBlockchainService) GetCurrentUtilityMetrics( ctx context.Context, ) (StructuredResponse, error) { + err := s.wait(ctx) + if err != nil { + return nil, err + } + if !isBeaconShard(s.hmy) { return nil, ErrNotBeaconShard } @@ -601,6 +688,11 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics( func (s *PublicBlockchainService) GetSuperCommittees( ctx context.Context, ) (StructuredResponse, error) { + err := s.wait(ctx) + if err != nil { + return nil, err + } + if !isBeaconShard(s.hmy) { return nil, ErrNotBeaconShard } @@ -619,6 +711,12 @@ func (s *PublicBlockchainService) GetSuperCommittees( func (s *PublicBlockchainService) GetCurrentBadBlocks( ctx context.Context, ) ([]StructuredResponse, error) { + + err := s.wait(ctx) + if err != nil { + return nil, err + } + // Fetch bad blocks and format badBlocks := []StructuredResponse{} for _, blk := range s.hmy.GetCurrentBadBlocks() { @@ -651,6 +749,12 @@ func (s *PublicBlockchainService) GetCirculatingSupply( func (s *PublicBlockchainService) GetStakingNetworkInfo( ctx context.Context, ) (StructuredResponse, error) { + + err := s.wait(ctx) + if err != nil { + return nil, err + } + if !isBeaconShard(s.hmy) { return nil, ErrNotBeaconShard } diff --git a/rpc/rpc.go b/rpc/rpc.go index 3cf3b1da2..7eb33d2a9 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -70,7 +70,7 @@ func (n Version) Namespace() string { // StartServers starts the http & ws servers func StartServers(hmy *hmy.Harmony, apis []rpc.API, config nodeconfig.RPCServerConfig) error { - apis = append(apis, getAPIs(hmy, config.DebugEnabled)...) + apis = append(apis, getAPIs(hmy, config.DebugEnabled, config.RateLimiterEnabled, config.RequestsPerSecond)...) if config.HTTPEnabled { httpEndpoint = fmt.Sprintf("%v:%v", config.HTTPIp, config.HTTPPort) @@ -121,15 +121,15 @@ func StopServers() error { } // getAPIs returns all the API methods for the RPC interface -func getAPIs(hmy *hmy.Harmony, debugEnable bool) []rpc.API { +func getAPIs(hmy *hmy.Harmony, debugEnable bool, rateLimiterEnable bool, ratelimit int) []rpc.API { publicAPIs := []rpc.API{ // Public methods NewPublicHarmonyAPI(hmy, V1), NewPublicHarmonyAPI(hmy, V2), NewPublicHarmonyAPI(hmy, Eth), - NewPublicBlockchainAPI(hmy, V1), - NewPublicBlockchainAPI(hmy, V2), - NewPublicBlockchainAPI(hmy, Eth), + NewPublicBlockchainAPI(hmy, V1, rateLimiterEnable, ratelimit), + NewPublicBlockchainAPI(hmy, V2, rateLimiterEnable, ratelimit), + NewPublicBlockchainAPI(hmy, Eth, rateLimiterEnable, ratelimit), NewPublicContractAPI(hmy, V1), NewPublicContractAPI(hmy, V2), NewPublicContractAPI(hmy, Eth),