Add support for explorer node

pull/1042/head
Rongjian Lan 5 years ago
parent d234697b02
commit 44aa643fa5
15 changed files (changed-line counts in parentheses):
  1. api/service/explorer/storage.go (2)
  2. api/service/explorer/storage_test.go (4)
  3. cmd/harmony/main.go (33)
  4. consensus/consensus.go (6)
  5. consensus/consensus_service.go (4)
  6. consensus/consensus_v2.go (52)
  7. consensus/consensus_viewchange_msg.go (4)
  8. consensus/view_change.go (16)
  9. drand/drand.go (60)
  10. drand/drand_test.go (79)
  11. internal/configs/node/config.go (3)
  12. node/node_handler.go (72)
  13. node/service_setup.go (20)
  14. test/configs/beaconchain20.txt (1)
  15. test/deploy.sh (1)

@@ -84,7 +84,7 @@ func (storage *Storage) GetDB() *ethdb.LDBDatabase {
}
// Dump extracts information from block and index them into lvdb for explorer.
-func (storage *Storage) Dump(block *types.Block, height uint32) {
+func (storage *Storage) Dump(block *types.Block, height uint64) {
utils.GetLogInstance().Info("Dumping block ", "block height", height)
if block == nil {
return

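Since go-ethereum-style block numbers are uint64 (types.Block exposes NumberU64() uint64), widening Dump's height parameter lets callers pass the height straight through with no narrowing cast. A minimal sketch of the intended call, reusing the constructors from this diff (the block value itself is illustrative):

block := types.NewBlock(&types.Header{Number: big.NewInt(314)}, nil, nil)
ins := explorer.GetStorageInstance("127.0.0.1", "3333", true)
ins.Dump(block, block.NumberU64()) // heights above 2^32-1 are no longer truncated
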
@@ -50,7 +50,7 @@ func TestDump(t *testing.T) {
block := types.NewBlock(&types.Header{Number: big.NewInt(314)}, txs, nil)
ins := GetStorageInstance("1.1.1.1", "3333", true)
-ins.Dump(block, uint32(1))
+ins.Dump(block, uint64(1))
db := ins.GetDB()
res, err := db.Get([]byte(BlockHeightKey))
@@ -77,7 +77,7 @@ func TestUpdateAddressStorage(t *testing.T) {
block := types.NewBlock(&types.Header{Number: big.NewInt(314)}, txs, nil)
ins := GetStorageInstance("1.1.1.1", "3333", true)
-ins.Dump(block, uint32(1))
+ins.Dump(block, uint64(1))
db := ins.GetDB()
res, err := db.Get([]byte(BlockHeightKey))

@@ -97,6 +97,8 @@ var (
isArchival = flag.Bool("is_archival", false, "true means this node is a archival node")
// delayCommit is the commit-delay timer, used by Harmony nodes
delayCommit = flag.String("delay_commit", "0ms", "how long to delay sending commit messages in consensus, ex: 500ms, 1s")
+// isExplorer indicates this node is a node to serve explorer
+isExplorer = flag.Bool("is_explorer", false, "true means this node is a node to serve explorer")
// isNewNode indicates this node is a new node
isNewNode = flag.Bool("is_newnode", false, "true means this node is a new node")
shardID = flag.Int("shard_id", -1, "the shard ID of this node")
@@ -162,6 +164,9 @@ func initSetup() {
utils.BootNodes = bootNodeAddrs
}
+fmt.Println(*isGenesis)
+fmt.Println(*isExplorer)
+if !*isExplorer { // Explorer node doesn't need the following setup
ks = hmykey.GetHmyKeyStore()
allAccounts := ks.Accounts()
@@ -203,13 +208,16 @@ func initSetup() {
os.Exit(3)
}
}
+hmykey.SetHmyPass(myPass)
+} else {
+genesisAccount = &genesis.DeployAccount{}
+genesisAccount.ShardID = uint32(*shardID)
}
// Set up manual call for garbage collection.
if *enableGC {
memprofiling.MaybeCallGCPeriodically()
}
-hmykey.SetHmyPass(myPass)
}
func createGlobalConfig() *nodeconfig.ConfigType {
@@ -218,6 +226,7 @@ func createGlobalConfig() *nodeconfig.ConfigType {
nodeConfig := nodeconfig.GetDefaultConfig()
+if !*isExplorer { // Explorer node doesn't need the following setup
// Specified Shard ID override calculated Shard ID
if *shardID >= 0 {
utils.GetLogInstance().Info("ShardID Override", "original", genesisAccount.ShardID, "override", *shardID)
@@ -256,18 +265,13 @@ func createGlobalConfig() *nodeconfig.ConfigType {
*/
}
-// P2p private key is used for secure message transfer between p2p nodes.
-nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile)
-if err != nil {
-panic(err)
-}
// Consensus keys are the BLS12-381 keys used to sign consensus messages
nodeConfig.ConsensusPriKey, nodeConfig.ConsensusPubKey = consensusPriKey, consensusPriKey.GetPublicKey()
if nodeConfig.ConsensusPriKey == nil || nodeConfig.ConsensusPubKey == nil {
panic(fmt.Errorf("Failed to initialize BLS keys: %s", consensusPriKey.SerializeToHexStr()))
}
// Key Setup ================= [End]
}
nodeConfig.SelfPeer = p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey}
@@ -278,6 +282,12 @@ func createGlobalConfig() *nodeconfig.ConfigType {
nodeConfig.StringRole = "validator"
}
+// P2p private key is used for secure message transfer between p2p nodes.
+nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile)
+if err != nil {
+panic(err)
+}
nodeConfig.Host, err = p2pimpl.NewHost(&nodeConfig.SelfPeer, nodeConfig.P2pPriKey)
if *logConn {
nodeConfig.Host.GetP2PHost().Network().Notify(utils.ConnLogger)
@@ -371,6 +381,11 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID)))
currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID)))
}
+} else if *isExplorer {
+currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
+currentNode.NodeConfig.SetIsLeader(false)
+currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID)))
+currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(*shardID)))
} else if nodeConfig.StringRole == "leader" {
currentNode.NodeConfig.SetRole(nodeconfig.ShardLeader)
currentNode.NodeConfig.SetIsLeader(true)
@@ -427,6 +442,8 @@ func main() {
utils.SetLogContext(*port, *ip)
utils.SetLogVerbosity(log.Lvl(*verbosity))
+fmt.Println(os.Args)
+fmt.Println(*minPeers)
initSetup()
nodeConfig := createGlobalConfig()
initLogFile(*logFolder, nodeConfig.StringRole, *ip, *port, *onlyLogTps)
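
Taken together, the explorer branches above give explorer nodes a stripped-down startup: no keystore, no BLS consensus key, and the watched shard comes entirely from -shard_id. A condensed sketch of the combined effect (identifiers as in this diff):

if *isExplorer {
	// explorer nodes carry no staking or consensus keys
	genesisAccount = &genesis.DeployAccount{}
	genesisAccount.ShardID = uint32(*shardID)
	currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
	currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID)))
}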

@@ -31,8 +31,8 @@ var (
// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
-// pbftLog stores the pbft messages and blocks during PBFT process
-pbftLog *PbftLog
+// PbftLog stores the pbft messages and blocks during PBFT process
+PbftLog *PbftLog
// phase: different phase of PBFT protocol: pre-prepare, prepare, commit, finish etc
phase PbftPhase
// mode: indicate a node is in normal or viewchanging mode
@@ -210,7 +210,7 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKey
consensus.blockNumLowChan = make(chan struct{})
// pbft related
-consensus.pbftLog = NewPbftLog()
+consensus.PbftLog = NewPbftLog()
consensus.phase = Announce
consensus.mode = PbftMode{mode: Normal}
// pbft timeout

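Exporting pbftLog as PbftLog is what lets code outside the consensus package read the cached messages and blocks; the explorer handler in node/node_handler.go below depends on it. A minimal sketch of that external access (PbftLog API as used elsewhere in this diff):

if block := node.Consensus.PbftLog.GetBlockByHash(blockHash); block != nil {
	explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, true).Dump(block, block.NumberU64())
}
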
@@ -458,8 +458,8 @@ func (consensus *Consensus) SetBlockNum(blockNum uint64) {
consensus.blockNum = blockNum
}
-// read the payload for signature and bitmap; offset is the beginning position of reading
-func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offset int) (*bls.Sign, *bls_cosi.Mask, error) {
+// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
+func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls.Sign, *bls_cosi.Mask, error) {
if offset+96 > len(recvPayload) {
return nil, nil, errors.New("payload not have enough length")
}

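The offset+96 bounds check reflects the wire layout this function parses: a 96-byte serialized BLS aggregate signature followed by the signers' bitmap. A self-contained sketch of just the split (layout inferred from the check; helper name hypothetical):

// splitSigBitmap mirrors the layout assumption in ReadSignatureBitmapPayload:
// payload[offset:offset+96] holds the BLS signature, the remainder is the bitmap.
func splitSigBitmap(payload []byte, offset int) (sig, bitmap []byte, err error) {
	if offset+96 > len(payload) {
		return nil, nil, errors.New("payload too short")
	}
	return payload[offset : offset+96], payload[offset+96:], nil
}
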
@@ -10,7 +10,6 @@ import (
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
-"github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
@@ -80,7 +79,7 @@ func (consensus *Consensus) tryAnnounce(block *types.Block) {
msgToSend := consensus.constructAnnounceMessage()
consensus.switchPhase(Prepare, true)
-// save announce message to pbftLog
+// save announce message to PbftLog
msgPayload, _ := proto.GetConsensusMessagePayload(msgToSend)
msg := &msg_pb.Message{}
_ = protobuf.Unmarshal(msgPayload, msg)
@@ -90,8 +89,8 @@ func (consensus *Consensus) tryAnnounce(block *types.Block) {
return
}
-consensus.pbftLog.AddMessage(pbftMsg)
-consensus.pbftLog.AddBlock(block)
+consensus.PbftLog.AddMessage(pbftMsg)
+consensus.PbftLog.AddBlock(block)
// Leader sign the block hash itself
consensus.prepareSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(consensus.blockHash[:])
@@ -164,7 +163,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
return
}
//blockObj.Logger(consensus.getLogger()).Debug("received announce", "viewID", recvMsg.ViewID, "msgBlockNum", recvMsg.BlockNum)
-logMsgs := consensus.pbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID)
+logMsgs := consensus.PbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID)
if len(logMsgs) > 0 {
if logMsgs[0].BlockHash != blockObj.Header().Hash() {
consensus.getLogger().Debug("onAnnounce leader is malicious", "leaderKey", consensus.LeaderPubKey)
@@ -177,8 +176,8 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
consensus.block = blockPayload
consensus.blockHash = recvMsg.BlockHash
consensus.getLogger().Debug("announce block added", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
-consensus.pbftLog.AddMessage(recvMsg)
-consensus.pbftLog.AddBlock(&blockObj)
+consensus.PbftLog.AddMessage(recvMsg)
+consensus.PbftLog.AddBlock(&blockObj)
// we have already added message and block, skip check viewID and send prepare message if is in ViewChanging mode
if consensus.mode.Mode() == ViewChanging {
@@ -203,12 +202,12 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
func (consensus *Consensus) tryPrepare(blockHash common.Hash) {
var hash common.Hash
copy(hash[:], blockHash[:])
-block := consensus.pbftLog.GetBlockByHash(hash)
+block := consensus.PbftLog.GetBlockByHash(hash)
if block == nil {
return
}
-if consensus.blockNum != block.NumberU64() || !consensus.pbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, hash) {
+if consensus.blockNum != block.NumberU64() || !consensus.PbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, hash) {
consensus.getLogger().Debug("blockNum or announce message not match")
return
}
@@ -253,7 +252,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return
}
-if !consensus.pbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
+if !consensus.PbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
consensus.getLogger().Debug("onPrepare no matching announce message", "blockHash", recvMsg.BlockHash)
return
}
@@ -312,7 +311,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
consensus.getLogger().Warn("onPrepare unable to parse pbft message", "error", err)
return
}
-consensus.pbftLog.AddMessage(pbftMsg)
+consensus.PbftLog.AddMessage(pbftMsg)
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("cannot send prepared message")
@@ -363,9 +362,9 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
}
blockHash := recvMsg.BlockHash
-aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0)
+aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil {
-consensus.getLogger().Error("readSignatureBitmapPayload failed", "error", err)
+consensus.getLogger().Error("ReadSignatureBitmapPayload failed", "error", err)
return
}
@@ -386,7 +385,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
}
consensus.getLogger().Debug("prepared message added", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
-consensus.pbftLog.AddMessage(recvMsg)
+consensus.PbftLog.AddMessage(recvMsg)
if consensus.mode.Mode() == ViewChanging {
consensus.getLogger().Debug("viewchanging mode just exist after viewchanging")
@@ -457,12 +456,12 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return
}
-if !consensus.pbftLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) {
+if !consensus.PbftLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) {
consensus.getLogger().Debug("cannot find matching blockhash")
return
}
-if !consensus.pbftLog.HasMatchingPrepared(consensus.blockNum, recvMsg.BlockHash) {
+if !consensus.PbftLog.HasMatchingPrepared(consensus.blockNum, recvMsg.BlockHash) {
consensus.getLogger().Debug("cannot find matching prepared message", "blockHash", recvMsg.BlockHash)
return
}
@@ -558,9 +557,6 @@ func (consensus *Consensus) finalizeCommits() {
consensus.reportMetrics(blockObj)
-// Dump new block into level db.
-explorer.GetStorageInstance(consensus.leader.IP, consensus.leader.Port, true).Dump(&blockObj, consensus.viewID)
// Reset state to Finished, and clear other data.
consensus.ResetState()
consensus.viewID++
@@ -611,9 +607,9 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return
}
-aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0)
+aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil {
-consensus.getLogger().Error("readSignatureBitmapPayload failed", "error", err)
+consensus.getLogger().Error("ReadSignatureBitmapPayload failed", "error", err)
return
}
@@ -633,7 +629,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask
consensus.getLogger().Debug("committed message added", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
-consensus.pbftLog.AddMessage(recvMsg)
+consensus.PbftLog.AddMessage(recvMsg)
if recvMsg.BlockNum-consensus.blockNum > consensusBlockNumBuffer {
consensus.getLogger().Debug("onCommitted out of sync", "msgBlock", recvMsg.BlockNum)
@@ -677,7 +673,7 @@ func (consensus *Consensus) tryCatchup() {
// }
currentBlockNum := consensus.blockNum
for {
-msgs := consensus.pbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_COMMITTED, consensus.blockNum)
+msgs := consensus.PbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_COMMITTED, consensus.blockNum)
if len(msgs) == 0 {
break
}
@@ -686,7 +682,7 @@ func (consensus *Consensus) tryCatchup() {
}
consensus.getLogger().Info("committed message found")
-block := consensus.pbftLog.GetBlockByHash(msgs[0].BlockHash)
+block := consensus.PbftLog.GetBlockByHash(msgs[0].BlockHash)
if block == nil {
break
}
@@ -697,8 +693,8 @@ func (consensus *Consensus) tryCatchup() {
}
consensus.getLogger().Info("block found to commit")
-preparedMsgs := consensus.pbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, msgs[0].BlockNum, msgs[0].BlockHash)
-msg := consensus.pbftLog.FindMessageByMaxViewID(preparedMsgs)
+preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, msgs[0].BlockNum, msgs[0].BlockHash)
+msg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
if msg == nil {
break
}
@@ -753,8 +749,8 @@ func (consensus *Consensus) tryCatchup() {
consensus.consensusTimeout[timeoutViewChange].Stop()
}
// clean up old log
-consensus.pbftLog.DeleteBlocksLessThan(consensus.blockNum)
-consensus.pbftLog.DeleteMessagesLessThan(consensus.blockNum)
+consensus.PbftLog.DeleteBlocksLessThan(consensus.blockNum)
+consensus.PbftLog.DeleteMessagesLessThan(consensus.blockNum)
}
// Start waits for the next new block and run consensus

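The two lines dropped from finalizeCommits above take explorer storage out of the leader's consensus path: dumping blocks into LevelDB no longer happens during finalization, which is also why the explorer import disappears from this file. That write moves to the dedicated explorer node, which performs it in ExplorerMessageHandler (node/node_handler.go below) only after verifying the committed aggregate signature, and keys it by block number instead of view ID.
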
@@ -28,8 +28,8 @@ func (consensus *Consensus) constructViewChangeMessage() []byte {
// next leader key already updated
vcMsg.LeaderPubkey = consensus.LeaderPubKey.Serialize()
-preparedMsgs := consensus.pbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, consensus.blockNum, consensus.blockHash)
-preparedMsg := consensus.pbftLog.FindMessageByMaxViewID(preparedMsgs)
+preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, consensus.blockNum, consensus.blockHash)
+preparedMsg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
var msgToSign []byte
if preparedMsg == nil {

@@ -224,8 +224,8 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
_, ok2 := consensus.bhpSigs[consensus.PubKey.SerializeToHexStr()]
if !(ok1 || ok2) {
// add own signature for newview message
-preparedMsgs := consensus.pbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, recvMsg.BlockNum)
-preparedMsg := consensus.pbftLog.FindMessageByMaxViewID(preparedMsgs)
+preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, recvMsg.BlockNum)
+preparedMsg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
if preparedMsg == nil {
sign := consensus.priKey.SignHash(NIL)
consensus.nilSigs[consensus.PubKey.SerializeToHexStr()] = sign
@@ -282,7 +282,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
blockHash := recvMsg.Payload[:32]
-aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32)
+aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
consensus.getLogger().Error("m1 recvMsg payload read error", "error", err)
return
@@ -308,7 +308,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = consensus.PubKey
consensus.getLogger().Info("new leader prepared message added")
-consensus.pbftLog.AddMessage(&preparedMsg)
+consensus.PbftLog.AddMessage(&preparedMsg)
}
}
consensus.bhpSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewchangeSig
@@ -341,9 +341,9 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} else {
consensus.switchPhase(Commit, true)
copy(consensus.blockHash[:], consensus.m1Payload[:32])
-aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32)
+aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
-consensus.getLogger().Error("readSignatureBitmapPayload fail", "error", err)
+consensus.getLogger().Error("ReadSignatureBitmapPayload fail", "error", err)
return
}
consensus.aggregatedPrepareSig = aggSig
@@ -435,7 +435,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
}
// m1 is not empty, check it's valid
blockHash := recvMsg.Payload[:32]
-aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32)
+aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
consensus.getLogger().Error("unable to read signature/bitmap", "error", err)
return
@@ -455,7 +455,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = senderKey
-consensus.pbftLog.AddMessage(&preparedMsg)
+consensus.PbftLog.AddMessage(&preparedMsg)
}
// newView message verified success, override my state

@@ -4,10 +4,8 @@ import (
"errors"
"sync"
-"github.com/harmony-one/harmony/crypto/hash"
-common2 "github.com/harmony-one/harmony/internal/common"
"github.com/ethereum/go-ethereum/common"
+"github.com/harmony-one/harmony/crypto/hash"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
@@ -33,10 +31,6 @@ type DRand struct {
// global consensus mutex
mutex sync.Mutex
-// map of nodeID to validator Peer object
-// FIXME: should use PubKey of p2p.Peer as the hashkey
-validators sync.Map // key is string, value is p2p.Peer
// Leader's address
leader p2p.Peer
@@ -92,7 +86,6 @@ func New(host p2p.Host, ShardID uint32, peers []p2p.Peer, leader p2p.Peer, confi
dRand.leader = leader
dRand.CommitteeAddresses = map[common.Address]bool{}
for _, peer := range peers {
-dRand.validators.Store(common2.MustAddressToBech32(utils.GetBlsAddress(peer.ConsensusPubKey)), peer)
dRand.CommitteeAddresses[utils.GetBlsAddress(peer.ConsensusPubKey)] = true
}
@@ -131,27 +124,6 @@ func New(host p2p.Host, ShardID uint32, peers []p2p.Peer, leader p2p.Peer, confi
return &dRand
}
-// AddPeers adds new peers into the validator map of the consensus
-// and add the public keys
-func (dRand *DRand) AddPeers(peers []*p2p.Peer) int {
-count := 0
-for _, peer := range peers {
-_, ok := dRand.validators.LoadOrStore(common2.MustAddressToBech32(utils.GetBlsAddress(peer.ConsensusPubKey)), *peer)
-if !ok {
-dRand.pubKeyLock.Lock()
-if _, ok := dRand.CommitteeAddresses[peer.ConsensusPubKey.GetAddress()]; !ok {
-dRand.PublicKeys = append(dRand.PublicKeys, peer.ConsensusPubKey)
-dRand.CommitteeAddresses[peer.ConsensusPubKey.GetAddress()] = true
-}
-dRand.pubKeyLock.Unlock()
-// utils.GetLogInstance().Debug("[DRAND]", "AddPeers", *peer)
-}
-count++
-}
-return count
-}
// Sign on the drand message signature field.
func (dRand *DRand) signDRandMessage(message *msg_pb.Message) error {
message.Signature = nil
@@ -186,21 +158,6 @@ func (dRand *DRand) vrf(blockHash [32]byte) (rand [32]byte, proof []byte) {
return
}
-// GetValidatorPeers returns list of validator peers.
-func (dRand *DRand) GetValidatorPeers() []p2p.Peer {
-validatorPeers := make([]p2p.Peer, 0)
-dRand.validators.Range(func(k, v interface{}) bool {
-if peer, ok := v.(p2p.Peer); ok {
-validatorPeers = append(validatorPeers, peer)
-return true
-}
-return false
-})
-return validatorPeers
-}
// Verify the signature of the message are valid from the signer's public key.
func verifyMessageSig(signerPubKey *bls.PublicKey, message *msg_pb.Message) error {
signature := message.Signature
@@ -222,21 +179,6 @@ func verifyMessageSig(signerPubKey *bls.PublicKey, message *msg_pb.Message) error
return nil
}
-// Gets the validator peer based on validator ID.
-func (dRand *DRand) getValidatorPeerByAddress(validatorAddress string) *p2p.Peer {
-v, ok := dRand.validators.Load(validatorAddress)
-if !ok {
-utils.GetLogInstance().Warn("Unrecognized validator", "validatorAddress", validatorAddress, "dRand", dRand)
-return nil
-}
-value, ok := v.(p2p.Peer)
-if !ok {
-utils.GetLogInstance().Warn("Invalid validator", "validatorAddress", validatorAddress, "dRand", dRand)
-return nil
-}
-return &value
-}
// IsValidatorInCommittee returns whether the given validator BLS address is part of my committee
func (dRand *DRand) IsValidatorInCommittee(validatorBlsAddress common.Address) bool {
_, ok := dRand.CommitteeAddresses[validatorBlsAddress]

@@ -5,11 +5,9 @@ import (
"strings"
"testing"
-bls2 "github.com/harmony-one/harmony/crypto/bls"
-common2 "github.com/harmony-one/harmony/internal/common"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/bls/ffi/go/bls"
+bls2 "github.com/harmony-one/harmony/crypto/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
@@ -33,81 +31,6 @@ func TestNew(test *testing.T) {
}
}
-func TestGetValidatorPeers(test *testing.T) {
-leader := p2p.Peer{IP: "127.0.0.1", Port: "9902", ConsensusPubKey: bls2.RandPrivateKey().GetPublicKey()}
-validator := p2p.Peer{IP: "127.0.0.1", Port: "9905", ConsensusPubKey: bls2.RandPrivateKey().GetPublicKey()}
-priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
-host, err := p2pimpl.NewHost(&leader, priKey)
-if err != nil {
-test.Fatalf("newhost failure: %v", err)
-}
-dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey())
-if !dRand.IsLeader {
-test.Error("dRand should belong to a leader")
-}
-countValidatorPeers := len(dRand.GetValidatorPeers())
-if countValidatorPeers != 2 {
-test.Error("Count of validator peers doesn't match, got", countValidatorPeers)
-}
-}
-func TestAddPeers(test *testing.T) {
-leader := p2p.Peer{IP: "127.0.0.1", Port: "9902", ConsensusPubKey: bls2.RandPrivateKey().GetPublicKey()}
-validator := p2p.Peer{IP: "127.0.0.1", Port: "9905", ConsensusPubKey: bls2.RandPrivateKey().GetPublicKey()}
-priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
-host, err := p2pimpl.NewHost(&leader, priKey)
-if err != nil {
-test.Fatalf("newhost failure: %v", err)
-}
-dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey())
-if !dRand.IsLeader {
-test.Error("dRand should belong to a leader")
-}
-newPeer := p2p.Peer{IP: "127.0.0.1", Port: "9907"}
-countValidatorPeers := dRand.AddPeers([]*p2p.Peer{&newPeer})
-if countValidatorPeers != 1 {
-test.Error("Unable to add new peer")
-}
-if len(dRand.GetValidatorPeers()) != 3 {
-test.Errorf("Number of validators doesn't match, actual count = %d expected count = 3", len(dRand.GetValidatorPeers()))
-}
-}
-func TestGetValidatorByPeerId(test *testing.T) {
-leaderPriKey := bls2.RandPrivateKey()
-leaderPubKey := bls2.RandPrivateKey().GetPublicKey()
-validatorPubKey := bls2.RandPrivateKey().GetPublicKey()
-leader := p2p.Peer{IP: "127.0.0.1", Port: "9902", ConsensusPubKey: leaderPubKey}
-validator := p2p.Peer{IP: "127.0.0.1", Port: "9905", ConsensusPubKey: validatorPubKey}
-priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
-host, err := p2pimpl.NewHost(&leader, priKey)
-if err != nil {
-test.Fatalf("newhost failure: %v", err)
-}
-dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, leaderPriKey)
-if !dRand.IsLeader {
-test.Error("dRand should belong to a leader")
-}
-validatorAddress := common2.MustAddressToBech32(utils.GetAddressFromBlsPubKey(validatorPubKey))
-if dRand.getValidatorPeerByAddress(validatorAddress) == nil {
-test.Error("Unable to get validator by Peerid")
-}
-if dRand.getValidatorPeerByAddress("random address") != nil {
-test.Error("Found validator for absent validatorId")
-}
-}
func TestResetState(test *testing.T) {
leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"}
validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"}

@@ -28,6 +28,7 @@ const (
NewNode
ClientNode
WalletNode
+ExplorerNode
)
func (role Role) String() string {
@@ -48,6 +49,8 @@ func (role Role) String() string {
return "ClientNode"
case WalletNode:
return "WalletNode"
+case ExplorerNode:
+return "ExplorerNode"
}
return "Unknown"
}

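Because ExplorerNode is appended after WalletNode rather than inserted mid-list, the numeric values of the existing Role constants stay stable, assuming the const block is an iota enumeration as the bare identifier list suggests. A minimal sketch of the pattern (the first value is illustrative; the real block may start with other roles):

type Role byte

const (
	NewNode Role = iota // illustrative starting point
	ClientNode
	WalletNode
	ExplorerNode // appended last, so prior values are unchanged
)
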
@@ -3,6 +3,7 @@ package node
import (
"bytes"
"context"
+"encoding/binary"
"encoding/hex"
"errors"
"math"
@@ -14,12 +15,17 @@ import (
"syscall"
"time"
+"github.com/harmony-one/harmony/api/service/explorer"
+"github.com/harmony-one/harmony/consensus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
pb "github.com/golang/protobuf/proto"
+protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
+msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/proto"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
@@ -124,6 +130,9 @@ func (node *Node) messageHandler(content []byte, sender string) {
case proto.Consensus:
msgPayload, _ := proto.GetConsensusMessagePayload(content)
node.ConsensusMessageHandler(msgPayload)
+if node.NodeConfig.Role() == nodeconfig.ExplorerNode {
+node.ExplorerMessageHandler(msgPayload)
+}
case proto.DRand:
msgPayload, _ := proto.GetDRandMessagePayload(content)
if node.DRand != nil {
@@ -610,11 +619,13 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
peer.PeerID = p.PeerID
peer.ConsensusPubKey = &bls.PublicKey{}
+if len(p.PubKey) != 0 { // TODO: add the check in bls library
err = peer.ConsensusPubKey.Deserialize(p.PubKey[:])
if err != nil {
utils.GetLogInstance().Error("UnmarshalBinary Failed", "error", err)
continue
}
+}
peers = append(peers, peer)
}
@@ -812,3 +823,64 @@ func (node *Node) ConsensusMessageHandler(msgPayload []byte) {
}
return
}
+// ExplorerMessageHandler passes received message in node_handler to explorer service
+func (node *Node) ExplorerMessageHandler(payload []byte) {
+if len(payload) == 0 {
+return
+}
+msg := &msg_pb.Message{}
+err := protobuf.Unmarshal(payload, msg)
+if err != nil {
+utils.GetLogger().Error("Failed to unmarshal message payload.", "err", err)
+return
+}
+if msg.Type == msg_pb.MessageType_COMMITTED {
+recvMsg, err := consensus.ParsePbftMessage(msg)
+if err != nil {
+utils.GetLogInstance().Debug("[Explorer] onCommitted unable to parse msg", "error", err)
+return
+}
+aggSig, mask, err := node.Consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
+if err != nil {
+utils.GetLogInstance().Debug("[Explorer] readSignatureBitmapPayload failed", "error", err)
+return
+}
+// check has 2f+1 signatures
+if count := utils.CountOneBits(mask.Bitmap); count < node.Consensus.Quorum() {
+utils.GetLogInstance().Debug("[Explorer] not have enough signature", "need", node.Consensus.Quorum(), "have", count)
+return
+}
+blockNumHash := make([]byte, 8)
+binary.LittleEndian.PutUint64(blockNumHash, recvMsg.BlockNum)
+commitPayload := append(blockNumHash, recvMsg.BlockHash[:]...)
+if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
+utils.GetLogInstance().Debug("[Explorer] Failed to verify the multi signature for commit phase", "msgBlock", recvMsg.BlockNum)
+return
+}
+block := node.Consensus.PbftLog.GetBlockByHash(recvMsg.BlockHash)
+// Dump new block into level db.
+utils.GetLogInstance().Info("[Explorer] Committing block into explorer DB", "msgBlock", recvMsg.BlockNum)
+explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, true).Dump(block, block.NumberU64())
+} else if msg.Type == msg_pb.MessageType_ANNOUNCE {
+recvMsg, err := consensus.ParsePbftMessage(msg)
+if err != nil {
+utils.GetLogInstance().Debug("[Explorer] onAnnounce unable to parse msg", "error", err)
+return
+}
+block := recvMsg.Payload
+// check block header is valid
+var blockObj types.Block
+err = rlp.DecodeBytes(block, &blockObj)
+node.Consensus.PbftLog.AddBlock(&blockObj)
+}
+return
+}

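ExplorerMessageHandler persists a block only after checking the COMMITTED aggregate signature the way a validator would: the bitmap must carry at least Quorum() (2f+1) signers, and the multi-signature must verify over exactly what the committee signed, namely the 8-byte little-endian block number followed by the 32-byte block hash. A self-contained sketch of that payload construction (values hypothetical):

package main

import (
	"encoding/binary"
	"fmt"
)

// commitPayload reproduces the byte string verified above:
// 8-byte little-endian block number || 32-byte block hash.
func commitPayload(blockNum uint64, blockHash [32]byte) []byte {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, blockNum)
	return append(buf, blockHash[:]...)
}

func main() {
	var hash [32]byte // hypothetical block hash
	fmt.Println(len(commitPayload(42, hash))) // 40 bytes: 8 + 32
}
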
@@ -23,8 +23,6 @@ func (node *Node) setupForShardLeader() {
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil))
-// Register explorer service.
-node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer, node.Consensus.GetNodeIDs, node.GetBalanceOfAddress))
// Register consensus service.
node.serviceManager.RegisterService(service.Consensus, consensus.New(node.BlockChannel, node.Consensus, node.startConsensus))
// Register new block service.
@@ -64,8 +62,6 @@ func (node *Node) setupForBeaconLeader() {
node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyv2))
// Register randomness service
node.serviceManager.RegisterService(service.Randomness, randomness.New(node.DRand))
-// Register explorer service.
-node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer, node.Consensus.GetNodeIDs, node.GetBalanceOfAddress))
}
func (node *Node) setupForBeaconValidator() {
@@ -106,6 +102,20 @@ func (node *Node) setupForClientNode() {
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil))
}
+func (node *Node) setupForExplorerNode() {
+// TODO determine the role of new node, currently assume it is beacon node
+nodeConfig, chanPeer := node.initNodeConfiguration()
+// Register peer discovery service. "0" is the beacon shard ID
+node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
+// Register networkinfo service. "0" is the beacon shard ID
+node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetBeaconGroupID(), chanPeer, nil))
+// Register explorer service.
+node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer, node.Consensus.GetNodeIDs, node.GetBalanceOfAddress))
+// TODO: how to restart networkinfo and discovery service after receiving shard id info from beacon chain?
+}
// ServiceManagerSetup setups service store.
func (node *Node) ServiceManagerSetup() {
// Run pingpong message protocol for all type of nodes.
@@ -127,6 +137,8 @@ func (node *Node) ServiceManagerSetup() {
node.setupForNewNode()
case nodeconfig.ClientNode:
node.setupForClientNode()
+case nodeconfig.ExplorerNode:
+node.setupForExplorerNode()
}
node.serviceManager.SetupServiceMessageChan(node.serviceMessageChan)
}

@@ -18,3 +18,4 @@
127.0.0.1 9017 validator 0x0CCa9111F4588EDB3c9a282faE233B830dE21A0D
127.0.0.1 9018 validator 0x0F595ed534b6464eB2C80A037FFba02D23AfdfD2
127.0.0.1 9019 validator 0x0a0b8c48e42c540078fD99004915Be265f380dB7
+127.0.0.1 19999 explorer 0

@@ -161,6 +161,7 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
esac
case "${mode}" in leader*) args=("${args[@]}" -is_leader);; esac
case "${mode}" in *archival|archival) args=("${args[@]}" -is_archival);; esac
+case "${mode}" in explorer*) args=("${args[@]}" -is_genesis=false -is_explorer=true -shard_id=${account});; esac
case "${mode}" in
newnode)
"${SYNC}" || continue

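Each line of the test config reads IP PORT mode account; for the new explorer mode, deploy.sh reuses the fourth column as the shard ID. The beaconchain20.txt line added above (127.0.0.1 19999 explorer 0) therefore launches that node with -is_genesis=false -is_explorer=true -shard_id=0.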