[tracer][explorer] use event to send trace data to explorer

pull/3818/head
peekpi 3 years ago
parent ab8e38f3df
commit 404b861f81
  1. 1
      cmd/harmony/default.go
  2. 2
      cmd/harmony/flags.go
  3. 39
      core/blockchain.go
  4. 6
      core/events.go
  5. 1
      node/node.go
  6. 48
      node/node_explorer.go
  7. 2
      test/deploy.sh

@ -21,6 +21,7 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
IsBeaconArchival: false,
IsOffline: false,
DataDir: "./",
TraceEnable: false,
},
Network: getDefaultNetworkConfig(defNetworkType),
P2P: harmonyconfig.P2pConfig{

@ -277,7 +277,7 @@ var (
taraceFlag = cli.BoolFlag{
Name: "tracing",
Usage: "indicates if full transaction tracing should be enabled",
DefValue: false,
DefValue: defaultConfig.General.TraceEnable,
}
)

@ -49,6 +49,8 @@ import (
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/numeric"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/hmy/tracers"
"github.com/harmony-one/harmony/shard/committee"
"github.com/harmony-one/harmony/staking/apr"
"github.com/harmony-one/harmony/staking/effective"
@ -123,6 +125,8 @@ type BlockChain struct {
gcproc time.Duration // Accumulates canonical block processing for trie dumping
hc *HeaderChain
trace bool // atomic?
traceFeed event.Feed // send trace_block result to explorer
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
@ -1284,13 +1288,7 @@ func (bc *BlockChain) GetMaxGarbageCollectedBlockNumber() int64 {
//
// After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) {
n, events, logs, err := bc.insertChain(chain, verifyHeaders, nil)
bc.PostChainEvents(events, logs)
return n, err
}
func (bc *BlockChain) InsertAndTraceChain(chain types.Blocks, verifyHeaders bool, tracers []*vm.Config) (int, error) {
n, events, logs, err := bc.insertChain(chain, verifyHeaders, tracers)
n, events, logs, err := bc.insertChain(chain, verifyHeaders)
bc.PostChainEvents(events, logs)
return n, err
}
@ -1298,7 +1296,7 @@ func (bc *BlockChain) InsertAndTraceChain(chain types.Blocks, verifyHeaders bool
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracers []*vm.Config) (int, []interface{}, []*types.Log, error) {
func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
@ -1426,7 +1424,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracer
if len(winner) > 0 {
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */, nil)
_, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs
@ -1453,8 +1451,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracer
return i, events, coalescedLogs, err
}
vmConfig := bc.vmConfig
if len(tracers) > i && tracers[i] != nil {
vmConfig = *tracers[i]
if bc.trace {
vmConfig = vm.Config{
Debug: true,
Tracer: &tracers.ParityBlockTracer{},
}
}
// Process block using the parent state as reference point.
receipts, cxReceipts, logs, usedGas, payout, err := bc.processor.Process(
@ -1481,6 +1483,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool, tracer
if err != nil {
return i, events, coalescedLogs, err
}
if bc.trace {
bc.PostChainEvents([]interface{}{TraceEvent{
Block: block,
Tracer: vmConfig.Tracer.(*tracers.ParityBlockTracer),
}}, nil)
}
logger := utils.Logger().With().
Str("number", block.Number().String()).
Str("hash", block.Hash().Hex()).
@ -1596,6 +1604,9 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
case ChainSideEvent:
bc.chainSideFeed.Send(ev)
case TraceEvent:
bc.traceFeed.Send(ev)
}
}
}
@ -1787,6 +1798,12 @@ func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) even
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
}
// SubscribeChainEvent registers a subscription of ChainEvent.
func (bc *BlockChain) SubscribeTraceEvent(ch chan<- TraceEvent) event.Subscription {
bc.trace = true
return bc.scope.Track(bc.traceFeed.Subscribe(ch))
}
// SubscribeChainEvent registers a subscription of ChainEvent.
func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
return bc.scope.Track(bc.chainFeed.Subscribe(ch))

@ -19,6 +19,7 @@ package core
import (
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/hmy/tracers"
)
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
@ -42,6 +43,11 @@ type ChainEvent struct {
Logs []*types.Log
}
type TraceEvent struct {
Block *types.Block
Tracer *tracers.ParityBlockTracer
}
// ChainSideEvent is chain side event.
type ChainSideEvent struct {
Block *types.Block

@ -918,6 +918,7 @@ func (node *Node) StartPubSub() error {
}
}()
node.TraceLoopForExplorer()
return nil
}

@ -12,9 +12,8 @@ import (
"github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/hmy/tracers"
"github.com/harmony-one/harmony/internal/utils"
"github.com/pkg/errors"
)
@ -122,31 +121,38 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag
return nil
}
// AddNewBlockForExplorer add new block for explorer.
func (node *Node) AddNewBlockForExplorer(block *types.Block) {
utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node")
vmConfig := &vm.Config{
Debug: node.NodeConfig.TraceEnable,
Tracer: &tracers.ParityBlockTracer{},
func (node *Node) TraceLoopForExplorer() {
if !node.HarmonyConfig.General.TraceEnable {
return
}
if _, err := node.Blockchain().InsertAndTraceChain([]*types.Block{block}, false, []*vm.Config{vmConfig}); err == nil {
if block.IsLastBlockInEpoch() {
node.Consensus.UpdateConsensusInformation()
}
if vmConfig.Debug {
traceResults := make([]json.RawMessage, 0)
var err error
if block.Transactions().Len() > 0 {
traceResults, err = vmConfig.Tracer.(*tracers.ParityBlockTracer).GetResult()
}
if err == nil {
ch := make(chan core.TraceEvent)
subscribe := node.Blockchain().SubscribeTraceEvent(ch)
go func() {
loop:
select {
case ev := <-ch:
if traceResults, err := ev.Tracer.GetResult(); err == nil {
if exp, err := node.getExplorerService(); err == nil {
if raw, err := json.Marshal(traceResults); err == nil {
exp.TraceNewBlock(block.Hash(), raw)
exp.TraceNewBlock(ev.Block.Hash(), raw)
}
}
}
goto loop
case <-subscribe.Err():
//subscribe.Unsubscribe()
break
}
}()
}
// AddNewBlockForExplorer add new block for explorer.
func (node *Node) AddNewBlockForExplorer(block *types.Block) {
utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node")
if _, err := node.Blockchain().InsertChain([]*types.Block{block}, false); err == nil {
if block.IsLastBlockInEpoch() {
node.Consensus.UpdateConsensusInformation()
}
// Clean up the blocks to avoid OOM.
node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64())

@ -125,7 +125,7 @@ function launch_localnet() {
esac
# Start the node
${DRYRUN} "${ROOT}/bin/harmony" --trace_dir=sdf "${args[@]}" "${extra_args[@]}" 2>&1 | tee -a "${LOG_FILE}" &
${DRYRUN} "${ROOT}/bin/harmony" "${args[@]}" "${extra_args[@]}" 2>&1 | tee -a "${LOG_FILE}" &
done <"${config}"
}

Loading…
Cancel
Save