[explorer][tracer] store trace result into explorer db

pull/3818/head
peekpi 3 years ago
parent 698e363f0a
commit 054ba5d712
  1. 19
      api/service/explorer/schema.go
  2. 5
      api/service/explorer/service.go
  3. 47
      api/service/explorer/storage.go
  4. 14
      cmd/harmony/flags.go
  5. 4
      cmd/harmony/main.go
  6. 39
      core/blockchain.go
  7. 2
      hmy/hmy.go
  8. 2
      internal/configs/harmony/harmony.go
  9. 2
      internal/configs/node/config.go
  10. 6
      internal/shardchain/shardchains.go
  11. 3
      node/node.go
  12. 29
      node/node_explorer.go
  13. 3
      rpc/tracerParity.go

@ -16,6 +16,7 @@ import (
const (
LegAddressPrefix = "ad_"
CheckpointPrefix = "dc"
TracePrefix = "tr_"
oneAddrByteLen = 42 // byte size of string "one1..."
)
@ -36,6 +37,24 @@ func writeCheckpoint(db databaseWriter, bn uint64) error {
return db.Put(blockCheckpoint, []byte{})
}
func GetTraceDataKey(hash common.Hash) []byte {
return []byte(fmt.Sprintf("%s_%x", TracePrefix, hash))
}
func isTraceDataInDB(db databaseReader, hash common.Hash) (bool, error) {
key := GetTraceDataKey(hash)
return db.Has(key)
}
func writeTraceData(db databaseWriter, hash common.Hash, data []byte) error {
key := GetTraceDataKey(hash)
return db.Put(key, data)
}
func getTraceData(db databaseReader, hash common.Hash) ([]byte, error) {
key := GetTraceDataKey(hash)
return db.Get(key)
}
// New schema
var (

@ -187,6 +187,11 @@ func (s *Service) GetStakingTxHashesByAccount(address string) ([]ethCommon.Hash,
return s.storage.GetStakingTxsByAddress(address)
}
// TraceNewBlock instruct the explorer storage to trace data in explorer DB
func (s *Service) TraceNewBlock(hash ethCommon.Hash, data []byte) {
s.storage.TraceNewBlock(hash, data)
}
// DumpNewBlock instruct the explorer storage to dump block data in explorer DB
func (s *Service) DumpNewBlock(b *types.Block) {
s.storage.DumpNewBlock(b)

@ -2,6 +2,7 @@ package explorer
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
@ -35,6 +36,7 @@ type (
// TODO: optimize this with priority queue
tm *taskManager
resultC chan blockResult
resultT chan *traceResult
available *abool.AtomicBool
closeC chan struct{}
@ -45,6 +47,12 @@ type (
btc batch
bn uint64
}
traceResult struct {
btc batch
hash common.Hash
data []byte
}
)
func newStorage(bc *core.BlockChain, dbPath string) (*storage, error) {
@ -59,6 +67,7 @@ func newStorage(bc *core.BlockChain, dbPath string) (*storage, error) {
bc: bc,
tm: newTaskManager(),
resultC: make(chan blockResult, numWorker),
resultT: make(chan *traceResult, numWorker),
available: abool.New(),
closeC: make(chan struct{}),
log: utils.Logger().With().Str("module", "explorer storage").Logger(),
@ -73,6 +82,10 @@ func (s *storage) Close() {
close(s.closeC)
}
func (s *storage) TraceNewBlock(hash common.Hash, data []byte) {
s.tm.AddNewTraceTask(hash, data)
}
func (s *storage) DumpNewBlock(b *types.Block) {
s.tm.AddNewTask(b)
}
@ -113,6 +126,13 @@ func (s *storage) GetStakingTxsByAddress(addr string) ([]common.Hash, []TxType,
return getStakingTxnHashesByAccount(s.db, oneAddress(addr))
}
func (s *storage) GetTraceDataByHash(hash common.Hash) (json.RawMessage, error) {
if !s.available.IsSet() {
return nil, ErrExplorerNotReady
}
return getTraceData(s.db, hash)
}
func (s *storage) run() {
if is, err := isVersionV100(s.db); !is || err != nil {
s.available.UnSet()
@ -136,6 +156,11 @@ func (s *storage) loop() {
if err := res.btc.Write(); err != nil {
s.log.Error().Err(err).Msg("explorer db failed to write")
}
case res := <-s.resultT:
s.log.Info().Str("block hash", res.hash.Hex()).Msg("writing trace into explorer DB")
if err := res.btc.Write(); err != nil {
s.log.Error().Err(err).Msg("explorer db failed to write trace data")
}
case <-s.closeC:
return
@ -149,6 +174,7 @@ type taskManager struct {
lock sync.Mutex
C chan struct{}
T chan *traceResult
}
func newTaskManager() *taskManager {
@ -168,6 +194,13 @@ func (tm *taskManager) AddNewTask(b *types.Block) {
}
}
func (tm *taskManager) AddNewTraceTask(hash common.Hash, data []byte) {
tm.T <- &traceResult{
hash: hash,
data: data,
}
}
func (tm *taskManager) AddCatchupTask(b *types.Block) {
tm.lock.Lock()
defer tm.lock.Unlock()
@ -211,6 +244,7 @@ func (s *storage) makeWorkersAndStart() {
db: s.db,
bc: s.bc,
resultC: s.resultC,
resultT: s.resultT,
closeC: s.closeC,
log: s.log.With().Int("worker", i).Logger(),
})
@ -225,6 +259,7 @@ type blockComputer struct {
db database
bc blockChainTxIndexer
resultC chan blockResult
resultT chan *traceResult
closeC chan struct{}
log zerolog.Logger
}
@ -255,7 +290,17 @@ LOOP:
return
}
}
case traceData := <-bc.tm.T:
if exist, err := isTraceDataInDB(bc.db, traceData.hash); exist || err != nil {
break
}
traceData.btc = bc.db.NewBatch()
_ = writeTraceData(traceData.btc, traceData.hash, traceData.data)
select {
case bc.resultT <- traceData:
case <-bc.closeC:
return
}
case <-bc.closeC:
return
}

@ -29,7 +29,7 @@ var (
legacyIsArchiveFlag,
legacyDataDirFlag,
taraceDataDirFlag,
taraceFlag,
}
dnsSyncFlags = []cli.Flag{
@ -274,10 +274,10 @@ var (
Deprecated: "use --datadir",
}
taraceDataDirFlag = cli.StringFlag{
Name: "trace_dir",
Usage: "trace block database directory",
DefValue: "",
taraceFlag = cli.BoolFlag{
Name: "tracing",
Usage: "indicates if full transaction tracing should be enabled",
DefValue: false,
}
)
@ -351,8 +351,8 @@ func applyGeneralFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig)
config.General.IsOffline = cli.GetBoolFlagValue(cmd, isOfflineFlag)
}
if cli.IsFlagChanged(cmd, taraceDataDirFlag) {
config.General.TraceDir = cli.GetStringFlagValue(cmd, taraceDataDirFlag)
if cli.IsFlagChanged(cmd, taraceFlag) {
config.General.TraceEnable = cli.GetBoolFlagValue(cmd, taraceFlag)
}
}

@ -612,7 +612,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
nodeConfig.NtpServer = hc.Sys.NtpServer
nodeConfig.TraceDir = hc.General.TraceDir
nodeConfig.TraceEnable = hc.General.TraceEnable
return nodeConfig, nil
}
@ -655,7 +655,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
// Current node.
chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
currentNode := node.New(myHost, currentConsensus, chainDBFactory, nodeConfig.TraceDir, blacklist, nodeConfig.ArchiveModes(), &hc)
currentNode := node.New(myHost, currentConsensus, chainDBFactory, blacklist, nodeConfig.ArchiveModes(), &hc)
if hc.Legacy != nil && hc.Legacy.TPBroadcastInvalidTxn != nil {
currentNode.BroadcastInvalidTx = *hc.Legacy.TPBroadcastInvalidTxn

@ -45,7 +45,6 @@ import (
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/hmy/tracers"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/numeric"
@ -167,7 +166,6 @@ type BlockChain struct {
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config
traceDB string
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
pendingSlashes slash.Records
@ -179,7 +177,7 @@ type BlockChain struct {
// Processor.
func NewBlockChain(
db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig,
engine consensus_engine.Engine, vmConfig vm.Config, traceDB string,
engine consensus_engine.Engine, vmConfig vm.Config,
shouldPreserve func(block *types.Block) bool,
) (*BlockChain, error) {
if cacheConfig == nil {
@ -230,7 +228,6 @@ func NewBlockChain(
blockAccumulatorCache: blockAccumulatorCache,
engine: engine,
vmConfig: vmConfig,
traceDB: traceDB,
badBlocks: badBlocks,
pendingSlashes: slash.Records{},
maxGarbCollectedBlkNum: -1,
@ -258,8 +255,6 @@ func NewBlockChain(
return bc, nil
}
var randname = time.Now().Nanosecond()
// ValidateNewBlock validates new block.
func (bc *BlockChain) ValidateNewBlock(block *types.Block) error {
state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache)
@ -1289,7 +1284,13 @@ func (bc *BlockChain) GetMaxGarbageCollectedBlockNumber() int64 {
//
// After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) {
n, events, logs, err := bc.insertChain(chain, verifyHeaders)
n, events, logs, err := bc.insertChain(chain, verifyHeaders, nil)
bc.PostChainEvents(events, logs)
return n, err
}
func (bc *BlockChain) InsertAndTraceChain(chain types.Blocks, verifyHeaders bool, tracers []*vm.Config) (int, error) {
n, events, logs, err := bc.insertChain(chain, verifyHeaders, tracers)
bc.PostChainEvents(events, logs)
return n, err
}
@ -1297,7 +1298,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int,
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) {
func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracers []*vm.Config) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
@ -1425,7 +1426,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
if len(winner) > 0 {
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */)
_, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */, nil)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs
@ -1452,11 +1453,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
return i, events, coalescedLogs, err
}
vmConfig := bc.vmConfig
if bc.traceDB != "" {
vmConfig = vm.Config{
Debug: true,
Tracer: &tracers.ParityBlockTracer{},
}
if len(tracers) > i && tracers[i] != nil {
vmConfig = *tracers[i]
}
// Process block using the parent state as reference point.
receipts, cxReceipts, logs, usedGas, payout, err := bc.processor.Process(
@ -1480,19 +1478,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
status, err := bc.WriteBlockWithState(
block, receipts, cxReceipts, payout, state,
)
if bc.traceDB != "" {
if tracer, ok := vmConfig.Tracer.(*tracers.ParityBlockTracer); ok {
result := make([]json.RawMessage, 0)
var err error
if block.Transactions().Len() > 0 {
result, err = tracer.GetResult()
}
if err == nil {
b, _ := json.Marshal(result)
fmt.Println(block.NumberU64(), string(b))
}
}
}
if err != nil {
return i, events, coalescedLogs, err
}

@ -2,6 +2,7 @@ package hmy
import (
"context"
"encoding/json"
"math/big"
"github.com/ethereum/go-ethereum/common"
@ -86,6 +87,7 @@ type NodeAPI interface {
GetStakingTransactionsHistory(address, txType, order string) ([]common.Hash, error)
GetTransactionsCount(address, txType string) (uint64, error)
GetStakingTransactionsCount(address, txType string) (uint64, error)
GetTraceResultByHash(hash common.Hash) (json.RawMessage, error)
IsCurrentlyLeader() bool
IsOutOfSync(shardID uint32) bool
SyncStatus(shardID uint32) (bool, uint64, uint64)

@ -59,7 +59,7 @@ type GeneralConfig struct {
IsBeaconArchival bool
IsOffline bool
DataDir string
TraceDir string
TraceEnable bool
}
type ConsensusConfig struct {

@ -94,7 +94,7 @@ type ConfigType struct {
WebHooks struct {
Hooks *webhooks.Hooks
}
TraceDir string
TraceEnable bool
}
// RPCServerConfig is the config for rpc listen addresses

@ -40,7 +40,6 @@ type CollectionImpl struct {
pool map[uint32]*core.BlockChain
disableCache map[uint32]bool
chainConfig *params.ChainConfig
traceDB string
}
// NewCollection creates and returns a new shard chain collection.
@ -50,7 +49,7 @@ type CollectionImpl struct {
// dbInit is the shard chain initializer to use when the database returned by
// the factory is brand new (empty).
func NewCollection(
dbFactory DBFactory, dbInit DBInitializer, engine engine.Engine, traceDB string,
dbFactory DBFactory, dbInit DBInitializer, engine engine.Engine,
chainConfig *params.ChainConfig,
) *CollectionImpl {
return &CollectionImpl{
@ -60,7 +59,6 @@ func NewCollection(
pool: make(map[uint32]*core.BlockChain),
disableCache: make(map[uint32]bool),
chainConfig: chainConfig,
traceDB: traceDB,
}
}
@ -108,7 +106,7 @@ func (sc *CollectionImpl) ShardChain(shardID uint32) (*core.BlockChain, error) {
chainConfig.EthCompatibleChainID = big.NewInt(chainConfig.EthCompatibleShard0ChainID.Int64())
}
bc, err := core.NewBlockChain(
db, cacheConfig, &chainConfig, sc.engine, vm.Config{}, sc.traceDB, nil,
db, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil,
)
if err != nil {
return nil, errors.Wrapf(err, "cannot create blockchain")

@ -938,7 +938,6 @@ func New(
host p2p.Host,
consensusObj *consensus.Consensus,
chainDBFactory shardchain.DBFactory,
traceDB string,
blacklist map[common.Address]struct{},
isArchival map[uint32]bool,
harmonyconfig *harmonyconfig.HarmonyConfig,
@ -967,7 +966,7 @@ func New(
engine := chain.NewEngine()
collection := shardchain.NewCollection(
chainDBFactory, &genesisInitializer{&node}, engine, traceDB, &chainConfig,
chainDBFactory, &genesisInitializer{&node}, engine, &chainConfig,
)
for shardID, archival := range isArchival {

@ -2,6 +2,7 @@ package node
import (
"context"
"encoding/json"
"sync"
"github.com/ethereum/go-ethereum/common"
@ -12,6 +13,8 @@ import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/hmy/tracers"
"github.com/harmony-one/harmony/internal/utils"
"github.com/pkg/errors"
)
@ -122,10 +125,29 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag
// AddNewBlockForExplorer add new block for explorer.
func (node *Node) AddNewBlockForExplorer(block *types.Block) {
utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node")
if _, err := node.Blockchain().InsertChain([]*types.Block{block}, false); err == nil {
vmConfig := &vm.Config{
Debug: node.NodeConfig.TraceEnable,
Tracer: &tracers.ParityBlockTracer{},
}
if _, err := node.Blockchain().InsertAndTraceChain([]*types.Block{block}, false, []*vm.Config{vmConfig}); err == nil {
if block.IsLastBlockInEpoch() {
node.Consensus.UpdateConsensusInformation()
}
if vmConfig.Debug {
traceResults := make([]json.RawMessage, 0)
var err error
if block.Transactions().Len() > 0 {
traceResults, err = vmConfig.Tracer.(*tracers.ParityBlockTracer).GetResult()
}
if err == nil {
if exp, err := node.getExplorerService(); err == nil {
if raw, err := json.Marshal(traceResults); err == nil {
exp.TraceNewBlock(block.Hash(), raw)
}
}
}
}
// Clean up the blocks to avoid OOM.
node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64())
// Do dump all blocks from state syncing for explorer one time
@ -245,6 +267,11 @@ func (node *Node) GetStakingTransactionsCount(address, txType string) (uint64, e
return count, nil
}
// GetStakingTransactionsCount returns the number of staking transactions hashes of address for input type.
func (node *Node) GetTraceResultByHash(hash common.Hash) (json.RawMessage, error) {
return node.GetTraceResultByHash(hash)
}
func (node *Node) getExplorerService() (*explorer.Service, error) {
rawService := node.serviceManager.GetService(service.SupportExplorer)
if rawService == nil {

@ -28,6 +28,9 @@ func (s *PublicParityTracerService) Block(ctx context.Context, number rpc.BlockN
if block == nil {
return nil, nil
}
if results, err := s.hmy.NodeAPI.GetTraceResultByHash(block.Hash()); err == nil {
return results, nil
}
results, err := s.hmy.TraceBlock(ctx, block, &hmy.TraceConfig{Tracer: &parityTraceGO})
if err != nil {
return results, err

Loading…
Cancel
Save