Fix/filter new tikv (#4275)

* fix: elastic rpc new filters fix

* fix: add a small delay to give readers time to catch up

* fix: run the subscribe in a go routine

* fix: generate the filter id before

* fix: rpl not able to marshal negative big int

* fix: use json to get the message
pull/4279/head
Diego Nava 2 years ago committed by GitHub
parent 44c1d52e83
commit 0425c7c1df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      core/blockchain_impl.go
  2. 75
      internal/tikv/redis_helper/pubsub.go
  3. 13
      node/api.go
  4. 9
      node/node.go
  5. 43
      rpc/filters/api.go
  6. 42
      rpc/filters/filter_system.go

@ -1601,7 +1601,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
if bc.isInitTiKV() {
err = redis_helper.PublishShardUpdate(bc.ShardID(), block.NumberU64(), logs)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis publish shard update error")
utils.Logger().Warn().Err(err).Msg("redis publish shard update error")
}
}

@ -2,9 +2,11 @@ package redis_helper
import (
"context"
"encoding/json"
"fmt"
"io"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
@ -15,7 +17,13 @@ import (
// BlockUpdate block update event
type BlockUpdate struct {
BlkNum uint64
Logs []*types.Log
Logs []*types.LogForStorage // types.Log will cut some fields when rpl encoding/decoding look at core/types/log.go:83
}
// NewFilterUpdated new filter update event
type NewFilterUpdated struct {
ID string
FilterCriteria ethereum.FilterQuery
}
// SubscribeShardUpdate subscribe block update event
@ -25,18 +33,37 @@ func SubscribeShardUpdate(shardID uint32, cb func(blkNum uint64, logs []*types.L
block := &BlockUpdate{}
err := rlp.DecodeBytes([]byte(message.Payload), block)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis subscribe shard update error")
utils.Logger().Warn().Err(err).Msg("redis subscribe shard update error")
continue
}
cb(block.BlkNum, block.Logs)
logs := make([]*types.Log, len(block.Logs))
for i, l := range block.Logs {
if l != nil {
ls := types.Log(*l)
logs[i] = &ls
} else {
logs[i] = nil
}
}
cb(block.BlkNum, logs)
}
}
// PublishShardUpdate publish block update event
func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error {
logsForStorage := make([]*types.LogForStorage, len(logs))
for i, l := range logs {
if l != nil {
ls := types.LogForStorage(*l)
logsForStorage[i] = &ls
} else {
logsForStorage[i] = nil
}
}
msg, err := rlp.EncodeToBytes(&BlockUpdate{
BlkNum: blkNum,
Logs: logs,
Logs: logsForStorage,
})
if err != nil {
return err
@ -44,6 +71,44 @@ func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error
return redisInstance.Publish(context.Background(), fmt.Sprintf("shard_update_%d", shardID), msg).Err()
}
// SubscribeNewFilterLogEvent subscribe new filter log event from other readers
func SubscribeNewFilterLogEvent(shardID uint32, namespace string, cb func(id string, crit ethereum.FilterQuery)) {
if redisInstance == nil {
return
}
pubsub := redisInstance.
Subscribe(context.Background(), fmt.Sprintf("%s_new_filter_log_%d", namespace, shardID))
for message := range pubsub.Channel() {
query := NewFilterUpdated{}
if err := json.Unmarshal([]byte(message.Payload), &query); err != nil {
utils.Logger().Warn().Err(err).Msg("redis subscribe new_filter_log error")
continue
}
cb(query.ID, query.FilterCriteria)
}
}
// PublishNewFilterLogEvent publish new filter log event from other readers
func PublishNewFilterLogEvent(shardID uint32, namespace, id string, crit ethereum.FilterQuery) error {
if redisInstance == nil {
return nil
}
ev := NewFilterUpdated{
ID: id,
FilterCriteria: crit,
}
msg, err := json.Marshal(ev)
if err != nil {
return err
}
return redisInstance.
Publish(context.Background(), fmt.Sprintf("%s_new_filter_log_%d", namespace, shardID), msg).Err()
}
//TxPoolUpdate tx pool update event
type TxPoolUpdate struct {
typ string
@ -112,7 +177,7 @@ func SubscribeTxPoolUpdate(shardID uint32, cb func(tx types.PoolTransaction, loc
txu := &TxPoolUpdate{}
err := rlp.DecodeBytes([]byte(message.Payload), &txu)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis subscribe shard update error")
utils.Logger().Warn().Err(err).Msg("redis subscribe txpool update error")
continue
}
cb(txu.Tx, txu.Local)

@ -5,6 +5,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/eth/rpc"
"github.com/harmony-one/harmony/hmy"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/harmony-one/harmony/rosetta"
hmy_rpc "github.com/harmony-one/harmony/rpc"
rpc_common "github.com/harmony-one/harmony/rpc/common"
@ -92,14 +93,22 @@ func (node *Node) StopRosetta() error {
// APIs return the collection of local RPC services.
// NOTE, some of these services probably need to be moved to somewhere else.
func (node *Node) APIs(harmony *hmy.Harmony) []rpc.API {
hmyFilter := filters.NewPublicFilterAPI(harmony, false, "hmy", harmony.ShardID)
ethFilter := filters.NewPublicFilterAPI(harmony, false, "eth", harmony.ShardID)
if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
hmyFilter.Service.(*filters.PublicFilterAPI).SyncNewFilterFromOtherReaders()
ethFilter.Service.(*filters.PublicFilterAPI).SyncNewFilterFromOtherReaders()
}
// Append all the local APIs and return
return []rpc.API{
hmy_rpc.NewPublicNetAPI(node.host, harmony.ChainID, hmy_rpc.V1),
hmy_rpc.NewPublicNetAPI(node.host, harmony.ChainID, hmy_rpc.V2),
hmy_rpc.NewPublicNetAPI(node.host, harmony.ChainID, hmy_rpc.Eth),
hmy_rpc.NewPublicWeb3API(),
filters.NewPublicFilterAPI(harmony, false, "hmy"),
filters.NewPublicFilterAPI(harmony, false, "eth"),
hmyFilter,
ethFilter,
}
}

@ -1090,7 +1090,7 @@ func New(
if node.Blockchain().IsTikvWriterMaster() {
err := redis_helper.PublishTxPoolUpdate(uint32(harmonyconfig.General.ShardID), tx, local)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis publish txpool update error")
utils.Logger().Warn().Err(err).Msg("redis publish txpool update error")
}
}
}
@ -1436,13 +1436,13 @@ func (node *Node) syncFromTiKVWriter() {
select {
case <-doneChan:
return
case <-time.After(5 * time.Minute):
case <-time.After(2 * time.Minute):
buf := bytes.NewBuffer(nil)
err := pprof.Lookup("goroutine").WriteTo(buf, 1)
if err != nil {
panic(err)
}
err = ioutil.WriteFile(fmt.Sprintf("/tmp/%s", time.Now().Format("hmy_0102150405.error.log")), buf.Bytes(), 0644)
err = ioutil.WriteFile(fmt.Sprintf("/local/%s", time.Now().Format("hmy_0102150405.error.log")), buf.Bytes(), 0644)
if err != nil {
panic(err)
}
@ -1452,8 +1452,7 @@ func (node *Node) syncFromTiKVWriter() {
}()
defer close(doneChan)
err := bc.SyncFromTiKVWriter(blkNum, logs)
if err != nil {
if err := bc.SyncFromTiKVWriter(blkNum, logs); err != nil {
utils.Logger().Warn().
Err(err).
Msg("cannot sync block from tikv writer")

@ -6,6 +6,10 @@ import (
"sync"
"time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/tikv/redis_helper"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/block"
@ -42,15 +46,17 @@ type PublicFilterAPI struct {
filtersMu sync.Mutex
filters map[rpc.ID]*filter
namespace string
shardID uint32
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool, namespace string) rpc.API {
func NewPublicFilterAPI(backend Backend, lightMode bool, namespace string, shardID uint32) rpc.API {
api := &PublicFilterAPI{
backend: backend,
events: NewEventSystem(backend, lightMode, namespace == "eth"),
filters: make(map[rpc.ID]*filter),
namespace: namespace,
shardID: shardID,
}
go api.timeoutLoop()
@ -306,7 +312,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
matchedLogs = make(chan []*types.Log)
)
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs, nil)
if err != nil {
return nil, err
}
@ -349,14 +355,30 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
timer := hmy_rpc.DoMetricRPCRequest(hmy_rpc.NewFilter)
defer hmy_rpc.DoRPCRequestDuration(hmy_rpc.NewFilter, timer)
id := rpc.NewID()
if err := redis_helper.PublishNewFilterLogEvent(api.shardID, api.namespace, string(id), ethereum.FilterQuery(crit)); err != nil {
return "", fmt.Errorf("sending filter logs to other readers: %w", err)
}
time.Sleep(time.Millisecond * 70) // to give time to the other readers to catch up
return api.createFilter(ethereum.FilterQuery(crit), id)
}
func (api *PublicFilterAPI) createFilter(crit ethereum.FilterQuery, customId rpc.ID) (rpc.ID, error) {
logs := make(chan []*types.Log)
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
logsSub, err := api.events.SubscribeLogs(crit, logs, &customId)
if err != nil {
return rpc.ID(""), err
return "", err
}
api.filtersMu.Lock()
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
api.filters[logsSub.ID] = &filter{
typ: LogsSubscription,
crit: FilterCriteria(crit),
deadline: time.NewTimer(deadline),
logs: make([]*types.Log, 0),
s: logsSub,
}
api.filtersMu.Unlock()
go func() {
@ -380,6 +402,17 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
return logsSub.ID, nil
}
func (api *PublicFilterAPI) SyncNewFilterFromOtherReaders() {
go redis_helper.SubscribeNewFilterLogEvent(api.shardID, api.namespace, func(id string, crit ethereum.FilterQuery) {
if _, err := api.createFilter(crit, rpc.ID(id)); err != nil {
utils.Logger().Warn().
Uint32("shardID", api.shardID).
Err(err).
Msg("unable to sync filters from other readers")
}
})
}
// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs

@ -193,7 +193,7 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription {
// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) (*Subscription, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
@ -208,32 +208,38 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// only interested in pending logs
if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribePendingLogs(crit, logs), nil
return es.subscribePendingLogs(crit, logs, customID), nil
}
// only interested in new mined logs
if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
return es.subscribeLogs(crit, logs, customID), nil
}
// only interested in mined logs within a specific block range
if from >= 0 && to >= 0 && to >= from {
return es.subscribeLogs(crit, logs), nil
return es.subscribeLogs(crit, logs, customID), nil
}
// interested in mined logs from a specific block number, new logs and pending logs
if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribeMinedPendingLogs(crit, logs), nil
return es.subscribeMinedPendingLogs(crit, logs, customID), nil
}
// interested in logs from a specific block number to new mined blocks
if from >= 0 && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
return es.subscribeLogs(crit, logs, customID), nil
}
return nil, fmt.Errorf("invalid from and to block combination: from > to")
}
// subscribeMinedPendingLogs creates a subscription that returned mined and
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) *Subscription {
var id rpc.ID
if customID != nil {
id = *customID
} else {
id = rpc.NewID()
}
sub := &subscription{
id: rpc.NewID(),
id: id,
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
@ -248,9 +254,15 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) *Subscription {
var id rpc.ID
if customID != nil {
id = *customID
} else {
id = rpc.NewID()
}
sub := &subscription{
id: rpc.NewID(),
id: id,
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
@ -265,9 +277,15 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// subscribePendingLogs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log, customID *rpc.ID) *Subscription {
var id rpc.ID
if customID != nil {
id = *customID
} else {
id = rpc.NewID()
}
sub := &subscription{
id: rpc.NewID(),
id: id,
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),

Loading…
Cancel
Save