Merge pull request #2254 from flicker-harmony/pr_exp_storage

Refactor explorer service and add internal addresses fetching
pull/2380/head
Leo Chen 5 years ago committed by GitHub
commit 285e142364
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 93
      api/service/explorer/service.go
  2. 113
      api/service/explorer/storage.go
  3. 81
      api/service/explorer/storage_test.go
  4. 60
      api/service/explorer/structs.go
  5. 2
      node/node_explorer.go
  6. 2
      node/service_setup.go

@ -4,16 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"math/big"
"net"
"net/http"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/reward"
@ -25,6 +22,8 @@ import (
// Constants for explorer service.
const (
explorerPortDifference = 4000
defaultPageSize = "1000"
maxAddresses = 100000
totalSupply = 12600000000
)
@ -39,33 +38,14 @@ type Service struct {
router *mux.Router
IP string
Port string
GetNodeIDs func() []libp2p_peer.ID
ShardID uint32
Storage *Storage
server *http.Server
messageChan chan *msg_pb.Message
GetAccountBalance func(common.Address) (*big.Int, error)
}
// New returns explorer service.
func New(selfPeer *p2p.Peer, shardID uint32, GetNodeIDs func() []libp2p_peer.ID, GetAccountBalance func(common.Address) (*big.Int, error)) *Service {
return &Service{
IP: selfPeer.IP,
Port: selfPeer.Port,
ShardID: shardID,
GetNodeIDs: GetNodeIDs,
GetAccountBalance: GetAccountBalance,
}
}
// ServiceAPI is rpc api.
type ServiceAPI struct {
Service *Service
}
// NewServiceAPI returns explorer service api.
func NewServiceAPI(explorerService *Service) *ServiceAPI {
return &ServiceAPI{Service: explorerService}
func New(selfPeer *p2p.Peer) *Service {
return &Service{IP: selfPeer.IP, Port: selfPeer.Port}
}
// StartService starts explorer service.
@ -106,6 +86,12 @@ func (s *Service) Run() *http.Server {
s.router = mux.NewRouter()
// Set up router for addresses.
// Fetch addresses request, accepts parameter size: how much addresses to read,
// parameter prefix: from which address prefix start
s.router.Path("/addresses").Queries("size", "{[0-9]*?}", "prefix", "{[a-zA-Z0-9]*?}").HandlerFunc(s.GetAddresses).Methods("GET")
s.router.Path("/addresses").HandlerFunc(s.GetAddresses)
// Set up router for node count.
s.router.Path("/circulating-supply").Queries().HandlerFunc(s.GetCirculatingSupply).Methods("GET")
s.router.Path("/circulating-supply").HandlerFunc(s.GetCirculatingSupply)
@ -125,46 +111,34 @@ func (s *Service) Run() *http.Server {
return server
}
// GetExplorerNodeCount serves /nodes end-point.
func (s *Service) GetExplorerNodeCount(w http.ResponseWriter, r *http.Request) {
// GetAddresses serves end-point /addresses, returns size of addresses from address with prefix.
func (s *Service) GetAddresses(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(len(s.GetNodeIDs())); err != nil {
utils.Logger().Warn().Msg("cannot JSON-encode node count")
w.WriteHeader(http.StatusInternalServerError)
sizeStr := r.FormValue("size")
prefix := r.FormValue("prefix")
if sizeStr == "" {
sizeStr = defaultPageSize
}
data := &Data{}
defer func() {
if err := json.NewEncoder(w).Encode(data.Addresses); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot JSON-encode addresses")
}
}()
// GetExplorerNodeCount rpc end-point.
func (s *ServiceAPI) GetExplorerNodeCount(ctx context.Context) int {
return len(s.Service.GetNodeIDs())
size, err := strconv.Atoi(sizeStr)
if err != nil || size > maxAddresses {
w.WriteHeader(http.StatusBadRequest)
return
}
// GetExplorerShard serves /shard end-point.
func (s *Service) GetExplorerShard(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var nodes []Node
for _, nodeID := range s.GetNodeIDs() {
nodes = append(nodes, Node{
ID: libp2p_peer.IDB58Encode(nodeID),
})
}
if err := json.NewEncoder(w).Encode(Shard{Nodes: nodes}); err != nil {
utils.Logger().Warn().Msg("cannot JSON-encode shard info")
data.Addresses, err = s.Storage.GetAddresses(size, prefix)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
utils.Logger().Warn().Err(err).Msg("wasn't able to fetch addresses from storage")
return
}
}
// GetExplorerShard rpc end-point.
func (s *ServiceAPI) GetExplorerShard(ctx context.Context) *Shard {
var nodes []Node
for _, nodeID := range s.Service.GetNodeIDs() {
nodes = append(nodes, Node{
ID: libp2p_peer.IDB58Encode(nodeID),
})
}
return &Shard{Nodes: nodes}
}
// GetCirculatingSupply serves /circulating-supply end-point.
func (s *Service) GetCirculatingSupply(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
@ -197,12 +171,5 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "explorer",
Version: "1.0",
Service: NewServiceAPI(s),
Public: true,
},
}
return nil
}

@ -3,51 +3,34 @@ package explorer
import (
"fmt"
"os"
"strconv"
"sync"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
// Constants for storage.
const (
BlockHeightKey = "bh"
BlockInfoPrefix = "bi"
BlockPrefix = "b"
TXPrefix = "tx"
AddressPrefix = "ad"
PrefixLen = 3
)
// GetBlockInfoKey ...
func GetBlockInfoKey(id int) string {
return fmt.Sprintf("%s_%d", BlockInfoPrefix, id)
}
// GetAddressKey ...
func GetAddressKey(address string) string {
return fmt.Sprintf("%s_%s", AddressPrefix, address)
}
// GetBlockKey ...
func GetBlockKey(id int) string {
return fmt.Sprintf("%s_%d", BlockPrefix, id)
}
// GetTXKey ...
func GetTXKey(hash string) string {
return fmt.Sprintf("%s_%s", TXPrefix, hash)
}
var storage *Storage
var once sync.Once
// Storage dump the block info into leveldb.
type Storage struct {
db *ethdb.LDBDatabase
db *leveldb.DB
}
// GetStorageInstance returns attack model by using singleton pattern.
@ -69,65 +52,43 @@ func (storage *Storage) Init(ip, port string, remove bool) {
utils.Logger().Error().Err(err).Msg("Failed to remove existing database files")
}
}
if storage.db, err = ethdb.NewLDBDatabase(dbFileName, 0, 0); err != nil {
// https://github.com/ethereum/go-ethereum/blob/master/ethdb/leveldb/leveldb.go#L98 options.
// We had 0 for handles and cache params before, so set 0s for all of them. Filter opt is the same.
options := &opt.Options{
OpenFilesCacheCapacity: 0,
BlockCacheCapacity: 0,
WriteBuffer: 0,
Filter: filter.NewBloomFilter(10),
}
if storage.db, err = leveldb.OpenFile(dbFileName, options); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to create new database")
}
}
// GetDB returns the LDBDatabase of the storage.
func (storage *Storage) GetDB() *ethdb.LDBDatabase {
func (storage *Storage) GetDB() *leveldb.DB {
return storage.db
}
// Dump extracts information from block and index them into lvdb for explorer.
func (storage *Storage) Dump(block *types.Block, height uint64) {
//utils.Logger().Debug().Uint64("block height", height).Msg("Dumping block")
if block == nil {
return
}
batch := storage.db.NewBatch()
// Update block height.
if err := batch.Put([]byte(BlockHeightKey), []byte(strconv.Itoa(int(height)))); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot batch block height")
}
// Store block.
blockData, err := rlp.EncodeToBytes(block)
if err == nil {
if err := batch.Put([]byte(GetBlockKey(int(height))), blockData); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot batch block data")
}
} else {
utils.Logger().Error().Err(err).Msg("Failed to serialize block")
}
batch := new(leveldb.Batch)
// Store txs
for _, tx := range block.Transactions() {
explorerTransaction := GetTransaction(tx, block)
storage.UpdateTXStorage(batch, explorerTransaction, tx)
storage.UpdateAddress(batch, explorerTransaction, tx)
}
if err := batch.Write(); err != nil {
if err := storage.db.Write(batch, nil); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot write batch")
}
}
// UpdateTXStorage ...
func (storage *Storage) UpdateTXStorage(batch ethdb.Batch, explorerTransaction *Transaction, tx *types.Transaction) {
if data, err := rlp.EncodeToBytes(explorerTransaction); err == nil {
key := GetTXKey(tx.Hash().Hex())
if err := batch.Put([]byte(key), data); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot batch TX")
}
} else {
utils.Logger().Error().Msg("EncodeRLP transaction error")
}
}
// UpdateAddress ...
// TODO: deprecate this logic
func (storage *Storage) UpdateAddress(batch ethdb.Batch, explorerTransaction *Transaction, tx *types.Transaction) {
func (storage *Storage) UpdateAddress(batch *leveldb.Batch, explorerTransaction *Transaction, tx *types.Transaction) {
explorerTransaction.Type = Received
if explorerTransaction.To != "" {
storage.UpdateAddressStorage(batch, explorerTransaction.To, explorerTransaction, tx)
@ -137,12 +98,10 @@ func (storage *Storage) UpdateAddress(batch ethdb.Batch, explorerTransaction *Tr
}
// UpdateAddressStorage updates specific addr Address.
// TODO: deprecate this logic
func (storage *Storage) UpdateAddressStorage(batch ethdb.Batch, addr string, explorerTransaction *Transaction, tx *types.Transaction) {
key := GetAddressKey(addr)
func (storage *Storage) UpdateAddressStorage(batch *leveldb.Batch, addr string, explorerTransaction *Transaction, tx *types.Transaction) {
var address Address
if data, err := storage.db.Get([]byte(key)); err == nil {
key := GetAddressKey(addr)
if data, err := storage.db.Get([]byte(key), nil); err == nil {
if err = rlp.DecodeBytes(data, &address); err != nil {
utils.Logger().Error().Err(err).Msg("Failed due to error")
}
@ -151,10 +110,32 @@ func (storage *Storage) UpdateAddressStorage(batch ethdb.Batch, addr string, exp
address.TXs = append(address.TXs, explorerTransaction)
encoded, err := rlp.EncodeToBytes(address)
if err == nil {
if err := batch.Put([]byte(key), encoded); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot batch address")
}
batch.Put([]byte(key), encoded)
} else {
utils.Logger().Error().Err(err).Msg("cannot encode address")
}
}
// GetAddresses returns size of addresses from address with prefix.
func (storage *Storage) GetAddresses(size int, prefix string) ([]string, error) {
db := storage.GetDB()
key := GetAddressKey(prefix)
iterator := db.NewIterator(&util.Range{Start: []byte(key)}, nil)
addresses := make([]string, 0)
read := 0
for iterator.Next() && read < size {
address := string(iterator.Key())
read++
if len(address) < PrefixLen {
utils.Logger().Info().Msgf("address len < 3 %s", address)
continue
}
addresses = append(addresses, address[PrefixLen:])
}
iterator.Release()
if err := iterator.Error(); err != nil {
utils.Logger().Error().Err(err).Msg("iterator error")
return nil, err
}
return addresses, nil
}

@ -2,98 +2,23 @@ package explorer
import (
"bytes"
"math/big"
"strconv"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/core/types"
"github.com/stretchr/testify/assert"
)
// Test for GetBlockInfoKey
func TestGetBlockInfoKey(t *testing.T) {
assert.Equal(t, GetBlockInfoKey(3), "bi_3", "error")
}
// Test for GetAddressKey
func TestGetAddressKey(t *testing.T) {
assert.Equal(t, GetAddressKey("abcd"), "ad_abcd", "error")
}
// Test for GetBlockKey
func TestGetBlockKey(t *testing.T) {
assert.Equal(t, GetBlockKey(3), "b_3", "error")
}
// Test for GetTXKey
func TestGetTXKey(t *testing.T) {
assert.Equal(t, GetTXKey("abcd"), "tx_abcd", "error")
}
// TestInit ..
func TestInit(t *testing.T) {
ins := GetStorageInstance("1.1.1.1", "3333", true)
if err := ins.GetDB().Put([]byte{1}, []byte{2}); err != nil {
if err := ins.GetDB().Put([]byte{1}, []byte{2}, nil); err != nil {
t.Fatal("(*LDBDatabase).Put failed:", err)
}
value, err := ins.GetDB().Get([]byte{1})
value, err := ins.GetDB().Get([]byte{1}, nil)
assert.Equal(t, bytes.Compare(value, []byte{2}), 0, "value should be []byte{2}")
assert.Nil(t, err, "error should be nil")
}
func TestDump(t *testing.T) {
tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), 0, big.NewInt(111), 1111, big.NewInt(11111), []byte{0x11, 0x11, 0x11})
tx2 := types.NewTransaction(2, common.BytesToAddress([]byte{0x22}), 0, big.NewInt(222), 2222, big.NewInt(22222), []byte{0x22, 0x22, 0x22})
tx3 := types.NewTransaction(3, common.BytesToAddress([]byte{0x33}), 0, big.NewInt(333), 3333, big.NewInt(33333), []byte{0x33, 0x33, 0x33})
txs := []*types.Transaction{tx1, tx2, tx3}
block := types.NewBlock(blockfactory.NewTestHeader().With().Number(big.NewInt(314)).Header(), txs, types.Receipts{&types.Receipt{}, &types.Receipt{}, &types.Receipt{}}, nil, nil, nil)
ins := GetStorageInstance("1.1.1.1", "3333", true)
ins.Dump(block, uint64(1))
db := ins.GetDB()
res, err := db.Get([]byte(BlockHeightKey))
if err == nil {
toInt, err := strconv.Atoi(string(res))
assert.Equal(t, toInt, 1, "error")
assert.Nil(t, err, "error")
} else {
t.Error("Error")
}
data, err := db.Get([]byte(GetBlockKey(1)))
assert.Nil(t, err, "should be nil")
blockData, err := rlp.EncodeToBytes(block)
assert.Nil(t, err, "should be nil")
assert.Equal(t, bytes.Compare(data, blockData), 0, "should be equal")
}
func TestUpdateAddressStorage(t *testing.T) {
tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), 0, big.NewInt(111), 1111, big.NewInt(11111), []byte{0x11, 0x11, 0x11})
tx2 := types.NewTransaction(2, common.BytesToAddress([]byte{0x22}), 0, big.NewInt(222), 2222, big.NewInt(22222), []byte{0x22, 0x22, 0x22})
tx3 := types.NewTransaction(3, common.BytesToAddress([]byte{0x33}), 0, big.NewInt(333), 3333, big.NewInt(33333), []byte{0x33, 0x33, 0x33})
txs := []*types.Transaction{tx1, tx2, tx3}
block := types.NewBlock(blockfactory.NewTestHeader().With().Number(big.NewInt(314)).Header(), txs, types.Receipts{&types.Receipt{}, &types.Receipt{}, &types.Receipt{}}, nil, nil, nil)
ins := GetStorageInstance("1.1.1.1", "3333", true)
ins.Dump(block, uint64(1))
db := ins.GetDB()
res, err := db.Get([]byte(BlockHeightKey))
if err == nil {
toInt, err := strconv.Atoi(string(res))
assert.Equal(t, toInt, 1, "error")
assert.Nil(t, err, "error")
} else {
t.Error("Error")
}
data, err := db.Get([]byte(GetBlockKey(1)))
assert.Nil(t, err, "should be nil")
blockData, err := rlp.EncodeToBytes(block)
assert.Nil(t, err, "should be nil")
assert.Equal(t, bytes.Compare(data, blockData), 0, "should be equal")
}

@ -22,10 +22,7 @@ const (
// Data ...
type Data struct {
Blocks []*Block `json:"blocks"`
// Block Block `json:"block"`
Address Address `json:"Address"`
TX Transaction
Addresses []string `json:"Addresses"`
}
// Address ...
@ -35,12 +32,6 @@ type Address struct {
TXs []*Transaction `json:"txs"`
}
// Validator contains harmony validator node address and its balance.
type Validator struct {
Address string `json:"address"`
Balance *big.Int `json:"balance"`
}
// Transaction ...
type Transaction struct {
ID string `json:"id"`
@ -56,55 +47,6 @@ type Transaction struct {
Type string `json:"type"`
}
// Block ...
type Block struct {
Height string `json:"height"`
ID string `json:"id"`
TXCount string `json:"txCount"`
Timestamp string `json:"timestamp"`
BlockTime int64 `json:"blockTime"`
MerkleRoot string `json:"merkleRoot"`
PrevBlock RefBlock `json:"prevBlock"`
Bytes string `json:"bytes"`
NextBlock RefBlock `json:"nextBlock"`
TXs []*Transaction `json:"txs"`
Signers []string `json:"signers"`
Epoch uint64 `json:"epoch"`
ExtraData string `json:"extra_data"`
}
// RefBlock ...
type RefBlock struct {
ID string `json:"id"`
Height string `json:"height"`
}
// Node ...
type Node struct {
ID string `json:"id"`
}
// Shard ...
type Shard struct {
Nodes []Node `json:"nodes"`
}
// NewBlock ...
func NewBlock(block *types.Block, height int) *Block {
// TODO(ricl): use block.Header().CommitBitmap and GetPubKeyFromMask
return &Block{
Height: strconv.Itoa(height),
ID: block.Hash().Hex(),
TXCount: strconv.Itoa(block.Transactions().Len()),
Timestamp: strconv.Itoa(int(block.Time().Int64() * 1000)),
MerkleRoot: block.Root().Hex(),
Bytes: strconv.Itoa(int(block.Size())),
Signers: []string{},
Epoch: block.Epoch().Uint64(),
ExtraData: string(block.Extra()),
}
}
// GetTransaction ...
func GetTransaction(tx *types.Transaction, addressBlock *types.Block) *Transaction {
msg, err := tx.AsMessage(types.NewEIP155Signer(tx.ChainID()))

@ -150,7 +150,7 @@ func (node *Node) commitBlockForExplorer(block *types.Block) {
func (node *Node) GetTransactionsHistory(address, txType, order string) ([]common.Hash, error) {
addressData := &explorer.Address{}
key := explorer.GetAddressKey(address)
bytes, err := explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, false).GetDB().Get([]byte(key))
bytes, err := explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, false).GetDB().Get([]byte(key), nil)
if err != nil {
return make([]common.Hash, 0), nil
}

@ -72,7 +72,7 @@ func (node *Node) setupForExplorerNode() {
// Register networkinfo service.
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath()))
// Register explorer service.
node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer, node.NodeConfig.GetShardID(), node.Consensus.GetNodeIDs, node.GetBalanceOfAddress))
node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer))
// Register explorer service.
}

Loading…
Cancel
Save