From 0425c7c1df4d50083751b00bd33305507a2729d3 Mon Sep 17 00:00:00 2001 From: Diego Nava <8563843+diego1q2w@users.noreply.github.com> Date: Wed, 14 Sep 2022 11:33:39 +0200 Subject: [PATCH] Fix/filter new tikv (#4275) * fix: elastic rpc new filters fix * fix: add a small delay to give readers time to catch up * fix: run the subscribe in a go routine * fix: generate the filter id before * fix: rpl not able to marshal negative big int * fix: use json to get the message --- core/blockchain_impl.go | 2 +- internal/tikv/redis_helper/pubsub.go | 75 ++++++++++++++++++++++++++-- node/api.go | 13 ++++- node/node.go | 9 ++-- rpc/filters/api.go | 43 ++++++++++++++-- rpc/filters/filter_system.go | 42 +++++++++++----- 6 files changed, 154 insertions(+), 30 deletions(-) diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index 9ec3802c2..0e24dde84 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -1601,7 +1601,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i if bc.isInitTiKV() { err = redis_helper.PublishShardUpdate(bc.ShardID(), block.NumberU64(), logs) if err != nil { - utils.Logger().Info().Err(err).Msg("redis publish shard update error") + utils.Logger().Warn().Err(err).Msg("redis publish shard update error") } } diff --git a/internal/tikv/redis_helper/pubsub.go b/internal/tikv/redis_helper/pubsub.go index 86fbea01f..f334f8d5b 100644 --- a/internal/tikv/redis_helper/pubsub.go +++ b/internal/tikv/redis_helper/pubsub.go @@ -2,9 +2,11 @@ package redis_helper import ( "context" + "encoding/json" "fmt" "io" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" @@ -15,7 +17,13 @@ import ( // BlockUpdate block update event type BlockUpdate struct { BlkNum uint64 - Logs []*types.Log + Logs []*types.LogForStorage // types.Log will cut some fields when rpl encoding/decoding look at core/types/log.go:83 +} + +// NewFilterUpdated new filter update event +type NewFilterUpdated struct { + ID string + FilterCriteria ethereum.FilterQuery } // SubscribeShardUpdate subscribe block update event @@ -25,18 +33,37 @@ func SubscribeShardUpdate(shardID uint32, cb func(blkNum uint64, logs []*types.L block := &BlockUpdate{} err := rlp.DecodeBytes([]byte(message.Payload), block) if err != nil { - utils.Logger().Info().Err(err).Msg("redis subscribe shard update error") + utils.Logger().Warn().Err(err).Msg("redis subscribe shard update error") continue } - cb(block.BlkNum, block.Logs) + logs := make([]*types.Log, len(block.Logs)) + for i, l := range block.Logs { + if l != nil { + ls := types.Log(*l) + logs[i] = &ls + } else { + logs[i] = nil + } + } + + cb(block.BlkNum, logs) } } // PublishShardUpdate publish block update event func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error { + logsForStorage := make([]*types.LogForStorage, len(logs)) + for i, l := range logs { + if l != nil { + ls := types.LogForStorage(*l) + logsForStorage[i] = &ls + } else { + logsForStorage[i] = nil + } + } msg, err := rlp.EncodeToBytes(&BlockUpdate{ BlkNum: blkNum, - Logs: logs, + Logs: logsForStorage, }) if err != nil { return err @@ -44,6 +71,44 @@ func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error return redisInstance.Publish(context.Background(), fmt.Sprintf("shard_update_%d", shardID), msg).Err() } +// SubscribeNewFilterLogEvent subscribe new filter log event from other readers +func SubscribeNewFilterLogEvent(shardID uint32, namespace string, cb func(id string, crit ethereum.FilterQuery)) { + if redisInstance == nil { + return + } + pubsub := redisInstance. + Subscribe(context.Background(), fmt.Sprintf("%s_new_filter_log_%d", namespace, shardID)) + for message := range pubsub.Channel() { + query := NewFilterUpdated{} + + if err := json.Unmarshal([]byte(message.Payload), &query); err != nil { + utils.Logger().Warn().Err(err).Msg("redis subscribe new_filter_log error") + continue + } + + cb(query.ID, query.FilterCriteria) + } +} + +// PublishNewFilterLogEvent publish new filter log event from other readers +func PublishNewFilterLogEvent(shardID uint32, namespace, id string, crit ethereum.FilterQuery) error { + if redisInstance == nil { + return nil + } + + ev := NewFilterUpdated{ + ID: id, + FilterCriteria: crit, + } + msg, err := json.Marshal(ev) + if err != nil { + return err + } + + return redisInstance. + Publish(context.Background(), fmt.Sprintf("%s_new_filter_log_%d", namespace, shardID), msg).Err() +} + //TxPoolUpdate tx pool update event type TxPoolUpdate struct { typ string @@ -112,7 +177,7 @@ func SubscribeTxPoolUpdate(shardID uint32, cb func(tx types.PoolTransaction, loc txu := &TxPoolUpdate{} err := rlp.DecodeBytes([]byte(message.Payload), &txu) if err != nil { - utils.Logger().Info().Err(err).Msg("redis subscribe shard update error") + utils.Logger().Warn().Err(err).Msg("redis subscribe txpool update error") continue } cb(txu.Tx, txu.Local) diff --git a/node/api.go b/node/api.go index caad39307..131a7c806 100644 --- a/node/api.go +++ b/node/api.go @@ -5,6 +5,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/eth/rpc" "github.com/harmony-one/harmony/hmy" + "github.com/harmony-one/harmony/internal/tikv" "github.com/harmony-one/harmony/rosetta" hmy_rpc "github.com/harmony-one/harmony/rpc" rpc_common "github.com/harmony-one/harmony/rpc/common" @@ -92,14 +93,22 @@ func (node *Node) StopRosetta() error { // APIs return the collection of local RPC services. // NOTE, some of these services probably need to be moved to somewhere else. func (node *Node) APIs(harmony *hmy.Harmony) []rpc.API { + hmyFilter := filters.NewPublicFilterAPI(harmony, false, "hmy", harmony.ShardID) + ethFilter := filters.NewPublicFilterAPI(harmony, false, "eth", harmony.ShardID) + + if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.TiKV.Role == tikv.RoleReader { + hmyFilter.Service.(*filters.PublicFilterAPI).SyncNewFilterFromOtherReaders() + ethFilter.Service.(*filters.PublicFilterAPI).SyncNewFilterFromOtherReaders() + } + // Append all the local APIs and return return []rpc.API{ hmy_rpc.NewPublicNetAPI(node.host, harmony.ChainID, hmy_rpc.V1), hmy_rpc.NewPublicNetAPI(node.host, harmony.ChainID, hmy_rpc.V2), hmy_rpc.NewPublicNetAPI(node.host, harmony.ChainID, hmy_rpc.Eth), hmy_rpc.NewPublicWeb3API(), - filters.NewPublicFilterAPI(harmony, false, "hmy"), - filters.NewPublicFilterAPI(harmony, false, "eth"), + hmyFilter, + ethFilter, } } diff --git a/node/node.go b/node/node.go index a38a58ce9..3951f160f 100644 --- a/node/node.go +++ b/node/node.go @@ -1090,7 +1090,7 @@ func New( if node.Blockchain().IsTikvWriterMaster() { err := redis_helper.PublishTxPoolUpdate(uint32(harmonyconfig.General.ShardID), tx, local) if err != nil { - utils.Logger().Info().Err(err).Msg("redis publish txpool update error") + utils.Logger().Warn().Err(err).Msg("redis publish txpool update error") } } } @@ -1436,13 +1436,13 @@ func (node *Node) syncFromTiKVWriter() { select { case <-doneChan: return - case <-time.After(5 * time.Minute): + case <-time.After(2 * time.Minute): buf := bytes.NewBuffer(nil) err := pprof.Lookup("goroutine").WriteTo(buf, 1) if err != nil { panic(err) } - err = ioutil.WriteFile(fmt.Sprintf("/tmp/%s", time.Now().Format("hmy_0102150405.error.log")), buf.Bytes(), 0644) + err = ioutil.WriteFile(fmt.Sprintf("/local/%s", time.Now().Format("hmy_0102150405.error.log")), buf.Bytes(), 0644) if err != nil { panic(err) } @@ -1452,8 +1452,7 @@ func (node *Node) syncFromTiKVWriter() { }() defer close(doneChan) - err := bc.SyncFromTiKVWriter(blkNum, logs) - if err != nil { + if err := bc.SyncFromTiKVWriter(blkNum, logs); err != nil { utils.Logger().Warn(). Err(err). Msg("cannot sync block from tikv writer") diff --git a/rpc/filters/api.go b/rpc/filters/api.go index 22eef3a8f..84c171cc1 100644 --- a/rpc/filters/api.go +++ b/rpc/filters/api.go @@ -6,6 +6,10 @@ import ( "sync" "time" + "github.com/harmony-one/harmony/internal/utils" + + "github.com/harmony-one/harmony/internal/tikv/redis_helper" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/block" @@ -42,15 +46,17 @@ type PublicFilterAPI struct { filtersMu sync.Mutex filters map[rpc.ID]*filter namespace string + shardID uint32 } // NewPublicFilterAPI returns a new PublicFilterAPI instance. -func NewPublicFilterAPI(backend Backend, lightMode bool, namespace string) rpc.API { +func NewPublicFilterAPI(backend Backend, lightMode bool, namespace string, shardID uint32) rpc.API { api := &PublicFilterAPI{ backend: backend, events: NewEventSystem(backend, lightMode, namespace == "eth"), filters: make(map[rpc.ID]*filter), namespace: namespace, + shardID: shardID, } go api.timeoutLoop() @@ -306,7 +312,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc matchedLogs = make(chan []*types.Log) ) - logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs) + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs, nil) if err != nil { return nil, err } @@ -349,14 +355,30 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { timer := hmy_rpc.DoMetricRPCRequest(hmy_rpc.NewFilter) defer hmy_rpc.DoRPCRequestDuration(hmy_rpc.NewFilter, timer) + id := rpc.NewID() + if err := redis_helper.PublishNewFilterLogEvent(api.shardID, api.namespace, string(id), ethereum.FilterQuery(crit)); err != nil { + return "", fmt.Errorf("sending filter logs to other readers: %w", err) + } + + time.Sleep(time.Millisecond * 70) // to give time to the other readers to catch up + return api.createFilter(ethereum.FilterQuery(crit), id) +} + +func (api *PublicFilterAPI) createFilter(crit ethereum.FilterQuery, customId rpc.ID) (rpc.ID, error) { logs := make(chan []*types.Log) - logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs) + logsSub, err := api.events.SubscribeLogs(crit, logs, &customId) if err != nil { - return rpc.ID(""), err + return "", err } api.filtersMu.Lock() - api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub} + api.filters[logsSub.ID] = &filter{ + typ: LogsSubscription, + crit: FilterCriteria(crit), + deadline: time.NewTimer(deadline), + logs: make([]*types.Log, 0), + s: logsSub, + } api.filtersMu.Unlock() go func() { @@ -380,6 +402,17 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { return logsSub.ID, nil } +func (api *PublicFilterAPI) SyncNewFilterFromOtherReaders() { + go redis_helper.SubscribeNewFilterLogEvent(api.shardID, api.namespace, func(id string, crit ethereum.FilterQuery) { + if _, err := api.createFilter(crit, rpc.ID(id)); err != nil { + utils.Logger().Warn(). + Uint32("shardID", api.shardID). + Err(err). + Msg("unable to sync filters from other readers") + } + }) +} + // GetLogs returns logs matching the given argument that are stored within the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs diff --git a/rpc/filters/filter_system.go b/rpc/filters/filter_system.go index 915e6ec46..592ea4713 100644 --- a/rpc/filters/filter_system.go +++ b/rpc/filters/filter_system.go @@ -193,7 +193,7 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription { // SubscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. Default value for the from and to // block is "latest". If the fromBlock > toBlock an error is returned. -func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) { +func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) (*Subscription, error) { var from, to rpc.BlockNumber if crit.FromBlock == nil { from = rpc.LatestBlockNumber @@ -208,32 +208,38 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // only interested in pending logs if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribePendingLogs(crit, logs), nil + return es.subscribePendingLogs(crit, logs, customID), nil } // only interested in new mined logs if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { - return es.subscribeLogs(crit, logs), nil + return es.subscribeLogs(crit, logs, customID), nil } // only interested in mined logs within a specific block range if from >= 0 && to >= 0 && to >= from { - return es.subscribeLogs(crit, logs), nil + return es.subscribeLogs(crit, logs, customID), nil } // interested in mined logs from a specific block number, new logs and pending logs if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribeMinedPendingLogs(crit, logs), nil + return es.subscribeMinedPendingLogs(crit, logs, customID), nil } // interested in logs from a specific block number to new mined blocks if from >= 0 && to == rpc.LatestBlockNumber { - return es.subscribeLogs(crit, logs), nil + return es.subscribeLogs(crit, logs, customID), nil } return nil, fmt.Errorf("invalid from and to block combination: from > to") } // subscribeMinedPendingLogs creates a subscription that returned mined and // pending logs that match the given criteria. -func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { +func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) *Subscription { + var id rpc.ID + if customID != nil { + id = *customID + } else { + id = rpc.NewID() + } sub := &subscription{ - id: rpc.NewID(), + id: id, typ: MinedAndPendingLogsSubscription, logsCrit: crit, created: time.Now(), @@ -248,9 +254,15 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. -func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { +func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) *Subscription { + var id rpc.ID + if customID != nil { + id = *customID + } else { + id = rpc.NewID() + } sub := &subscription{ - id: rpc.NewID(), + id: id, typ: LogsSubscription, logsCrit: crit, created: time.Now(), @@ -265,9 +277,15 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // subscribePendingLogs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { +func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) *Subscription { + var id rpc.ID + if customID != nil { + id = *customID + } else { + id = rpc.NewID() + } sub := &subscription{ - id: rpc.NewID(), + id: id, typ: PendingLogsSubscription, logsCrit: crit, created: time.Now(),