[explorer] adapt trace storage

pull/3818/head
peekpi 3 years ago
parent 158ee1c648
commit 9a3ab9fcaa
  1. 16
      api/service/explorer/schema.go
  2. 5
      api/service/explorer/service.go
  3. 30
      api/service/explorer/storage.go
  4. 205
      hmy/tracers/block_tracer_storage.go
  5. 9
      node/node_explorer.go

@ -37,21 +37,21 @@ func writeCheckpoint(db databaseWriter, bn uint64) error {
return db.Put(blockCheckpoint, []byte{})
}
func getTraceResultKey(hash common.Hash) []byte {
return []byte(fmt.Sprintf("%s_%x", TracePrefix, hash))
func getTraceResultKey(key []byte) []byte {
return append([]byte(TracePrefix), key...)
}
func isTraceResultInDB(db databaseReader, hash common.Hash) (bool, error) {
key := getTraceResultKey(hash)
func isTraceResultInDB(db databaseReader, key []byte) (bool, error) {
key = getTraceResultKey(key)
return db.Has(key)
}
func writeTraceResult(db databaseWriter, hash common.Hash, data []byte) error {
key := getTraceResultKey(hash)
func writeTraceResult(db databaseWriter, key []byte, data []byte) error {
key = getTraceResultKey(key)
return db.Put(key, data)
}
func getTraceResult(db databaseReader, hash common.Hash) ([]byte, error) {
key := getTraceResultKey(hash)
func getTraceResult(db databaseReader, key []byte) ([]byte, error) {
key = getTraceResultKey(key)
return db.Get(key)
}

@ -20,6 +20,7 @@ import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/hmy"
"github.com/harmony-one/harmony/hmy/tracers"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/common"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -192,8 +193,8 @@ func (s *Service) GetTraceResultByHash(hash ethCommon.Hash) (json.RawMessage, er
}
// DumpTraceResult instruct the explorer storage to trace data in explorer DB
func (s *Service) DumpTraceResult(hash ethCommon.Hash, data []byte) {
s.storage.DumpTraceResult(hash, data)
func (s *Service) DumpTraceResult(data *tracers.TraceBlockStorage) {
s.storage.DumpTraceResult(data)
}
// DumpNewBlock instruct the explorer storage to dump block data in explorer DB

@ -14,6 +14,7 @@ import (
"github.com/harmony-one/harmony/core"
core2 "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/hmy/tracers"
common2 "github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/internal/utils"
staking "github.com/harmony-one/harmony/staking/types"
@ -50,8 +51,7 @@ type (
traceResult struct {
btc batch
hash common.Hash
data []byte
data *tracers.TraceBlockStorage
}
)
@ -82,8 +82,8 @@ func (s *storage) Close() {
close(s.closeC)
}
func (s *storage) DumpTraceResult(hash common.Hash, data []byte) {
s.tm.AddNewTraceTask(hash, data)
func (s *storage) DumpTraceResult(data *tracers.TraceBlockStorage) {
s.tm.AddNewTraceTask(data)
}
func (s *storage) DumpNewBlock(b *types.Block) {
@ -130,7 +130,16 @@ func (s *storage) GetTraceResultByHash(hash common.Hash) (json.RawMessage, error
if !s.available.IsSet() {
return nil, ErrExplorerNotReady
}
return getTraceResult(s.db, hash)
traceStorage := &tracers.TraceBlockStorage{
Hash: hash,
}
err := traceStorage.FromDB(func(key []byte) ([]byte, error) {
return getTraceResult(s.db, key)
})
if err != nil {
return nil, err
}
return traceStorage.ToJson()
}
func (s *storage) run() {
@ -157,7 +166,7 @@ func (s *storage) loop() {
s.log.Error().Err(err).Msg("explorer db failed to write")
}
case res := <-s.resultT:
s.log.Info().Str("block hash", res.hash.Hex()).Msg("writing trace into explorer DB")
s.log.Info().Str("block hash", res.data.Hash.Hex()).Msg("writing trace into explorer DB")
if err := res.btc.Write(); err != nil {
s.log.Error().Err(err).Msg("explorer db failed to write trace data")
}
@ -195,9 +204,8 @@ func (tm *taskManager) AddNewTask(b *types.Block) {
}
}
func (tm *taskManager) AddNewTraceTask(hash common.Hash, data []byte) {
func (tm *taskManager) AddNewTraceTask(data *tracers.TraceBlockStorage) {
tm.T <- &traceResult{
hash: hash,
data: data,
}
}
@ -292,11 +300,13 @@ LOOP:
}
}
case traceResult := <-bc.tm.T:
if exist, err := isTraceResultInDB(bc.db, traceResult.hash); exist || err != nil {
if exist, err := isTraceResultInDB(bc.db, traceResult.data.KeyDB()); exist || err != nil {
continue
}
traceResult.btc = bc.db.NewBatch()
_ = writeTraceResult(traceResult.btc, traceResult.hash, traceResult.data)
traceResult.data.ToDB(func(key, value []byte) {
_ = writeTraceResult(traceResult.btc, key, value)
})
select {
case bc.resultT <- traceResult:
case <-bc.closeC:

@ -18,12 +18,16 @@ package tracers
import (
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/utils"
)
type ActionStorage struct {
@ -109,27 +113,46 @@ func (ts *TraceBlockStorage) indexAddress(address common.Address) int {
return index
}
func (ts *TraceBlockStorage) toDB(write func([]byte, []byte)) {
func (ts *TraceBlockStorage) KeyDB() []byte {
return ts.Hash[:]
}
func (ts *TraceBlockStorage) ToDB(write func([]byte, []byte)) {
for index, key := range ts.DataKeyTable {
write(key[:], ts.dataValueTable[index])
}
bytes, _ := rlp.EncodeToBytes(ts)
write(ts.Hash[:], bytes)
write(ts.KeyDB(), bytes)
}
func (ts *TraceBlockStorage) fromDB(read func([]byte) []byte, hash common.Hash) {
bytes := read(hash[:])
rlp.DecodeBytes(bytes, ts)
func (ts *TraceBlockStorage) FromDB(read func([]byte) ([]byte, error)) error {
bytes, err := read(ts.KeyDB())
if err != nil {
return err
}
err = rlp.DecodeBytes(bytes, ts)
if err != nil {
return err
}
for _, key := range ts.DataKeyTable {
ts.dataValueTable = append(ts.dataValueTable, read(key[:]))
data, err := read(key[:])
if err != nil {
return err
}
ts.dataValueTable = append(ts.dataValueTable, data)
}
return nil
}
func (ts *TraceBlockStorage) txJson(index int) []json.RawMessage {
func (ts *TraceBlockStorage) TxJson(index int) ([]json.RawMessage, error) {
var results []json.RawMessage
var txStorage TxStorage
var err error
b := ts.TraceStorages[index]
rlp.DecodeBytes(b, &txStorage)
err = rlp.DecodeBytes(b, &txStorage)
if err != nil {
return nil, err
}
headPiece := fmt.Sprintf(
`"blockNumber":%d,"blockHash":"%s","transactionHash":"%s","transactionPosition":%d`,
@ -142,8 +165,8 @@ func (ts *TraceBlockStorage) txJson(index int) []json.RawMessage {
typStr, acStr, outStr := ac.toJsonStr()
if acStr == nil {
//err = errors.New("tracer internal failure")
return nil
err = errors.New("tracer internal failure")
return nil, err
}
traceStr, _ := json.Marshal(acStorage.TraceAddress)
bodyPiece := fmt.Sprintf(
@ -161,13 +184,167 @@ func (ts *TraceBlockStorage) txJson(index int) []json.RawMessage {
jstr := "{" + headPiece + bodyPiece + resultPiece + "}"
results = append(results, json.RawMessage(jstr))
}
return results
return results, nil
}
func (ts *TraceBlockStorage) toJson() []json.RawMessage {
func (ts *TraceBlockStorage) ToJson() (json.RawMessage, error) {
var results []json.RawMessage
for i := range ts.TraceStorages {
results = append(results, ts.txJson(i)...)
tx, err := ts.TxJson(i)
if err != nil {
return nil, err
}
results = append(results, tx...)
}
return json.Marshal(results)
}
type JsonCallAction struct {
CallType string `json:"callType"`
Value string `json:"value"`
From common.Address `json:"from"`
To common.Address `json:"to"`
Gas string `json:"gas"`
Input string `json:"input"`
}
type JsonCreateAction struct {
From common.Address `json:"from"`
Value string `json:"value"`
Gas string `json:"gas"`
Init string `json:"init"`
}
type JsonSuicideAction struct {
RefundAddress common.Address `json:"refundAddress"`
Balance string `json:"balance"`
Address common.Address `json:"address"`
}
type JsonCallOutput struct {
Output string `json:"output"`
GasUsed string `json:"gasUsed"`
}
type JsonCreateOutput struct {
Address common.Address `json:"address"`
Code string `json:"code"`
GasUsed string `json:"gasUsed"`
}
type JsonTrace struct {
BlockNumber uint64 `json:"blockNumber"`
BlockHash common.Hash `json:"blockHash"`
TransactionHash common.Hash `json:"transactionHash"`
TransactionPosition uint `json:"transactionPosition"`
Subtraces uint `json:"subtraces"`
TraceAddress []uint `json:"traceAddress"`
Typ string `json:"type"`
Action json.RawMessage `json:"action"`
Result json.RawMessage `json:"result"`
Err string `json:"error"`
Revert string `json:"revert"`
}
func (ts *TraceBlockStorage) fromJson(bytes []byte) {
var actionObjs []JsonTrace
var txs []*TxStorage
var tx *TxStorage
json.Unmarshal(bytes, &actionObjs)
for _, obj := range actionObjs {
ac := action{}
if len(obj.Err) > 0 {
ac.err = errors.New(obj.Err)
ac.revert = utils.FromHex(obj.Revert)
}
if obj.Typ == "call" {
callAc := &JsonCallAction{}
err := json.Unmarshal(obj.Action, callAc)
if err != nil {
panic(err)
}
switch callAc.CallType {
case "staticcall":
ac.op = vm.STATICCALL
case "call":
ac.op = vm.CALL
case "callcode":
ac.op = vm.CALLCODE
case "delegatecall":
ac.op = vm.DELEGATECALL
}
ac.from = callAc.From
ac.to = callAc.To
ac.value, _ = new(big.Int).SetString(callAc.Value[2:], 16)
ac.gas, _ = strconv.ParseUint(callAc.Gas, 0, 64)
ac.input = utils.FromHex(callAc.Input)
if ac.err == nil {
callOutput := &JsonCallOutput{}
err = json.Unmarshal(obj.Result, callOutput)
if err != nil {
panic(err)
}
ac.output = utils.FromHex(callOutput.Output)
ac.gasUsed, err = strconv.ParseUint(callOutput.GasUsed, 0, 64)
if err != nil {
panic(err)
}
}
}
if obj.Typ == "create" {
ac.op = vm.CREATE
callAc := &JsonCreateAction{}
err := json.Unmarshal(obj.Action, callAc)
if err != nil {
panic(err)
}
callOutput := &JsonCreateOutput{}
err = json.Unmarshal(obj.Result, callOutput)
if err != nil {
panic(err)
}
ac.from = callAc.From
ac.value, _ = new(big.Int).SetString(callAc.Value[2:], 16)
ac.gas, _ = strconv.ParseUint(callAc.Gas, 0, 64)
ac.input = utils.FromHex(callAc.Init)
if ac.err == nil {
ac.to = callOutput.Address
ac.output = utils.FromHex(callOutput.Code)
ac.gasUsed, _ = strconv.ParseUint(callOutput.GasUsed, 0, 64)
}
}
if obj.Typ == "suicide" {
ac.op = vm.SELFDESTRUCT
callAc := &JsonSuicideAction{}
err := json.Unmarshal(obj.Action, callAc)
if err != nil {
panic(err)
}
ac.from = callAc.Address
ac.to = callAc.RefundAddress
ac.value, _ = new(big.Int).SetString(callAc.Balance[2:], 16)
}
ts.Hash = obj.BlockHash
ts.Number = obj.BlockNumber
if tx == nil || tx.Hash != obj.TransactionHash {
tx = &TxStorage{
Hash: obj.TransactionHash,
}
txs = append(txs, tx)
}
acStorage := ac.toStorage(ts)
acStorage.Subtraces = obj.Subtraces
acStorage.TraceAddress = obj.TraceAddress
tx.Storages = append(tx.Storages, acStorage)
}
for _, tx := range txs {
b, err := rlp.EncodeToBytes(tx)
if err != nil {
panic(err)
}
ts.TraceStorages = append(ts.TraceStorages, b)
}
return results
}

@ -131,12 +131,9 @@ func (node *Node) TraceLoopForExplorer() {
loop:
select {
case ev := <-ch:
if traceResults, err := ev.Tracer.GetResult(); err == nil {
if exp, err := node.getExplorerService(); err == nil {
if raw, err := json.Marshal(traceResults); err == nil {
exp.DumpTraceResult(ev.Block.Hash(), raw)
}
}
if exp, err := node.getExplorerService(); err == nil {
storage := ev.Tracer.GetStorage()
exp.DumpTraceResult(storage)
}
goto loop
case <-subscribe.Err():

Loading…
Cancel
Save