Merge pull request #1992 from flicker-harmony/pr_remove_committee

Remove committee in explorer service
pull/1999/head
flicker-harmony 5 years ago committed by GitHub
commit 23b42e5747
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 241
      api/service/explorer/service.go
  2. 24
      api/service/explorer/storage.go
  3. 39
      api/service/explorer/storage_test.go
  4. 5
      api/service/explorer/structs.go
  5. 4
      cmd/harmony/main.go
  6. 27
      node/node_explorer.go

@ -18,16 +18,12 @@ import (
"github.com/gorilla/mux"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
"github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
bls2 "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/bech32"
common2 "github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
)
// Constants for explorer service.
@ -137,10 +133,6 @@ func (s *Service) Run() *http.Server {
s.router.Path("/shard").Queries("id", "{[0-9]*?}").HandlerFunc(s.GetExplorerShard).Methods("GET")
s.router.Path("/shard").HandlerFunc(s.GetExplorerShard)
// Set up router for committee.
s.router.Path("/committee").Queries("shard_id", "{[0-9]*?}", "epoch", "{[0-9]*?}").HandlerFunc(s.GetExplorerCommittee).Methods("GET")
s.router.Path("/committee").HandlerFunc(s.GetExplorerCommittee).Methods("GET")
// Do serving now.
utils.Logger().Info().Str("port", GetExplorerPort(s.Port)).Msg("Listening")
server := &http.Server{Addr: addr, Handler: s.router}
@ -183,11 +175,6 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
to := r.FormValue("to")
pageParam := r.FormValue("page")
offsetParam := r.FormValue("offset")
withSignersParam := r.FormValue("with_signers")
withSigners := false
if withSignersParam == "true" {
withSigners = true
}
order := r.FormValue("order")
data := &Data{
Blocks: []*Block{},
@ -251,60 +238,11 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
}
accountBlocks := s.ReadBlocksFromDB(fromInt, toInt)
curEpoch := int64(-1)
committee := &shard.Committee{}
if withSigners {
if bytes, err := db.Get([]byte(GetCommitteeKey(uint32(s.ShardID), 0))); err == nil {
if err = rlp.DecodeBytes(bytes, committee); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read committee for new epoch")
}
}
}
for id, accountBlock := range accountBlocks {
if id == 0 || id == len(accountBlocks)-1 || accountBlock == nil {
continue
}
block := NewBlock(accountBlock, id+fromInt-1)
if withSigners && int64(block.Epoch) > curEpoch {
if accountBlocks[id-1] != nil {
state, err := accountBlocks[id-1].Header().GetShardState()
if err == nil {
for _, shardCommittee := range state.Shards {
if shardCommittee.ShardID == accountBlock.ShardID() {
committee = &shardCommittee
break
}
}
} else {
utils.Logger().Warn().Err(err).Msg("error parsing shard state")
}
}
curEpoch = int64(block.Epoch)
}
if withSigners {
pubkeys := make([]*bls.PublicKey, len(committee.Slots))
for i, validator := range committee.Slots {
pubkeys[i] = new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(pubkeys[i])
}
mask, err := bls2.NewMask(pubkeys, nil)
if err == nil && accountBlocks[id+1] != nil {
err = mask.SetMask(accountBlocks[id+1].Header().LastCommitBitmap())
if err == nil {
for _, validator := range committee.Slots {
oneAddress, err := common2.AddressToBech32(validator.EcdsaAddress)
if err != nil {
continue
}
blsPublicKey := new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(blsPublicKey)
if ok, err := mask.KeyEnabled(blsPublicKey); err == nil && ok {
block.Signers = append(block.Signers, oneAddress)
}
}
}
}
}
// Populate transactions
for _, tx := range accountBlock.Transactions() {
transaction := GetTransaction(tx, accountBlock)
@ -357,7 +295,7 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
}
// GetExplorerBlocks rpc end-point.
func (s *ServiceAPI) GetExplorerBlocks(ctx context.Context, from, to, page, offset int, withSigners bool, order string) ([]*Block, error) {
func (s *ServiceAPI) GetExplorerBlocks(ctx context.Context, from, to, page, offset int, order string) ([]*Block, error) {
if offset == 0 {
offset = paginationOffset
}
@ -374,58 +312,11 @@ func (s *ServiceAPI) GetExplorerBlocks(ctx context.Context, from, to, page, offs
}
blocks := make([]*Block, 0)
accountBlocks := s.Service.ReadBlocksFromDB(from, to)
curEpoch := int64(-1)
committee := &shard.Committee{}
if withSigners {
if bytes, err := db.Get([]byte(GetCommitteeKey(uint32(s.Service.ShardID), 0))); err == nil {
if err = rlp.DecodeBytes(bytes, committee); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read committee for new epoch")
}
}
}
for id, accountBlock := range accountBlocks {
if id == 0 || id == len(accountBlocks)-1 || accountBlock == nil {
continue
}
block := NewBlock(accountBlock, id+from-1)
if withSigners && int64(block.Epoch) > curEpoch {
if accountBlocks[id-1] != nil {
state, err := accountBlocks[id-1].Header().GetShardState()
if err == nil {
for _, shardCommittee := range state.Shards {
if shardCommittee.ShardID == accountBlock.ShardID() {
committee = &shardCommittee
break
}
}
}
}
curEpoch = int64(block.Epoch)
}
if withSigners {
pubkeys := make([]*bls.PublicKey, len(committee.Slots))
for i, validator := range committee.Slots {
pubkeys[i] = new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(pubkeys[i])
}
mask, err := bls2.NewMask(pubkeys, nil)
if err == nil && accountBlocks[id+1] != nil {
err = mask.SetMask(accountBlocks[id+1].Header().LastCommitBitmap())
if err == nil {
for _, validator := range committee.Slots {
oneAddress, err := common2.AddressToBech32(validator.EcdsaAddress)
if err != nil {
continue
}
blsPublicKey := new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(blsPublicKey)
if ok, err := mask.KeyEnabled(blsPublicKey); err == nil && ok {
block.Signers = append(block.Signers, oneAddress)
}
}
}
}
}
// Populate transactions
for _, tx := range accountBlock.Transactions() {
transaction := GetTransaction(tx, accountBlock)
@ -530,136 +421,6 @@ func (s *ServiceAPI) GetExplorerTransaction(ctx context.Context, id string) (*Tr
return tx, nil
}
// GetExplorerCommittee servers /comittee end-point.
func (s *Service) GetExplorerCommittee(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
shardIDRead := r.FormValue("shard_id")
epochRead := r.FormValue("epoch")
shardID := uint64(0)
epoch := uint64(0)
var err error
if shardIDRead != "" {
shardID, err = strconv.ParseUint(shardIDRead, 10, 32)
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read shard id")
w.WriteHeader(http.StatusBadRequest)
return
}
}
if epochRead != "" {
epoch, err = strconv.ParseUint(epochRead, 10, 64)
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read shard epoch")
w.WriteHeader(http.StatusBadRequest)
return
}
}
if s.ShardID != uint32(shardID) {
utils.Logger().Warn().Msg("incorrect shard id")
w.WriteHeader(http.StatusBadRequest)
return
}
// fetch current epoch if epoch is 0
db := s.Storage.GetDB()
if epoch == 0 {
bytes, err := db.Get([]byte(BlockHeightKey))
blockHeight, err := strconv.Atoi(string(bytes))
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot decode block height from DB")
w.WriteHeader(http.StatusInternalServerError)
return
}
key := GetBlockKey(blockHeight)
data, err := db.Get([]byte(key))
block := new(types.Block)
if rlp.DecodeBytes(data, block) != nil {
utils.Logger().Warn().Err(err).Msg("cannot get block from db")
w.WriteHeader(http.StatusInternalServerError)
return
}
epoch = block.Epoch().Uint64()
}
bytes, err := db.Get([]byte(GetCommitteeKey(uint32(shardID), epoch)))
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read committee")
w.WriteHeader(http.StatusInternalServerError)
return
}
committee := &shard.Committee{}
if err := rlp.DecodeBytes(bytes, committee); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot decode committee data from DB")
w.WriteHeader(http.StatusInternalServerError)
return
}
validators := &Committee{}
for _, validator := range committee.Slots {
validatorBalance := big.NewInt(0)
validatorBalance, err := s.GetAccountBalance(validator.EcdsaAddress)
if err != nil {
continue
}
oneAddress, err := common2.AddressToBech32(validator.EcdsaAddress)
if err != nil {
continue
}
validators.Validators = append(validators.Validators, &Validator{Address: oneAddress, Balance: validatorBalance})
}
if err := json.NewEncoder(w).Encode(validators); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot JSON-encode committee")
w.WriteHeader(http.StatusInternalServerError)
}
}
// GetExplorerCommittee rpc end-point.
func (s *ServiceAPI) GetExplorerCommittee(ctx context.Context, shardID uint32, epoch uint64) (*Committee, error) {
if s.Service.ShardID != uint32(shardID) {
utils.Logger().Warn().Msg("incorrect shard id")
return nil, nil
}
// fetch current epoch if epoch is 0
db := s.Service.Storage.GetDB()
if epoch == 0 {
bytes, err := db.Get([]byte(BlockHeightKey))
blockHeight, err := strconv.Atoi(string(bytes))
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot decode block height from DB")
return nil, err
}
key := GetBlockKey(blockHeight)
data, err := db.Get([]byte(key))
block := new(types.Block)
if rlp.DecodeBytes(data, block) != nil {
utils.Logger().Warn().Err(err).Msg("cannot get block from db")
return nil, err
}
epoch = block.Epoch().Uint64()
}
bytes, err := db.Get([]byte(GetCommitteeKey(uint32(shardID), epoch)))
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read committee")
return nil, err
}
committee := &shard.Committee{}
if err := rlp.DecodeBytes(bytes, committee); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot decode committee data from DB")
return nil, err
}
validators := &Committee{}
for _, validator := range committee.Slots {
validatorBalance := big.NewInt(0)
validatorBalance, err := s.Service.GetAccountBalance(validator.EcdsaAddress)
if err != nil {
continue
}
oneAddress, err := common2.AddressToBech32(validator.EcdsaAddress)
if err != nil {
continue
}
validators.Validators = append(validators.Validators, &Validator{Address: oneAddress, Balance: validatorBalance})
}
return validators, nil
}
// GetExplorerAddress serves /address end-point.
func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

