You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
434 lines
10 KiB
434 lines
10 KiB
package explorer
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/harmony-one/abool"
|
|
"github.com/harmony-one/harmony/core"
|
|
core2 "github.com/harmony-one/harmony/core"
|
|
"github.com/harmony-one/harmony/core/types"
|
|
common2 "github.com/harmony-one/harmony/internal/common"
|
|
"github.com/harmony-one/harmony/internal/utils"
|
|
staking "github.com/harmony-one/harmony/staking/types"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
const (
|
|
numWorker = 8
|
|
)
|
|
|
|
// ErrExplorerNotReady is the error when querying explorer db data when
|
|
// explorer db is doing migration and unavailable
|
|
var ErrExplorerNotReady = errors.New("explorer db not ready")
|
|
|
|
type (
|
|
storage struct {
|
|
db database
|
|
bc *core.BlockChain
|
|
|
|
// TODO: optimize this with priority queue
|
|
tm *taskManager
|
|
resultC chan blockResult
|
|
resultT chan *traceResult
|
|
|
|
available *abool.AtomicBool
|
|
closeC chan struct{}
|
|
log zerolog.Logger
|
|
}
|
|
|
|
blockResult struct {
|
|
btc batch
|
|
bn uint64
|
|
}
|
|
|
|
traceResult struct {
|
|
btc batch
|
|
hash common.Hash
|
|
data []byte
|
|
}
|
|
)
|
|
|
|
func newStorage(bc *core.BlockChain, dbPath string) (*storage, error) {
|
|
utils.Logger().Info().Msg("explorer storage folder: " + dbPath)
|
|
db, err := newLvlDB(dbPath)
|
|
if err != nil {
|
|
utils.Logger().Error().Err(err).Msg("Failed to create new database")
|
|
return nil, err
|
|
}
|
|
return &storage{
|
|
db: db,
|
|
bc: bc,
|
|
tm: newTaskManager(),
|
|
resultC: make(chan blockResult, numWorker),
|
|
resultT: make(chan *traceResult, numWorker),
|
|
available: abool.New(),
|
|
closeC: make(chan struct{}),
|
|
log: utils.Logger().With().Str("module", "explorer storage").Logger(),
|
|
}, nil
|
|
}
|
|
|
|
func (s *storage) Start() {
|
|
go s.run()
|
|
}
|
|
|
|
func (s *storage) Close() {
|
|
close(s.closeC)
|
|
}
|
|
|
|
func (s *storage) TraceNewBlock(hash common.Hash, data []byte) {
|
|
s.tm.AddNewTraceTask(hash, data)
|
|
}
|
|
|
|
func (s *storage) DumpNewBlock(b *types.Block) {
|
|
s.tm.AddNewTask(b)
|
|
}
|
|
|
|
func (s *storage) DumpCatchupBlock(b *types.Block) {
|
|
for s.tm.HasPendingTasks() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
s.tm.AddCatchupTask(b)
|
|
}
|
|
|
|
func (s *storage) GetAddresses(size int, startAddress oneAddress) ([]string, error) {
|
|
if !s.available.IsSet() {
|
|
return nil, ErrExplorerNotReady
|
|
}
|
|
addrs, err := getAddressesInRange(s.db, startAddress, size)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
display := make([]string, 0, len(addrs))
|
|
for _, addr := range addrs {
|
|
display = append(display, string(addr))
|
|
}
|
|
return display, nil
|
|
}
|
|
|
|
func (s *storage) GetNormalTxsByAddress(addr string) ([]common.Hash, []TxType, error) {
|
|
if !s.available.IsSet() {
|
|
return nil, nil, ErrExplorerNotReady
|
|
}
|
|
return getNormalTxnHashesByAccount(s.db, oneAddress(addr))
|
|
}
|
|
|
|
func (s *storage) GetStakingTxsByAddress(addr string) ([]common.Hash, []TxType, error) {
|
|
if !s.available.IsSet() {
|
|
return nil, nil, ErrExplorerNotReady
|
|
}
|
|
return getStakingTxnHashesByAccount(s.db, oneAddress(addr))
|
|
}
|
|
|
|
func (s *storage) GetTraceDataByHash(hash common.Hash) (json.RawMessage, error) {
|
|
if !s.available.IsSet() {
|
|
return nil, ErrExplorerNotReady
|
|
}
|
|
return getTraceData(s.db, hash)
|
|
}
|
|
|
|
func (s *storage) run() {
|
|
if is, err := isVersionV100(s.db); !is || err != nil {
|
|
s.available.UnSet()
|
|
err := s.migrateToV100()
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msg("Failed to migrate explorer DB!")
|
|
fmt.Println("Failed to migrate explorer DB:", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
s.available.Set()
|
|
go s.loop()
|
|
}
|
|
|
|
func (s *storage) loop() {
|
|
s.makeWorkersAndStart()
|
|
for {
|
|
select {
|
|
case res := <-s.resultC:
|
|
s.log.Info().Uint64("block number", res.bn).Msg("writing explorer DB")
|
|
if err := res.btc.Write(); err != nil {
|
|
s.log.Error().Err(err).Msg("explorer db failed to write")
|
|
}
|
|
case res := <-s.resultT:
|
|
s.log.Info().Str("block hash", res.hash.Hex()).Msg("writing trace into explorer DB")
|
|
if err := res.btc.Write(); err != nil {
|
|
s.log.Error().Err(err).Msg("explorer db failed to write trace data")
|
|
}
|
|
|
|
case <-s.closeC:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
type taskManager struct {
|
|
blocksHP []*types.Block // blocks with high priorities
|
|
blocksLP []*types.Block // blocks with low priorities
|
|
lock sync.Mutex
|
|
|
|
C chan struct{}
|
|
T chan *traceResult
|
|
}
|
|
|
|
func newTaskManager() *taskManager {
|
|
return &taskManager{
|
|
C: make(chan struct{}, numWorker),
|
|
T: make(chan *traceResult, numWorker),
|
|
}
|
|
}
|
|
|
|
func (tm *taskManager) AddNewTask(b *types.Block) {
|
|
tm.lock.Lock()
|
|
defer tm.lock.Unlock()
|
|
|
|
tm.blocksHP = append(tm.blocksHP, b)
|
|
select {
|
|
case tm.C <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (tm *taskManager) AddNewTraceTask(hash common.Hash, data []byte) {
|
|
tm.T <- &traceResult{
|
|
hash: hash,
|
|
data: data,
|
|
}
|
|
}
|
|
|
|
func (tm *taskManager) AddCatchupTask(b *types.Block) {
|
|
tm.lock.Lock()
|
|
defer tm.lock.Unlock()
|
|
|
|
tm.blocksLP = append(tm.blocksLP, b)
|
|
select {
|
|
case tm.C <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (tm *taskManager) HasPendingTasks() bool {
|
|
tm.lock.Lock()
|
|
defer tm.lock.Unlock()
|
|
|
|
return len(tm.blocksLP) != 0 || len(tm.blocksHP) != 0
|
|
}
|
|
|
|
func (tm *taskManager) PullTask() *types.Block {
|
|
tm.lock.Lock()
|
|
defer tm.lock.Unlock()
|
|
|
|
if len(tm.blocksHP) != 0 {
|
|
b := tm.blocksHP[0]
|
|
tm.blocksHP = tm.blocksHP[1:]
|
|
return b
|
|
}
|
|
if len(tm.blocksLP) != 0 {
|
|
b := tm.blocksLP[0]
|
|
tm.blocksLP = tm.blocksLP[1:]
|
|
return b
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *storage) makeWorkersAndStart() {
|
|
workers := make([]*blockComputer, 0, numWorker)
|
|
for i := 0; i != numWorker; i++ {
|
|
workers = append(workers, &blockComputer{
|
|
tm: s.tm,
|
|
db: s.db,
|
|
bc: s.bc,
|
|
resultC: s.resultC,
|
|
resultT: s.resultT,
|
|
closeC: s.closeC,
|
|
log: s.log.With().Int("worker", i).Logger(),
|
|
})
|
|
}
|
|
for _, worker := range workers {
|
|
go worker.loop()
|
|
}
|
|
}
|
|
|
|
type blockComputer struct {
|
|
tm *taskManager
|
|
db database
|
|
bc blockChainTxIndexer
|
|
resultC chan blockResult
|
|
resultT chan *traceResult
|
|
closeC chan struct{}
|
|
log zerolog.Logger
|
|
}
|
|
|
|
func (bc *blockComputer) loop() {
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case <-bc.tm.C:
|
|
for {
|
|
b := bc.tm.PullTask()
|
|
if b == nil {
|
|
goto LOOP
|
|
}
|
|
res, err := bc.computeBlock(b)
|
|
if err != nil {
|
|
bc.log.Error().Err(err).Str("hash", b.Hash().String()).
|
|
Uint64("number", b.NumberU64()).
|
|
Msg("explorer failed to handle block")
|
|
continue
|
|
}
|
|
if res == nil {
|
|
continue
|
|
}
|
|
select {
|
|
case bc.resultC <- *res:
|
|
case <-bc.closeC:
|
|
return
|
|
}
|
|
}
|
|
case traceData := <-bc.tm.T:
|
|
if exist, err := isTraceDataInDB(bc.db, traceData.hash); exist || err != nil {
|
|
continue
|
|
}
|
|
traceData.btc = bc.db.NewBatch()
|
|
_ = writeTraceData(traceData.btc, traceData.hash, traceData.data)
|
|
select {
|
|
case bc.resultT <- traceData:
|
|
case <-bc.closeC:
|
|
return
|
|
}
|
|
case <-bc.closeC:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (bc *blockComputer) computeBlock(b *types.Block) (*blockResult, error) {
|
|
is, err := isBlockComputedInDB(bc.db, b.NumberU64())
|
|
if is || err != nil {
|
|
return nil, err
|
|
}
|
|
btc := bc.db.NewBatch()
|
|
|
|
for _, tx := range b.Transactions() {
|
|
bc.computeNormalTx(btc, b, tx)
|
|
}
|
|
for _, stk := range b.StakingTransactions() {
|
|
bc.computeStakingTx(btc, b, stk)
|
|
}
|
|
_ = writeCheckpoint(btc, b.NumberU64())
|
|
return &blockResult{
|
|
btc: btc,
|
|
bn: b.NumberU64(),
|
|
}, nil
|
|
}
|
|
|
|
func (bc *blockComputer) computeNormalTx(btc batch, b *types.Block, tx *types.Transaction) {
|
|
ethFrom, _ := tx.SenderAddress()
|
|
from := ethToOneAddress(ethFrom)
|
|
|
|
_ = writeAddressEntry(btc, from)
|
|
|
|
_, bn, index := bc.bc.ReadTxLookupEntry(tx.HashByType())
|
|
_ = writeNormalTxnIndex(btc, normalTxnIndex{
|
|
addr: from,
|
|
blockNumber: bn,
|
|
txnIndex: index,
|
|
txnHash: tx.HashByType(),
|
|
}, txSent)
|
|
|
|
ethTo := tx.To()
|
|
if ethTo != nil { // Skip for contract creation
|
|
to := ethToOneAddress(*ethTo)
|
|
_ = writeAddressEntry(btc, to)
|
|
_ = writeNormalTxnIndex(btc, normalTxnIndex{
|
|
addr: to,
|
|
blockNumber: bn,
|
|
txnIndex: index,
|
|
txnHash: tx.HashByType(),
|
|
}, txReceived)
|
|
}
|
|
// Not very sure how this 1000 come from. But it is the logic before migration
|
|
t := b.Time().Uint64() * 1000
|
|
tr := &TxRecord{tx.HashByType(), time.Unix(int64(t), 0)}
|
|
_ = writeTxn(btc, tx.HashByType(), tr)
|
|
}
|
|
|
|
func (bc *blockComputer) computeStakingTx(btc batch, b *types.Block, tx *staking.StakingTransaction) {
|
|
ethFrom, _ := tx.SenderAddress()
|
|
from := ethToOneAddress(ethFrom)
|
|
_ = writeAddressEntry(btc, from)
|
|
_, bn, index := bc.bc.ReadTxLookupEntry(tx.Hash())
|
|
_ = writeStakingTxnIndex(btc, stakingTxnIndex{
|
|
addr: from,
|
|
blockNumber: bn,
|
|
txnIndex: index,
|
|
txnHash: tx.Hash(),
|
|
}, txSent)
|
|
t := b.Time().Uint64() * 1000
|
|
tr := &TxRecord{tx.Hash(), time.Unix(int64(t), 0)}
|
|
_ = writeTxn(btc, tx.Hash(), tr)
|
|
|
|
ethTo, _ := toFromStakingTx(tx, b)
|
|
if ethTo == (common.Address{}) {
|
|
return
|
|
}
|
|
to := ethToOneAddress(ethTo)
|
|
_ = writeAddressEntry(btc, to)
|
|
_ = writeStakingTxnIndex(btc, stakingTxnIndex{
|
|
addr: to,
|
|
blockNumber: bn,
|
|
txnIndex: index,
|
|
txnHash: tx.Hash(),
|
|
}, txReceived)
|
|
}
|
|
|
|
func ethToOneAddress(ethAddr common.Address) oneAddress {
|
|
raw, _ := common2.AddressToBech32(ethAddr)
|
|
return oneAddress(raw)
|
|
}
|
|
|
|
func toFromStakingTx(tx *staking.StakingTransaction, addressBlock *types.Block) (common.Address, error) {
|
|
msg, err := core2.StakingToMessage(tx, addressBlock.Header().Number())
|
|
if err != nil {
|
|
utils.Logger().Error().Err(err).Msg("Error when parsing tx into message")
|
|
return common.Address{}, err
|
|
}
|
|
|
|
switch tx.StakingType() {
|
|
case staking.DirectiveDelegate:
|
|
stkMsg, err := staking.RLPDecodeStakeMsg(tx.Data(), staking.DirectiveDelegate)
|
|
if err != nil {
|
|
return common.Address{}, err
|
|
}
|
|
if _, ok := stkMsg.(*staking.Delegate); !ok {
|
|
return common.Address{}, core2.ErrInvalidMsgForStakingDirective
|
|
}
|
|
delegateMsg := stkMsg.(*staking.Delegate)
|
|
if !bytes.Equal(msg.From().Bytes()[:], delegateMsg.DelegatorAddress.Bytes()[:]) {
|
|
return common.Address{}, core2.ErrInvalidSender
|
|
}
|
|
return delegateMsg.ValidatorAddress, nil
|
|
|
|
case staking.DirectiveUndelegate:
|
|
stkMsg, err := staking.RLPDecodeStakeMsg(tx.Data(), staking.DirectiveUndelegate)
|
|
if err != nil {
|
|
return common.Address{}, err
|
|
}
|
|
if _, ok := stkMsg.(*staking.Undelegate); !ok {
|
|
return common.Address{}, core2.ErrInvalidMsgForStakingDirective
|
|
}
|
|
undelegateMsg := stkMsg.(*staking.Undelegate)
|
|
if !bytes.Equal(msg.From().Bytes()[:], undelegateMsg.DelegatorAddress.Bytes()[:]) {
|
|
return common.Address{}, core2.ErrInvalidSender
|
|
}
|
|
return undelegateMsg.ValidatorAddress, nil
|
|
default:
|
|
return common.Address{}, nil
|
|
}
|
|
}
|
|
|