diff --git a/rpc/blockchain.go b/rpc/blockchain.go index eb9d5cbe2..9c83ab792 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -4,10 +4,15 @@ import ( "context" "fmt" "math/big" + "reflect" "time" + "encoding/hex" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" @@ -38,6 +43,10 @@ type PublicBlockchainService struct { limiter *rate.Limiter rpcBlockFactory rpc_common.BlockFactory helper *bcServiceHelper + // TEMP SOLUTION to rpc node spamming issue + limiterGetStakingNetworkInfo *rate.Limiter + limiterGetSuperCommittees *rate.Limiter + limiterGetCurrentUtilityMetrics *rate.Limiter } const ( @@ -49,7 +58,7 @@ const ( func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API { var limiter *rate.Limiter if limiterEnable { - limiter := rate.NewLimiter(rate.Limit(limit), 1) + limiter = rate.NewLimiter(rate.Limit(limit), limit) strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) rpcRateLimitCounterVec.With(prometheus.Labels{ "rate_limit": strLimit, @@ -57,9 +66,12 @@ func NewPublicBlockchainAPI(hmy *hmy.Harmony, version Version, limiterEnable boo } s := &PublicBlockchainService{ - hmy: hmy, - version: version, - limiter: limiter, + hmy: hmy, + version: version, + limiter: limiter, + limiterGetStakingNetworkInfo: rate.NewLimiter(5, 10), + limiterGetSuperCommittees: rate.NewLimiter(5, 10), + limiterGetCurrentUtilityMetrics: rate.NewLimiter(5, 10), } s.helper = s.newHelper() @@ -140,18 +152,19 @@ func (s *PublicBlockchainService) BlockNumber(ctx context.Context) (interface{}, } } -func (s *PublicBlockchainService) wait(ctx context.Context) error { - if s.limiter != nil { +func (s *PublicBlockchainService) wait(limiter *rate.Limiter, ctx context.Context) error { + if limiter != nil { deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout) defer cancel() - if !s.limiter.Allow() { - strLimit := fmt.Sprintf("%d", int64(s.limiter.Limit())) + if !limiter.Allow() { + strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) + name := reflect.TypeOf(limiter).Elem().Name() rpcRateLimitCounterVec.With(prometheus.Labels{ - "rate_limit": strLimit, + name: strLimit, }).Inc() } - return s.limiter.Wait(deadlineCtx) + return limiter.Wait(deadlineCtx) } return nil } @@ -165,7 +178,7 @@ func (s *PublicBlockchainService) GetBlockByNumber( timer := DoMetricRPCRequest(GetBlockByNumber) defer DoRPCRequestDuration(GetBlockByNumber, timer) - err = s.wait(ctx) + err = s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetBlockByNumber, FailedNumber) return nil, err @@ -218,7 +231,7 @@ func (s *PublicBlockchainService) GetBlockByHash( timer := DoMetricRPCRequest(GetBlockByHash) defer DoRPCRequestDuration(GetBlockByHash, timer) - err = s.wait(ctx) + err = s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetBlockByHash, FailedNumber) return nil, err @@ -508,7 +521,7 @@ func (s *PublicBlockchainService) GetShardingStructure( timer := DoMetricRPCRequest(GetShardingStructure) defer DoRPCRequestDuration(GetShardingStructure, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetShardingStructure, FailedNumber) return nil, err @@ -565,7 +578,7 @@ func (s *PublicBlockchainService) LatestHeader(ctx context.Context) (StructuredR timer := DoMetricRPCRequest(LatestHeader) defer DoRPCRequestDuration(LatestHeader, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(LatestHeader, FailedNumber) return nil, err @@ -600,7 +613,7 @@ func (s *PublicBlockchainService) GetLastCrossLinks( timer := DoMetricRPCRequest(GetLastCrossLinks) defer DoRPCRequestDuration(GetLastCrossLinks, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetLastCrossLinks, FailedNumber) return nil, err @@ -638,7 +651,7 @@ func (s *PublicBlockchainService) GetHeaderByNumber( timer := DoMetricRPCRequest(GetHeaderByNumber) defer DoRPCRequestDuration(GetHeaderByNumber, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetHeaderByNumber, FailedNumber) return nil, err @@ -663,6 +676,145 @@ func (s *PublicBlockchainService) GetHeaderByNumber( return nil, err } +// Result structs for GetProof +type AccountResult struct { + Address common.Address `json:"address"` + AccountProof []string `json:"accountProof"` + Balance *hexutil.Big `json:"balance"` + CodeHash common.Hash `json:"codeHash"` + Nonce hexutil.Uint64 `json:"nonce"` + StorageHash common.Hash `json:"storageHash"` + StorageProof []StorageResult `json:"storageProof"` +} + +type StorageResult struct { + Key string `json:"key"` + Value *hexutil.Big `json:"value"` + Proof []string `json:"proof"` +} + +// GetHeaderByNumberRLPHex returns block header at given number by `hex(rlp(header))` +func (s *PublicBlockchainService) GetProof( + ctx context.Context, address common.Address, storageKeys []string, blockNumber BlockNumber) (ret *AccountResult, err error) { + timer := DoMetricRPCRequest(GetProof) + defer DoRPCRequestDuration(GetProof, timer) + + defer func() { + if ret == nil || err != nil { + DoMetricRPCQueryInfo(GetProof, FailedNumber) + } + }() + + err = s.wait(s.limiter, ctx) + if err != nil { + return + } + + // Process number based on version + blockNum := blockNumber.EthBlockNumber() + + // Ensure valid block number + if s.version != Eth && isBlockGreaterThanLatest(s.hmy, blockNum) { + err = ErrRequestedBlockTooHigh + return + } + + // Fetch Header + header, err := s.hmy.HeaderByNumber(ctx, blockNum) + if header == nil && err != nil { + return + } + state, err := s.hmy.BeaconChain.StateAt(header.Root()) + if state == nil || err != nil { + return + } + + storageTrie := state.StorageTrie(address) + storageHash := types.EmptyRootHash + codeHash := state.GetCodeHash(address) + storageProof := make([]StorageResult, len(storageKeys)) + + // if we have a storageTrie, (which means the account exists), we can update the storagehash + if storageTrie != nil { + storageHash = storageTrie.Hash() + } else { + // no storageTrie means the account does not exist, so the codeHash is the hash of an empty bytearray. + codeHash = crypto.Keccak256Hash(nil) + } + + // create the proof for the storageKeys + for i, key := range storageKeys { + if storageTrie != nil { + proof, storageError := state.GetStorageProof(address, common.HexToHash(key)) + if storageError != nil { + err = storageError + return + } + storageProof[i] = StorageResult{key, (*hexutil.Big)(state.GetState(address, common.HexToHash(key)).Big()), toHexSlice(proof)} + } else { + storageProof[i] = StorageResult{key, &hexutil.Big{}, []string{}} + } + } + + // create the accountProof + accountProof, err := state.GetProof(address) + if err != nil { + return + } + + ret, err = &AccountResult{ + Address: address, + AccountProof: toHexSlice(accountProof), + Balance: (*hexutil.Big)(state.GetBalance(address)), + CodeHash: codeHash, + Nonce: hexutil.Uint64(state.GetNonce(address)), + StorageHash: storageHash, + StorageProof: storageProof, + }, state.Error() + return +} + +// toHexSlice creates a slice of hex-strings based on []byte. +func toHexSlice(b [][]byte) []string { + r := make([]string, len(b)) + for i := range b { + r[i] = hexutil.Encode(b[i]) + } + return r +} + +// GetHeaderByNumberRLPHex returns block header at given number by `hex(rlp(header))` +func (s *PublicBlockchainService) GetHeaderByNumberRLPHex( + ctx context.Context, blockNumber BlockNumber, +) (string, error) { + timer := DoMetricRPCRequest(GetHeaderByNumberRLPHex) + defer DoRPCRequestDuration(GetHeaderByNumberRLPHex, timer) + + err := s.wait(s.limiter, ctx) + if err != nil { + DoMetricRPCQueryInfo(GetHeaderByNumberRLPHex, FailedNumber) + return "", err + } + + // Process number based on version + blockNum := blockNumber.EthBlockNumber() + + // Ensure valid block number + if s.version != Eth && isBlockGreaterThanLatest(s.hmy, blockNum) { + DoMetricRPCQueryInfo(GetHeaderByNumberRLPHex, FailedNumber) + return "", ErrRequestedBlockTooHigh + } + + // Fetch Header + header, err := s.hmy.HeaderByNumber(ctx, blockNum) + if header != nil && err == nil { + // Response output is the same for all versions + val, _ := rlp.EncodeToBytes(header) + return hex.EncodeToString(val), nil + } + return "", err +} + // GetCurrentUtilityMetrics .. func (s *PublicBlockchainService) GetCurrentUtilityMetrics( ctx context.Context, @@ -670,7 +822,7 @@ func (s *PublicBlockchainService) GetCurrentUtilityMetrics( timer := DoMetricRPCRequest(GetCurrentUtilityMetrics) defer DoRPCRequestDuration(GetCurrentUtilityMetrics, timer) - err := s.wait(ctx) + err := s.wait(s.limiterGetCurrentUtilityMetrics, ctx) if err != nil { DoMetricRPCQueryInfo(GetCurrentUtilityMetrics, FailedNumber) return nil, err @@ -699,7 +851,7 @@ func (s *PublicBlockchainService) GetSuperCommittees( timer := DoMetricRPCRequest(GetSuperCommittees) defer DoRPCRequestDuration(GetSuperCommittees, timer) - err := s.wait(ctx) + err := s.wait(s.limiterGetSuperCommittees, ctx) if err != nil { DoMetricRPCQueryInfo(GetSuperCommittees, FailedNumber) return nil, err @@ -728,7 +880,7 @@ func (s *PublicBlockchainService) GetCurrentBadBlocks( timer := DoMetricRPCRequest(GetCurrentBadBlocks) defer DoRPCRequestDuration(GetCurrentBadBlocks, timer) - err := s.wait(ctx) + err := s.wait(s.limiter, ctx) if err != nil { DoMetricRPCQueryInfo(GetCurrentBadBlocks, FailedNumber) return nil, err @@ -770,7 +922,7 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo( timer := DoMetricRPCRequest(GetStakingNetworkInfo) defer DoRPCRequestDuration(GetStakingNetworkInfo, timer) - err := s.wait(ctx) + err := s.wait(s.limiterGetStakingNetworkInfo, ctx) if err != nil { DoMetricRPCQueryInfo(GetStakingNetworkInfo, FailedNumber) return nil, err diff --git a/rpc/metrics.go b/rpc/metrics.go index 8e06f83be..89985915b 100644 --- a/rpc/metrics.go +++ b/rpc/metrics.go @@ -19,6 +19,8 @@ const ( GetLatestChainHeaders = "GetLatestChainHeaders" GetLastCrossLinks = "GetLastCrossLinks" GetHeaderByNumber = "GetHeaderByNumber" + GetHeaderByNumberRLPHex = "GetHeaderByNumberRLPHex" + GetProof = "GetProof" GetCurrentUtilityMetrics = "GetCurrentUtilityMetrics" GetSuperCommittees = "GetSuperCommittees" GetCurrentBadBlocks = "GetCurrentBadBlocks" diff --git a/rpc/staking.go b/rpc/staking.go index 0be522a4a..41aae8fc4 100644 --- a/rpc/staking.go +++ b/rpc/staking.go @@ -2,7 +2,12 @@ package rpc import ( "context" + "fmt" "math/big" + "reflect" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" lru "github.com/hashicorp/golang-lru" @@ -29,6 +34,9 @@ type PublicStakingService struct { version Version validatorInfoCache *lru.Cache // cache for detailed validator information per page and block + // TEMP SOLUTION to rpc node spamming issue + limiterGetAllValidatorInformation *rate.Limiter + limiterGetAllDelegationInformation *rate.Limiter } // NewPublicStakingAPI creates a new API for the RPC interface @@ -38,14 +46,34 @@ func NewPublicStakingAPI(hmy *hmy.Harmony, version Version) rpc.API { Namespace: version.Namespace(), Version: APIVersion, Service: &PublicStakingService{ - hmy: hmy, - version: version, - validatorInfoCache: viCache, + hmy: hmy, + version: version, + validatorInfoCache: viCache, + limiterGetAllValidatorInformation: rate.NewLimiter(1, 3), + limiterGetAllDelegationInformation: rate.NewLimiter(1, 3), }, Public: true, } } +func (s *PublicStakingService) wait(limiter *rate.Limiter, ctx context.Context) error { + if limiter != nil { + deadlineCtx, cancel := context.WithTimeout(ctx, DefaultRateLimiterWaitTimeout) + defer cancel() + if !limiter.Allow() { + strLimit := fmt.Sprintf("%d", int64(limiter.Limit())) + + name := reflect.TypeOf(limiter).Elem().Name() + rpcRateLimitCounterVec.With(prometheus.Labels{ + name: strLimit, + }).Inc() + } + + return limiter.Wait(deadlineCtx) + } + return nil +} + // getBalanceByBlockNumber returns balance by block number at given eth blockNum without checks func (s *PublicStakingService) getBalanceByBlockNumber( ctx context.Context, address string, blockNum rpc.BlockNumber, @@ -208,6 +236,12 @@ func (s *PublicStakingService) GetAllValidatorInformation( timer := DoMetricRPCRequest(GetAllValidatorInformation) defer DoRPCRequestDuration(GetAllValidatorInformation, timer) + err := s.wait(s.limiterGetAllValidatorInformation, ctx) + if err != nil { + DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber) + return nil, err + } + if !isBeaconShard(s.hmy) { DoMetricRPCQueryInfo(GetAllValidatorInformation, FailedNumber) return nil, ErrNotBeaconShard @@ -231,6 +265,12 @@ func (s *PublicStakingService) GetAllValidatorInformationByBlockNumber( timer := DoMetricRPCRequest(GetAllValidatorInformationByBlockNumber) defer DoRPCRequestDuration(GetAllValidatorInformationByBlockNumber, timer) + err := s.wait(s.limiterGetAllValidatorInformation, ctx) + if err != nil { + DoMetricRPCQueryInfo(GetAllValidatorInformationByBlockNumber, FailedNumber) + return nil, err + } + // Process number based on version blockNum := blockNumber.EthBlockNumber() @@ -466,6 +506,12 @@ func (s *PublicStakingService) GetAllDelegationInformation( timer := DoMetricRPCRequest(GetAllDelegationInformation) defer DoRPCRequestDuration(GetAllDelegationInformation, timer) + err := s.wait(s.limiterGetAllDelegationInformation, ctx) + if err != nil { + DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber) + return nil, err + } + if !isBeaconShard(s.hmy) { DoMetricRPCQueryInfo(GetAllDelegationInformation, FailedNumber) return nil, ErrNotBeaconShard @@ -495,7 +541,6 @@ func (s *PublicStakingService) GetAllDelegationInformation( // Fetch all delegations validators := make([][]StructuredResponse, validatorsNum) - var err error for i := start; i < start+validatorsNum; i++ { validators[i-start], err = s.GetDelegationsByValidator(ctx, addresses[i].String()) if err != nil {