@ -11,7 +11,6 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
)
// Constants for storage.
@ -21,7 +20,6 @@ const (
BlockPrefix = "b"
TXPrefix = "tx"
AddressPrefix = "ad"
CommitteePrefix = "cp"
)
// GetBlockInfoKey ...
@ -44,11 +42,6 @@ func GetTXKey(hash string) string {
return fmt.Sprintf("%s_%s", TXPrefix, hash)
}
// GetCommitteeKey ...
func GetCommitteeKey(shardID uint32, epoch uint64) string {
return fmt.Sprintf("%s_%d_%d", CommitteePrefix, shardID, epoch)
}
var storage *Storage
var once sync.Once
@ -120,23 +113,6 @@ func (storage *Storage) Dump(block *types.Block, height uint64) {
}
}
// DumpCommittee commits validators for shardNum and epoch.
func (storage *Storage) DumpCommittee(shardID uint32, epoch uint64, committee shard.Committee) error {
batch := storage.db.NewBatch()
// Store committees.
committeeData, err := rlp.EncodeToBytes(committee)
if err != nil {
return err
}
if err := batch.Put([]byte(GetCommitteeKey(shardID, epoch)), committeeData); err != nil {
return err
}
if err := batch.Write(); err != nil {
return err
}
return nil
}
// UpdateTXStorage ...
func (storage *Storage) UpdateTXStorage(batch ethdb.Batch, explorerTransaction *Transaction, tx *types.Transaction) {
if data, err := rlp.EncodeToBytes(explorerTransaction); err == nil {

@ -8,12 +8,9 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/shard"
"github.com/stretchr/testify/assert"
)
@ -37,11 +34,6 @@ func TestGetTXKey(t *testing.T) {
assert.Equal(t, GetTXKey("abcd"), "tx_abcd", "error")
}
// Test for GetCommitteeKey
func TestGetCommitteeKey(t *testing.T) {
assert.Equal(t, GetCommitteeKey(uint32(0), uint64(0)), "cp_0_0", "error")
}
func TestInit(t *testing.T) {
ins := GetStorageInstance("1.1.1.1", "3333", true)
if err := ins.GetDB().Put([]byte{1}, []byte{2}); err != nil {
@ -79,37 +71,6 @@ func TestDump(t *testing.T) {
assert.Equal(t, bytes.Compare(data, blockData), 0, "should be equal")
}
func TestDumpCommittee(t *testing.T) {
blsPubKey1 := new(bls.PublicKey)
blsPubKey2 := new(bls.PublicKey)
err := blsPubKey1.DeserializeHexStr("1c1fb28d2de96e82c3d9b4917eb54412517e2763112a3164862a6ed627ac62e87ce274bb4ea36e6a61fb66a15c263a06")
assert.Nil(t, err, "should be nil")
err = blsPubKey2.DeserializeHexStr("02c8ff0b88f313717bc3a627d2f8bb172ba3ad3bb9ba3ecb8eed4b7c878653d3d4faf769876c528b73f343967f74a917")
assert.Nil(t, err, "should be nil")
BlsPublicKey1 := new(shard.BlsPublicKey)
BlsPublicKey2 := new(shard.BlsPublicKey)
BlsPublicKey1.FromLibBLSPublicKey(blsPubKey1)
BlsPublicKey2.FromLibBLSPublicKey(blsPubKey2)
nodeID1 := shard.Slot{EcdsaAddress: common.HexToAddress("52789f18a342da8023cc401e5d2b14a6b710fba9"), BlsPublicKey: *BlsPublicKey1}
nodeID2 := shard.Slot{EcdsaAddress: common.HexToAddress("7c41e0668b551f4f902cfaec05b5bdca68b124ce"), BlsPublicKey: *BlsPublicKey2}
nodeIDList := []shard.Slot{nodeID1, nodeID2}
committee := shard.Committee{ShardID: uint32(0), Slots: nodeIDList}
shardID := uint32(0)
epoch := uint64(0)
ins := GetStorageInstance("1.1.1.1", "3333", true)
err = ins.DumpCommittee(shardID, epoch, committee)
if err != nil {
assert.Nilf(t, err, "should be nil, but %s", err.Error())
}
db := ins.GetDB()
data, err := db.Get([]byte(GetCommitteeKey(shardID, epoch)))
assert.Nil(t, err, "should be nil")
committeeData, err := rlp.EncodeToBytes(committee)
assert.Nil(t, err, "should be nil")
assert.Equal(t, bytes.Compare(data, committeeData), 0, "should be equal")
}
func TestUpdateAddressStorage(t *testing.T) {
tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), 0, big.NewInt(111), 1111, big.NewInt(11111), []byte{0x11, 0x11, 0x11})
tx2 := types.NewTransaction(2, common.BytesToAddress([]byte{0x22}), 0, big.NewInt(222), 2222, big.NewInt(22222), []byte{0x22, 0x22, 0x22})

@ -35,11 +35,6 @@ type Address struct {
TXs []*Transaction `json:"txs"`
}
// Committee contains list of node validators of a particular shard and epoch.
type Committee struct {
Validators []*Validator `json:"validators"`
}
// Validator contains harmony validator node address and its balance.
type Validator struct {
Address string `json:"address"`

@ -558,10 +558,6 @@ func main() {
if currentNode.NodeConfig.GetMetricsFlag() {
go currentNode.CollectMetrics()
}
// Commit committtee if node role is explorer
if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode {
go currentNode.CommitCommittee()
}
currentNode.StartServer()
}

@ -11,7 +11,6 @@ import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
@ -177,29 +176,3 @@ func (node *Node) GetTransactionsHistory(address, txType, order string) ([]commo
}
return hashes, nil
}
// CommitCommittee commits committee with shard id and epoch to explorer service.
func (node *Node) CommitCommittee() {
events := make(chan core.ChainEvent)
node.Blockchain().SubscribeChainEvent(events)
for event := range events {
curBlock := event.Block
if curBlock == nil {
continue
}
state, err := node.Blockchain().ReadShardState(curBlock.Epoch())
if err != nil {
utils.Logger().Error().Err(err).Msg("[Explorer] Error reading shard state")
continue
}
for _, committee := range state.Shards {
if committee.ShardID == curBlock.ShardID() {
utils.Logger().Debug().Msg("[Explorer] Dumping committee")
err := explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, false).DumpCommittee(curBlock.ShardID(), curBlock.Epoch().Uint64(), committee)
if err != nil {
utils.Logger().Warn().Err(err).Msgf("[Explorer] Error dumping committee for block %d", curBlock.NumberU64())
}
}
}
}
}

Loading…
Cancel
Save