You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
195 lines
5.2 KiB
195 lines
5.2 KiB
package redis_helper
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/ethereum/go-ethereum"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
"github.com/harmony-one/harmony/core/types"
|
|
"github.com/harmony-one/harmony/internal/utils"
|
|
stakingTypes "github.com/harmony-one/harmony/staking/types"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// BlockUpdate block update event
|
|
type BlockUpdate struct {
|
|
BlkNum uint64
|
|
Logs []*types.LogForStorage // types.Log will cut some fields when rpl encoding/decoding look at core/types/log.go:83
|
|
}
|
|
|
|
// NewFilterUpdated new filter update event
|
|
type NewFilterUpdated struct {
|
|
ID string
|
|
FilterCriteria ethereum.FilterQuery
|
|
}
|
|
|
|
// SubscribeShardUpdate subscribe block update event
|
|
func SubscribeShardUpdate(shardID uint32, cb func(blkNum uint64, logs []*types.Log)) {
|
|
pubsub := redisInstance.Subscribe(context.Background(), fmt.Sprintf("shard_update_%d", shardID))
|
|
for message := range pubsub.Channel() {
|
|
block := &BlockUpdate{}
|
|
err := rlp.DecodeBytes([]byte(message.Payload), block)
|
|
if err != nil {
|
|
utils.Logger().Warn().Err(err).Msg("redis subscribe shard update error")
|
|
continue
|
|
}
|
|
logs := make([]*types.Log, len(block.Logs))
|
|
for i, l := range block.Logs {
|
|
if l != nil {
|
|
ls := types.Log(*l)
|
|
logs[i] = &ls
|
|
} else {
|
|
logs[i] = nil
|
|
}
|
|
}
|
|
|
|
cb(block.BlkNum, logs)
|
|
}
|
|
}
|
|
|
|
// PublishShardUpdate publish block update event
|
|
func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error {
|
|
logsForStorage := make([]*types.LogForStorage, len(logs))
|
|
for i, l := range logs {
|
|
if l != nil {
|
|
ls := types.LogForStorage(*l)
|
|
logsForStorage[i] = &ls
|
|
} else {
|
|
logsForStorage[i] = nil
|
|
}
|
|
}
|
|
msg, err := rlp.EncodeToBytes(&BlockUpdate{
|
|
BlkNum: blkNum,
|
|
Logs: logsForStorage,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return redisInstance.Publish(context.Background(), fmt.Sprintf("shard_update_%d", shardID), msg).Err()
|
|
}
|
|
|
|
// SubscribeNewFilterLogEvent subscribe new filter log event from other readers
|
|
func SubscribeNewFilterLogEvent(shardID uint32, namespace string, cb func(id string, crit ethereum.FilterQuery)) {
|
|
if redisInstance == nil {
|
|
return
|
|
}
|
|
pubsub := redisInstance.
|
|
Subscribe(context.Background(), fmt.Sprintf("%s_new_filter_log_%d", namespace, shardID))
|
|
for message := range pubsub.Channel() {
|
|
query := NewFilterUpdated{}
|
|
|
|
if err := json.Unmarshal([]byte(message.Payload), &query); err != nil {
|
|
utils.Logger().Warn().Err(err).Msg("redis subscribe new_filter_log error")
|
|
continue
|
|
}
|
|
|
|
cb(query.ID, query.FilterCriteria)
|
|
}
|
|
}
|
|
|
|
// PublishNewFilterLogEvent publish new filter log event from other readers
|
|
func PublishNewFilterLogEvent(shardID uint32, namespace, id string, crit ethereum.FilterQuery) error {
|
|
if redisInstance == nil {
|
|
return nil
|
|
}
|
|
|
|
ev := NewFilterUpdated{
|
|
ID: id,
|
|
FilterCriteria: crit,
|
|
}
|
|
msg, err := json.Marshal(ev)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return redisInstance.
|
|
Publish(context.Background(), fmt.Sprintf("%s_new_filter_log_%d", namespace, shardID), msg).Err()
|
|
}
|
|
|
|
//TxPoolUpdate tx pool update event
|
|
type TxPoolUpdate struct {
|
|
typ string
|
|
Local bool
|
|
Tx types.PoolTransaction
|
|
}
|
|
|
|
// DecodeRLP decode struct from binary stream
|
|
func (t *TxPoolUpdate) DecodeRLP(stream *rlp.Stream) error {
|
|
if err := stream.Decode(&t.typ); err != nil {
|
|
return err
|
|
}
|
|
if err := stream.Decode(&t.Local); err != nil {
|
|
return err
|
|
}
|
|
|
|
switch t.typ {
|
|
case "types.EthTransaction":
|
|
var tmp = &types.EthTransaction{}
|
|
if err := stream.Decode(tmp); err != nil {
|
|
return err
|
|
}
|
|
t.Tx = tmp
|
|
case "types.Transaction":
|
|
var tmp = &types.Transaction{}
|
|
if err := stream.Decode(tmp); err != nil {
|
|
return err
|
|
}
|
|
t.Tx = tmp
|
|
case "stakingTypes.StakingTransaction":
|
|
var tmp = &stakingTypes.StakingTransaction{}
|
|
if err := stream.Decode(tmp); err != nil {
|
|
return err
|
|
}
|
|
t.Tx = tmp
|
|
default:
|
|
return errors.New("unknown txpool type")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// EncodeRLP encode struct to binary stream
|
|
func (t *TxPoolUpdate) EncodeRLP(w io.Writer) error {
|
|
switch t.Tx.(type) {
|
|
case *types.EthTransaction:
|
|
t.typ = "types.EthTransaction"
|
|
case *types.Transaction:
|
|
t.typ = "types.Transaction"
|
|
case *stakingTypes.StakingTransaction:
|
|
t.typ = "stakingTypes.StakingTransaction"
|
|
}
|
|
|
|
if err := rlp.Encode(w, t.typ); err != nil {
|
|
return err
|
|
}
|
|
if err := rlp.Encode(w, t.Local); err != nil {
|
|
return err
|
|
}
|
|
return rlp.Encode(w, t.Tx)
|
|
}
|
|
|
|
// SubscribeTxPoolUpdate subscribe tx pool update event
|
|
func SubscribeTxPoolUpdate(shardID uint32, cb func(tx types.PoolTransaction, local bool)) {
|
|
pubsub := redisInstance.Subscribe(context.Background(), fmt.Sprintf("txpool_update_%d", shardID))
|
|
for message := range pubsub.Channel() {
|
|
txu := &TxPoolUpdate{}
|
|
err := rlp.DecodeBytes([]byte(message.Payload), &txu)
|
|
if err != nil {
|
|
utils.Logger().Warn().Err(err).Msg("redis subscribe txpool update error")
|
|
continue
|
|
}
|
|
cb(txu.Tx, txu.Local)
|
|
}
|
|
}
|
|
|
|
// PublishTxPoolUpdate publish tx pool update event
|
|
func PublishTxPoolUpdate(shardID uint32, tx types.PoolTransaction, local bool) error {
|
|
txu := &TxPoolUpdate{Local: local, Tx: tx}
|
|
msg, err := rlp.EncodeToBytes(txu)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return redisInstance.Publish(context.Background(), fmt.Sprintf("txpool_update_%d", shardID), msg).Err()
|
|
}
|
|
|