Merge pull request #1601 from rlan35/explorer_fix

Explorer node fix
pull/1603/head
Rongjian Lan 5 years ago committed by GitHub
commit ac1f414793
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 73
      api/service/explorer/service.go
  2. 4
      cmd/harmony/main.go
  3. 60
      node/node_explorer.go

@ -172,6 +172,13 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
to := r.FormValue("to")
pageParam := r.FormValue("page")
offsetParam := r.FormValue("offset")
withSignersParam := r.FormValue("with_signers")
withSigners := false
if withSignersParam == "true" {
withSigners = true
}
data := &Data{
Blocks: []*Block{},
}
@ -236,19 +243,21 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
accountBlocks := s.ReadBlocksFromDB(fromInt, toInt)
curEpoch := int64(-1)
committee := &shard.Committee{}
if withSigners {
if bytes, err := db.Get([]byte(GetCommitteeKey(uint32(s.ShardID), 0))); err == nil {
if err = rlp.DecodeBytes(bytes, committee); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read committee for new epoch")
}
}
}
for id, accountBlock := range accountBlocks {
if id == 0 || id == len(accountBlocks)-1 || accountBlock == nil {
continue
}
block := NewBlock(accountBlock, id+fromInt-1)
if int64(block.Epoch) > curEpoch {
if bytes, err := db.Get([]byte(GetCommitteeKey(uint32(s.ShardID), block.Epoch))); err == nil {
committee = &shard.Committee{}
if err = rlp.DecodeBytes(bytes, committee); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read committee for new epoch")
}
} else {
state, err := accountBlock.Header().GetShardState()
if withSigners && int64(block.Epoch) > curEpoch {
if accountBlocks[id-1] != nil {
state, err := accountBlocks[id-1].Header().GetShardState()
if err == nil {
for _, shardCommittee := range state {
if shardCommittee.ShardID == accountBlock.ShardID() {
@ -256,28 +265,32 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
break
}
}
} else {
utils.Logger().Warn().Err(err).Msg("error parsing shard state")
}
}
curEpoch = int64(block.Epoch)
}
pubkeys := make([]*bls.PublicKey, len(committee.NodeList))
for i, validator := range committee.NodeList {
pubkeys[i] = new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(pubkeys[i])
}
mask, err := bls2.NewMask(pubkeys, nil)
if err == nil && accountBlocks[id+1] != nil {
err = mask.SetMask(accountBlocks[id+1].Header().LastCommitBitmap())
if err == nil {
for _, validator := range committee.NodeList {
oneAddress, err := common2.AddressToBech32(validator.EcdsaAddress)
if err != nil {
continue
}
blsPublicKey := new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(blsPublicKey)
if ok, err := mask.KeyEnabled(blsPublicKey); err == nil && ok {
block.Signers = append(block.Signers, oneAddress)
if withSigners {
pubkeys := make([]*bls.PublicKey, len(committee.NodeList))
for i, validator := range committee.NodeList {
pubkeys[i] = new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(pubkeys[i])
}
mask, err := bls2.NewMask(pubkeys, nil)
if err == nil && accountBlocks[id+1] != nil {
err = mask.SetMask(accountBlocks[id+1].Header().LastCommitBitmap())
if err == nil {
for _, validator := range committee.NodeList {
oneAddress, err := common2.AddressToBech32(validator.EcdsaAddress)
if err != nil {
continue
}
blsPublicKey := new(bls.PublicKey)
validator.BlsPublicKey.ToLibBLSPublicKey(blsPublicKey)
if ok, err := mask.KeyEnabled(blsPublicKey); err == nil && ok {
block.Signers = append(block.Signers, oneAddress)
}
}
}
}
@ -316,11 +329,11 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
data.Blocks = append(data.Blocks, block)
}
paginatedBlocks := make([]*Block, 0)
for i := 0; i < offset && i+offset*page < len(data.Blocks); i++ {
paginatedBlocks = append(paginatedBlocks, data.Blocks[i+offset*page])
if offset*page+offset > len(data.Blocks) {
data.Blocks = data.Blocks[offset*page:]
} else {
data.Blocks = data.Blocks[offset*page : offset*page+offset]
}
data.Blocks = paginatedBlocks
}
// GetExplorerTransaction servers /tx end-point.

@ -457,8 +457,8 @@ func main() {
nodeConfig := createGlobalConfig()
currentNode := setupConsensusAndNode(nodeConfig)
if nodeConfig.ShardID != 0 {
utils.GetLogInstance().Info("SupportBeaconSyncing", "shardID", currentNode.Blockchain().ShardID(), "shardID1", nodeConfig.ShardID)
if nodeConfig.ShardID != 0 && currentNode.NodeConfig.Role() != nodeconfig.ExplorerNode {
utils.GetLogInstance().Info("SupportBeaconSyncing", "shardID", currentNode.Blockchain().ShardID(), "shardID", nodeConfig.ShardID)
go currentNode.SupportBeaconSyncing()
}

@ -72,7 +72,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
return
}
node.AddNewBlockForExplorer()
node.AddNewBlockForExplorer(block)
node.commitBlockForExplorer(block)
} else if msg.Type == msg_pb.MessageType_PREPARED {
@ -91,7 +91,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
msgs := node.Consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_COMMITTED, blockObj.NumberU64(), blockObj.Hash())
// If found, then add the new block into blockchain db.
if len(msgs) > 0 {
node.AddNewBlockForExplorer()
node.AddNewBlockForExplorer(blockObj)
node.commitBlockForExplorer(blockObj)
}
}
@ -99,41 +99,29 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
}
// AddNewBlockForExplorer add new block for explorer.
func (node *Node) AddNewBlockForExplorer() {
utils.Logger().Debug().Msg("[Explorer] Add new block for explorer")
// Search for the next block in PbftLog and commit the block into blockchain for explorer node.
for {
blocks := node.Consensus.PbftLog.GetBlocksByNumber(node.Blockchain().CurrentBlock().NumberU64() + 1)
if len(blocks) == 0 {
break
} else {
if len(blocks) > 1 {
utils.Logger().Error().Msg("[Explorer] We should have not received more than one block with the same block height.")
}
utils.Logger().Debug().Uint64("blockHeight", blocks[0].NumberU64()).Msg("Adding new block for explorer node")
if err := node.AddNewBlock(blocks[0]); err == nil {
if core.IsEpochLastBlock(blocks[0]) {
node.Consensus.UpdateConsensusInformation()
}
// Clean up the blocks to avoid OOM.
node.Consensus.PbftLog.DeleteBlockByNumber(blocks[0].NumberU64())
// Do dump all blocks from state syncing for explorer one time
// TODO: some blocks can be dumped before state syncing finished.
// And they would be dumped again here. Please fix it.
once.Do(func() {
utils.Logger().Info().Int64("starting height", int64(blocks[0].NumberU64())-1).
Msg("[Explorer] Populating explorer data from state synced blocks")
go func() {
for blockHeight := int64(blocks[0].NumberU64()) - 1; blockHeight >= 0; blockHeight-- {
explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, true).Dump(
node.Blockchain().GetBlockByNumber(uint64(blockHeight)), uint64(blockHeight))
}
}()
})
} else {
utils.Logger().Error().Err(err).Msg("[Explorer] Error when adding new block for explorer node")
}
func (node *Node) AddNewBlockForExplorer(block *types.Block) {
utils.Logger().Debug().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node")
if err := node.AddNewBlock(block); err == nil {
if core.IsEpochLastBlock(block) {
node.Consensus.UpdateConsensusInformation()
}
// Clean up the blocks to avoid OOM.
node.Consensus.PbftLog.DeleteBlockByNumber(block.NumberU64())
// Do dump all blocks from state syncing for explorer one time
// TODO: some blocks can be dumped before state syncing finished.
// And they would be dumped again here. Please fix it.
once.Do(func() {
utils.Logger().Info().Int64("starting height", int64(block.NumberU64())-1).
Msg("[Explorer] Populating explorer data from state synced blocks")
go func() {
for blockHeight := int64(block.NumberU64()) - 1; blockHeight >= 0; blockHeight-- {
explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, true).Dump(
node.Blockchain().GetBlockByNumber(uint64(blockHeight)), uint64(blockHeight))
}
}()
})
} else {
utils.Logger().Error().Err(err).Msg("[Explorer] Error when adding new block for explorer node")
}
}

Loading…
Cancel
Save