From 404b861f81041ee52374822a59121b67cc07e5c8 Mon Sep 17 00:00:00 2001 From: peekpi <894646171@qq.com> Date: Sat, 26 Jun 2021 18:21:27 +0800 Subject: [PATCH] [tracer][explorer] use event to send trace data to explorer --- cmd/harmony/default.go | 1 + cmd/harmony/flags.go | 2 +- core/blockchain.go | 39 ++++++++++++++++++++++++---------- core/events.go | 6 ++++++ node/node.go | 1 + node/node_explorer.go | 48 ++++++++++++++++++++++++------------------ test/deploy.sh | 2 +- 7 files changed, 65 insertions(+), 34 deletions(-) diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 4b76c6203..d5125e178 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -21,6 +21,7 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ IsBeaconArchival: false, IsOffline: false, DataDir: "./", + TraceEnable: false, }, Network: getDefaultNetworkConfig(defNetworkType), P2P: harmonyconfig.P2pConfig{ diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index a4f2c835e..6aa00d6be 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -277,7 +277,7 @@ var ( taraceFlag = cli.BoolFlag{ Name: "tracing", Usage: "indicates if full transaction tracing should be enabled", - DefValue: false, + DefValue: defaultConfig.General.TraceEnable, } ) diff --git a/core/blockchain.go b/core/blockchain.go index 89e913ec0..fb4ff59fc 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -49,6 +49,8 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/numeric" "github.com/harmony-one/harmony/shard" + + "github.com/harmony-one/harmony/hmy/tracers" "github.com/harmony-one/harmony/shard/committee" "github.com/harmony-one/harmony/staking/apr" "github.com/harmony-one/harmony/staking/effective" @@ -123,6 +125,8 @@ type BlockChain struct { gcproc time.Duration // Accumulates canonical block processing for trie dumping hc *HeaderChain + trace bool // atomic? + traceFeed event.Feed // send trace_block result to explorer rmLogsFeed event.Feed chainFeed event.Feed chainSideFeed event.Feed @@ -1284,13 +1288,7 @@ func (bc *BlockChain) GetMaxGarbageCollectedBlockNumber() int64 { // // After insertion is done, all accumulated events will be fired. func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) { - n, events, logs, err := bc.insertChain(chain, verifyHeaders, nil) - bc.PostChainEvents(events, logs) - return n, err -} - -func (bc *BlockChain) InsertAndTraceChain(chain types.Blocks, verifyHeaders bool, tracers []*vm.Config) (int, error) { - n, events, logs, err := bc.insertChain(chain, verifyHeaders, tracers) + n, events, logs, err := bc.insertChain(chain, verifyHeaders) bc.PostChainEvents(events, logs) return n, err } @@ -1298,7 +1296,7 @@ func (bc *BlockChain) InsertAndTraceChain(chain types.Blocks, verifyHeaders bool // insertChain will execute the actual chain insertion and event aggregation. The // only reason this method exists as a separate one is to make locking cleaner // with deferred statements. -func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracers []*vm.Config) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) { // Sanity check that we have something meaningful to import if len(chain) == 0 { return 0, nil, nil, nil @@ -1426,7 +1424,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracer if len(winner) > 0 { // Import all the pruned blocks to make the state available bc.chainmu.Unlock() - _, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */, nil) + _, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */) bc.chainmu.Lock() events, coalescedLogs = evs, logs @@ -1453,8 +1451,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracer return i, events, coalescedLogs, err } vmConfig := bc.vmConfig - if len(tracers) > i && tracers[i] != nil { - vmConfig = *tracers[i] + if bc.trace { + vmConfig = vm.Config{ + Debug: true, + + Tracer: &tracers.ParityBlockTracer{}, + } } // Process block using the parent state as reference point. receipts, cxReceipts, logs, usedGas, payout, err := bc.processor.Process( @@ -1481,6 +1483,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracer if err != nil { return i, events, coalescedLogs, err } + if bc.trace { + bc.PostChainEvents([]interface{}{TraceEvent{ + Block: block, + Tracer: vmConfig.Tracer.(*tracers.ParityBlockTracer), + }}, nil) + } logger := utils.Logger().With(). Str("number", block.Number().String()). Str("hash", block.Hash().Hex()). @@ -1596,6 +1604,9 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { case ChainSideEvent: bc.chainSideFeed.Send(ev) + + case TraceEvent: + bc.traceFeed.Send(ev) } } } @@ -1787,6 +1798,12 @@ func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) even return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch)) } +// SubscribeChainEvent registers a subscription of ChainEvent. +func (bc *BlockChain) SubscribeTraceEvent(ch chan<- TraceEvent) event.Subscription { + bc.trace = true + return bc.scope.Track(bc.traceFeed.Subscribe(ch)) +} + // SubscribeChainEvent registers a subscription of ChainEvent. func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription { return bc.scope.Track(bc.chainFeed.Subscribe(ch)) diff --git a/core/events.go b/core/events.go index 9a61c6243..19a43b82c 100644 --- a/core/events.go +++ b/core/events.go @@ -19,6 +19,7 @@ package core import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/hmy/tracers" ) // NewTxsEvent is posted when a batch of transactions enter the transaction pool. @@ -42,6 +43,11 @@ type ChainEvent struct { Logs []*types.Log } +type TraceEvent struct { + Block *types.Block + Tracer *tracers.ParityBlockTracer +} + // ChainSideEvent is chain side event. type ChainSideEvent struct { Block *types.Block diff --git a/node/node.go b/node/node.go index 5e3ca26e2..763991c9f 100644 --- a/node/node.go +++ b/node/node.go @@ -918,6 +918,7 @@ func (node *Node) StartPubSub() error { } }() + node.TraceLoopForExplorer() return nil } diff --git a/node/node_explorer.go b/node/node_explorer.go index 730a29eb1..bbef291b6 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -12,9 +12,8 @@ import ( "github.com/harmony-one/harmony/api/service/explorer" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus/signature" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/core/vm" - "github.com/harmony-one/harmony/hmy/tracers" "github.com/harmony-one/harmony/internal/utils" "github.com/pkg/errors" ) @@ -122,31 +121,38 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag return nil } -// AddNewBlockForExplorer add new block for explorer. -func (node *Node) AddNewBlockForExplorer(block *types.Block) { - utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node") - - vmConfig := &vm.Config{ - Debug: node.NodeConfig.TraceEnable, - Tracer: &tracers.ParityBlockTracer{}, +func (node *Node) TraceLoopForExplorer() { + if !node.HarmonyConfig.General.TraceEnable { + return } - if _, err := node.Blockchain().InsertAndTraceChain([]*types.Block{block}, false, []*vm.Config{vmConfig}); err == nil { - if block.IsLastBlockInEpoch() { - node.Consensus.UpdateConsensusInformation() - } - if vmConfig.Debug { - traceResults := make([]json.RawMessage, 0) - var err error - if block.Transactions().Len() > 0 { - traceResults, err = vmConfig.Tracer.(*tracers.ParityBlockTracer).GetResult() - } - if err == nil { + ch := make(chan core.TraceEvent) + subscribe := node.Blockchain().SubscribeTraceEvent(ch) + go func() { + loop: + select { + case ev := <-ch: + if traceResults, err := ev.Tracer.GetResult(); err == nil { if exp, err := node.getExplorerService(); err == nil { if raw, err := json.Marshal(traceResults); err == nil { - exp.TraceNewBlock(block.Hash(), raw) + exp.TraceNewBlock(ev.Block.Hash(), raw) } } } + goto loop + case <-subscribe.Err(): + //subscribe.Unsubscribe() + break + } + }() +} + +// AddNewBlockForExplorer add new block for explorer. +func (node *Node) AddNewBlockForExplorer(block *types.Block) { + utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node") + + if _, err := node.Blockchain().InsertChain([]*types.Block{block}, false); err == nil { + if block.IsLastBlockInEpoch() { + node.Consensus.UpdateConsensusInformation() } // Clean up the blocks to avoid OOM. node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64()) diff --git a/test/deploy.sh b/test/deploy.sh index 4c3976795..b20d3d6e3 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -125,7 +125,7 @@ function launch_localnet() { esac # Start the node - ${DRYRUN} "${ROOT}/bin/harmony" --trace_dir=sdf "${args[@]}" "${extra_args[@]}" 2>&1 | tee -a "${LOG_FILE}" & + ${DRYRUN} "${ROOT}/bin/harmony" "${args[@]}" "${extra_args[@]}" 2>&1 | tee -a "${LOG_FILE}" & done <"${config}" }