Merge pull request #498 from LeoHChen/drg

Fix consensus error with DRG
Leo Chen committed da946320fa
17 changed files:

1. .gitignore (3 lines changed)
2. README.md (2)
3. cmd/harmony.go (2)
4. consensus/consensus.go (30)
5. consensus/consensus_leader.go (7)
6. consensus/consensus_test.go (4)
7. consensus/consensus_validator.go (18)
8. drand/drand.go (31)
9. drand/drand_leader.go (1)
10. drand/drand_leader_msg_test.go (2)
11. drand/drand_test.go (2)
12. drand/drand_validator.go (9)
13. drand/drand_validator_msg_test.go (2)
14. node/node_handler.go (25)
15. node/node_newblock.go (3)
16. node/node_test.go (2)
17. node/service_setup.go (2)

.gitignore (vendored)

@@ -63,3 +63,6 @@ node_modules/
 # txgen node keystore
 .txgenkey
+
+# go mod summary file
+go.sum

README.md

@@ -13,6 +13,8 @@ brew install openssl
 ## Dev Environment Setup
+
+The required go version is: **go1.11**
 ```bash
 export GOPATH=$HOME/<path_of_your_choice>
 export CGO_CFLAGS="-I$GOPATH/src/github.com/harmony-one/bls/include -I$GOPATH/src/github.com/harmony-one/mcl/include -I/usr/local/opt/openssl/include"

cmd/harmony.go

@@ -279,7 +279,7 @@ func main() {
 // Add randomness protocol
 // TODO: enable drand only for beacon chain
 // TODO: put this in a better place other than main.
-dRand := drand.New(host, shardID, peers, leader, currentNode.ConfirmedBlockChannel)
+dRand := drand.New(host, shardID, peers, leader, currentNode.ConfirmedBlockChannel, *isLeader)
 currentNode.Consensus.RegisterPRndChannel(dRand.PRndChannel)
 currentNode.Consensus.RegisterRndChannel(dRand.RndChannel)
 currentNode.DRand = dRand
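The wiring above shares the dRand-owned channels with the consensus object. A minimal sketch of that pattern, with simplified stand-in structs (the real types live in the consensus and drand packages; RndChannel's [64]byte element type comes from this diff, while PRndChannel carrying raw byte slices is an assumption):

```go
package main

import "fmt"

// Simplified stand-ins for drand.DRand and consensus.Consensus.
type DRand struct {
	PRndChannel chan []byte   // assumed element type
	RndChannel  chan [64]byte // per make(chan [64]byte) in this diff
}

type Consensus struct {
	PRndChannel chan []byte
	RndChannel  chan [64]byte
}

func (c *Consensus) RegisterPRndChannel(ch chan []byte)  { c.PRndChannel = ch }
func (c *Consensus) RegisterRndChannel(ch chan [64]byte) { c.RndChannel = ch }

func main() {
	dRand := &DRand{
		PRndChannel: make(chan []byte, 1),
		RndChannel:  make(chan [64]byte, 1),
	}
	consensus := &Consensus{}
	// Same wiring as in main(): consensus holds references to channels
	// that dRand owns, so the two protocols can exchange pre-randomness
	// and final randomness without a direct package dependency.
	consensus.RegisterPRndChannel(dRand.PRndChannel)
	consensus.RegisterRndChannel(dRand.RndChannel)
	fmt.Println("channels shared:", consensus.RndChannel == dRand.RndChannel)
}
```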

consensus/consensus.go

@@ -51,7 +51,7 @@ type Consensus struct {
 MinPeers int
 // Leader's address
-Leader p2p.Peer
+leader p2p.Peer
 // Public keys of the committee including leader and validators
 PublicKeys []*bls.PublicKey

@@ -177,7 +177,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons
 consensus.IsLeader = false
 }
-consensus.Leader = leader
+consensus.leader = leader
 for _, peer := range peers {
 consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer)
 }

@@ -194,8 +194,8 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons
 consensus.PublicKeys = allPublicKeys
-prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
-commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
+prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
+commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
 consensus.prepareBitmap = prepareBitmap
 consensus.commitBitmap = commitBitmap

@@ -258,7 +258,6 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi
 consensusID := consensusMsg.ConsensusId
 blockHash := consensusMsg.BlockHash
-utils.GetLogInstance().Warn("checkConsensusMessage", "publicKey", publicKey)
 // Verify message signature
 err := verifyMessageSig(publicKey, message)
 if err != nil {

@@ -309,7 +308,7 @@ func verifyMessageSig(signerPubKey *bls.PublicKey, message *msg_pb.Message) erro
 return err
 }
 msgHash := sha256.Sum256(messageBytes)
-utils.GetLogInstance().Debug("verifyMessageSig", "signerPubKey", signerPubKey, "msgHash", msgHash)
+utils.GetLogInstance().Debug("verifyMessageSig")
 if !msgSig.VerifyHash(signerPubKey, msgHash[:]) {
 return errors.New("failed to verify the signature")
 }

@@ -382,8 +381,8 @@ func (consensus *Consensus) ResetState() {
 consensus.prepareSigs = map[uint32]*bls.Sign{}
 consensus.commitSigs = map[uint32]*bls.Sign{}
-prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
-commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
+prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
+commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
 consensus.prepareBitmap = prepareBitmap
 consensus.commitBitmap = commitBitmap

@@ -471,7 +470,7 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
 // Or the shard won't be able to reach consensus if public keys are mismatched
 validators := consensus.GetValidatorPeers()
-pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.Leader.PubKey)
+pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.leader.PubKey)
 buffer := pong.ConstructPongMessage()
 if utils.UseLibP2P {

@@ -655,6 +654,8 @@ func (consensus *Consensus) populateMessageFields(request *msg_pb.ConsensusReque
 // 4 byte sender id
 request.SenderId = uint32(consensus.nodeID)
+
+utils.GetLogInstance().Debug("[populateMessageFields]", "myConsensusID", consensus.consensusID, "SenderId", request.SenderId)
 }

@@ -670,3 +671,14 @@ func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Messa
 }
 return marshaledMessage, nil
 }
+
+// SetLeaderPubKey deserializes the public key of the consensus leader
+func (consensus *Consensus) SetLeaderPubKey(k []byte) error {
+consensus.leader.PubKey = &bls.PublicKey{}
+return consensus.leader.PubKey.Deserialize(k)
+}
+
+// GetLeaderPubKey returns the public key of the consensus leader
+func (consensus *Consensus) GetLeaderPubKey() *bls.PublicKey {
+return consensus.leader.PubKey
+}
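The new accessors keep the now-unexported leader field behind a small API, so callers such as node/node_handler.go no longer reach into the peer struct directly. A minimal sketch of the pattern, using a stand-in PublicKey type in place of bls.PublicKey (the Deserialize error contract is taken from this diff; everything else is simplified):

```go
package main

import (
	"errors"
	"fmt"
)

// PublicKey is a stand-in for bls.PublicKey; Deserialize mimics the
// error-returning contract shown in the diff (hypothetical simplification).
type PublicKey struct{ raw []byte }

func (k *PublicKey) Deserialize(b []byte) error {
	if len(b) == 0 {
		return errors.New("empty key bytes")
	}
	k.raw = append(b[:0:0], b...) // defensive copy
	return nil
}

type peer struct{ PubKey *PublicKey }

// Consensus hides its leader peer; external packages use the accessors.
type Consensus struct{ leader peer }

func (c *Consensus) SetLeaderPubKey(k []byte) error {
	c.leader.PubKey = &PublicKey{}
	return c.leader.PubKey.Deserialize(k)
}

func (c *Consensus) GetLeaderPubKey() *PublicKey { return c.leader.PubKey }

func main() {
	c := &Consensus{}
	if err := c.SetLeaderPubKey([]byte{0x02, 0xab}); err != nil {
		fmt.Println("failed to set leader key:", err)
		return
	}
	fmt.Printf("leader key bytes: %x\n", c.GetLeaderPubKey().raw)
}
```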

consensus/consensus_leader.go

@@ -71,7 +71,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stop
 pRnd := [32]byte{}
 copy(pRnd[:], pRndAndBitmap[:32])
 bitmap := pRndAndBitmap[32:]
-vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
+vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
 vrfBitmap.SetMask(bitmap)
 // TODO: check validity of pRnd
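For context on the hunk above: the payload read off PRndChannel packs a 32-byte pre-random value followed by the signer bitmap, and the change only renames the leader key used for the mask. A small self-contained sketch of that unpacking (the 32-byte width comes from this diff; the payload contents are made up):

```go
package main

import "fmt"

// splitPRnd mirrors the unpacking in WaitForNewBlock: the first 32 bytes
// are the pre-random value, the remainder is the bitmap of validators
// whose VRFs were aggregated.
func splitPRnd(pRndAndBitmap []byte) (pRnd [32]byte, bitmap []byte) {
	copy(pRnd[:], pRndAndBitmap[:32])
	bitmap = pRndAndBitmap[32:]
	return
}

func main() {
	payload := make([]byte, 36) // 32-byte pRnd + 4-byte bitmap (example sizes)
	for i := range payload {
		payload[i] = byte(i)
	}
	pRnd, bitmap := splitPRnd(payload)
	fmt.Printf("pRnd[:4]=%x bitmap=%x\n", pRnd[:4], bitmap)
}
```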
@@ -148,6 +148,7 @@ func (consensus *Consensus) startConsensus(newBlock *types.Block) {
 if utils.UseLibP2P {
 // Construct broadcast p2p message
+utils.GetLogInstance().Warn("[Consensus]", "sent announce message", len(msgToSend))
 consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
 } else {
 host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)

@@ -217,6 +218,7 @@ func (consensus *Consensus) processPrepareMessage(message *msg_pb.Message) {
 consensus.aggregatedPrepareSig = aggSig
 if utils.UseLibP2P {
+utils.GetLogInstance().Warn("[Consensus]", "sent prepared message", len(msgToSend))
 consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
 } else {
 host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)

@@ -295,6 +297,7 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) {
 consensus.aggregatedCommitSig = aggSig
 if utils.UseLibP2P {
+utils.GetLogInstance().Warn("[Consensus]", "sent committed message", len(msgToSend))
 consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
 } else {
 host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)

@@ -323,7 +326,7 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) {
 consensus.reportMetrics(blockObj)
 // Dump new block into level db.
-explorer.GetStorageInstance(consensus.Leader.IP, consensus.Leader.Port, true).Dump(&blockObj, consensus.consensusID)
+explorer.GetStorageInstance(consensus.leader.IP, consensus.leader.Port, true).Dump(&blockObj, consensus.consensusID)
 // Reset state to Finished, and clear other data.
 consensus.ResetState()

consensus/consensus_test.go

@@ -30,10 +30,6 @@ func TestNew(test *testing.T) {
 if consensus.ReadySignal == nil {
 test.Error("Consensus ReadySignal should be initialized")
 }
-if consensus.Leader.IP != leader.IP || consensus.Leader.Port != leader.Port {
-test.Error("Consensus Leader is set to wrong Peer")
-}
 }

 func TestRemovePeers(t *testing.T) {

consensus/consensus_validator.go

@@ -62,6 +62,12 @@ func (consensus *Consensus) ProcessMessageValidator(payload []byte) {
 consensus.processPreparedMessage(message)
 case msg_pb.MessageType_COMMITTED:
 consensus.processCommittedMessage(message)
+case msg_pb.MessageType_PREPARE:
+case msg_pb.MessageType_COMMIT:
+// ignore consensus messages that are only meant to be sent to the leader;
+// since we use pubsub, the relay node will also receive those messages,
+// but we should just ignore them
 default:
 utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "consensus", consensus)
 }
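This hunk is the core of the fix: with libp2p pubsub every node sees every consensus message, so a validator now explicitly matches the leader-bound types and drops them instead of logging them as unexpected. A minimal sketch of the dispatch pattern (message-type names simplified from the msg_pb constants):

```go
package main

import "fmt"

type MessageType int

const (
	Announce MessageType = iota
	Prepare
	Prepared
	Commit
	Committed
)

// dispatch mirrors ProcessMessageValidator: validator-bound types are
// handled, leader-bound types (Prepare, Commit) are matched and silently
// dropped, and only genuinely unknown types hit the error path.
func dispatch(t MessageType) {
	switch t {
	case Announce, Prepared, Committed:
		fmt.Println("validator handles type", t)
	case Prepare, Commit:
		// leader-bound; seen here only because pubsub fans out to everyone
	default:
		fmt.Println("unexpected message type", t)
	}
}

func main() {
	for _, t := range []MessageType{Announce, Prepare, Commit, Committed} {
		dispatch(t)
	}
}
```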
@@ -85,7 +91,7 @@ func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) {
 copy(consensus.blockHash[:], blockHash[:])
 consensus.block = block
-if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
+if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
 utils.GetLogInstance().Debug("Failed to check the leader message")
 if err == consensus_engine.ErrConsensusIDNotMatch {
 utils.GetLogInstance().Debug("sending bft block to state syncing")

@@ -117,9 +123,10 @@ func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) {
 // Construct and send prepare message
 msgToSend := consensus.constructPrepareMessage()
 if utils.UseLibP2P {
+utils.GetLogInstance().Warn("[Consensus]", "sent prepare message", len(msgToSend))
 consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
 } else {
-consensus.SendMessage(consensus.Leader, msgToSend)
+consensus.SendMessage(consensus.leader, msgToSend)
 }
 consensus.state = PrepareDone

@@ -149,7 +156,7 @@ func (consensus *Consensus) processPreparedMessage(message *msg_pb.Message) {
 // Update readyByConsensus for attack.
 attack.GetInstance().UpdateConsensusReady(consensusID)
-if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
+if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
 utils.GetLogInstance().Debug("processPreparedMessage error", "error", err)
 return
 }

@@ -183,9 +190,10 @@ func (consensus *Consensus) processPreparedMessage(message *msg_pb.Message) {
 multiSigAndBitmap := append(multiSig, bitmap...)
 msgToSend := consensus.constructCommitMessage(multiSigAndBitmap)
 if utils.UseLibP2P {
+utils.GetLogInstance().Warn("[Consensus]", "sent commit message", len(msgToSend))
 consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
 } else {
-consensus.SendMessage(consensus.Leader, msgToSend)
+consensus.SendMessage(consensus.leader, msgToSend)
 }
 consensus.state = CommitDone

@@ -213,7 +221,7 @@ func (consensus *Consensus) processCommittedMessage(message *msg_pb.Message) {
 // Update readyByConsensus for attack.
 attack.GetInstance().UpdateConsensusReady(consensusID)
-if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
+if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
 utils.GetLogInstance().Debug("processCommittedMessage error", "error", err)
 return
 }

drand/drand.go

@@ -36,7 +36,7 @@ type DRand struct {
 validators sync.Map // key is uint16, value is p2p.Peer
 // Leader's address
-Leader p2p.Peer
+leader p2p.Peer
 // Public keys of the committee including leader and validators
 PublicKeys []*bls.PublicKey

@@ -67,7 +67,7 @@ type DRand struct {
 }
 // New creates a new dRand object
-func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confirmedBlockChannel chan *types.Block) *DRand {
+func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confirmedBlockChannel chan *types.Block, isLeader bool) *DRand {
 dRand := DRand{}
 dRand.host = host

@@ -79,13 +79,9 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confi
 dRand.RndChannel = make(chan [64]byte)
 selfPeer := host.GetSelfPeer()
-if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP {
-dRand.IsLeader = true
-} else {
-dRand.IsLeader = false
-}
+dRand.IsLeader = isLeader
-dRand.Leader = leader
+dRand.leader = leader
 for _, peer := range peers {
 dRand.validators.Store(utils.GetUniqueIDFromPeer(peer), peer)
 }
@@ -101,7 +97,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confi
 dRand.PublicKeys = allPublicKeys
-bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.Leader.PubKey)
+bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey)
 dRand.bitmap = bitmap
 dRand.pRand = nil

@@ -242,8 +238,23 @@ func (dRand *DRand) getValidatorPeerByID(validatorID uint32) *p2p.Peer {
 func (dRand *DRand) ResetState() {
 dRand.vrfs = &map[uint32][]byte{}
-bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.Leader.PubKey)
+bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey)
 dRand.bitmap = bitmap
 dRand.pRand = nil
 dRand.rand = nil
 }
+
+// SetLeaderPubKey deserializes the public key of the drand leader
+func (dRand *DRand) SetLeaderPubKey(k []byte) error {
+dRand.leader.PubKey = &bls.PublicKey{}
+return dRand.leader.PubKey.Deserialize(k)
+}
+
+// UpdatePublicKeys updates the PublicKeys variable, protected by a mutex
+func (dRand *DRand) UpdatePublicKeys(pubKeys []*bls.PublicKey) int {
+dRand.pubKeyLock.Lock()
+dRand.PublicKeys = append(pubKeys[:0:0], pubKeys...)
+dRand.pubKeyLock.Unlock()
+return len(dRand.PublicKeys)
+}
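UpdatePublicKeys copies the incoming slice with append(pubKeys[:0:0], pubKeys...) rather than assigning it. The three-index expression pubKeys[:0:0] yields a view of length and capacity zero over the same backing array, so the append is forced to allocate a fresh array; the stored keys therefore cannot be mutated through the caller's slice. A self-contained illustration of the idiom:

```go
package main

import "fmt"

func main() {
	src := []int{1, 2, 3}

	alias := src                       // shares src's backing array
	clone := append(src[:0:0], src...) // zero-len, zero-cap view forces a fresh allocation

	src[0] = 99
	fmt.Println(alias) // [99 2 3] - sees the mutation
	fmt.Println(clone) // [1 2 3]  - independent copy
}
```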

drand/drand_leader.go

@@ -75,6 +75,7 @@ func (dRand *DRand) init(epochBlock *types.Block) {
 (*dRand.vrfs)[dRand.nodeID] = append(rand[:], proof...)
 if utils.UseLibP2P {
+utils.GetLogInstance().Info("[DRG] sent init", "msg", msgToSend, "leader.PubKey", dRand.leader.PubKey)
 dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
 } else {
 host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil)

drand/drand_leader_msg_test.go

@@ -16,7 +16,7 @@ func TestConstructInitMessage(test *testing.T) {
 if err != nil {
 test.Fatalf("newhost failure: %v", err)
 }
-dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
+dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil, true)
 dRand.blockHash = [32]byte{}
 msg := dRand.constructInitMessage()

drand/drand_test.go

@@ -16,7 +16,7 @@ func TestNew(test *testing.T) {
 if err != nil {
 test.Fatalf("newhost failure: %v", err)
 }
-dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
+dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil, true)
 if !dRand.IsLeader {
 test.Error("dRand should belong to a leader")

drand/drand_validator.go

@@ -20,6 +20,8 @@ func (dRand *DRand) ProcessMessageValidator(payload []byte) {
 switch message.Type {
 case drand_proto.MessageType_INIT:
 dRand.processInitMessage(message)
+case drand_proto.MessageType_COMMIT:
+// do nothing on the COMMIT message, as it is intended for the leader only
 default:
 utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "dRand", dRand)
 }

@@ -35,11 +37,12 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) {
 blockHash := message.BlockHash
 // Verify message signature
-err := verifyMessageSig(dRand.Leader.PubKey, message)
+err := verifyMessageSig(dRand.leader.PubKey, message)
 if err != nil {
-utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "Leader.PubKey", dRand.Leader.PubKey)
+utils.GetLogInstance().Warn("[DRG] Failed to verify the message signature", "Error", err)
 return
 }
+utils.GetLogInstance().Debug("[DRG] message signature verification succeeded")
 // TODO: check the blockHash is the block hash of last block of last epoch.
 copy(dRand.blockHash[:], blockHash[:])

@@ -52,6 +55,6 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) {
 if utils.UseLibP2P {
 dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
 } else {
-host.SendMessage(dRand.host, dRand.Leader, msgToSend, nil)
+host.SendMessage(dRand.host, dRand.leader, msgToSend, nil)
 }
 }

drand/drand_validator_msg_test.go

@@ -16,7 +16,7 @@ func TestConstructCommitMessage(test *testing.T) {
 if err != nil {
 test.Fatalf("newhost failure: %v", err)
 }
-dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
+dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil, true)
 dRand.blockHash = [32]byte{}
 msg := dRand.constructCommitMessage([32]byte{}, []byte{})

node/node_handler.go

@@ -132,10 +132,10 @@ func (node *Node) messageHandler(content []byte, sender string) {
 case proto.Consensus:
 msgPayload, _ := proto.GetConsensusMessagePayload(content)
 if consensusObj.IsLeader {
-utils.GetLogInstance().Info("NET: Leader received message:", "messageCategory", msgCategory)
+utils.GetLogInstance().Info("NET: Leader received consensus message")
 consensusObj.ProcessMessageLeader(msgPayload)
 } else {
-utils.GetLogInstance().Info("NET: Validator received message:", "messageCategory", msgCategory)
+utils.GetLogInstance().Info("NET: Validator received consensus message")
 consensusObj.ProcessMessageValidator(msgPayload)
 // TODO(minhdoan): add logic to check if the current blockchain is not in sync with other consensus
 // we should switch to another state rather than DoingConsensus.

@@ -144,10 +144,10 @@ func (node *Node) messageHandler(content []byte, sender string) {
 msgPayload, _ := proto.GetDRandMessagePayload(content)
 if node.DRand != nil {
 if node.DRand.IsLeader {
-utils.GetLogInstance().Info("NET: DRand Leader received message:", "messageCategory", msgCategory)
+utils.GetLogInstance().Info("NET: DRand Leader received message")
 node.DRand.ProcessMessageLeader(msgPayload)
 } else {
-utils.GetLogInstance().Info("NET: DRand Validator received message:", "messageCategory", msgCategory)
+utils.GetLogInstance().Info("NET: DRand Validator received message")
 node.DRand.ProcessMessageValidator(msgPayload)
 }
 }

@@ -169,6 +169,7 @@ func (node *Node) messageHandler(content []byte, sender string) {
 blockMsgType := proto_node.BlockMessageType(msgPayload[0])
 switch blockMsgType {
 case proto_node.Sync:
+utils.GetLogInstance().Info("NET: received message: Node/Sync")
 var blocks []*types.Block
 err := rlp.DecodeBytes(msgPayload[1:], &blocks) // skip the Sync message type
 if err != nil {

@@ -384,7 +385,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
 // This is the old way of broadcasting pong message
 if node.Consensus.IsLeader && !utils.UseLibP2P {
 peers := node.Consensus.GetValidatorPeers()
-pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey)
+pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey())
 buffer := pong.ConstructPongMessage()
 // Send a Pong message directly to the sender

@@ -409,7 +410,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
 // SendPongMessage is a goroutine to periodically send pong messages to all peers
 func (node *Node) SendPongMessage() {
-tick := time.NewTicker(10 * time.Second)
+tick := time.NewTicker(3 * time.Second)
 numPeers := len(node.Consensus.GetValidatorPeers())
 numPubKeys := len(node.Consensus.PublicKeys)
 sentMessage := false

@@ -432,7 +433,7 @@ func (node *Node) SendPongMessage() {
 } else {
 // stable number of peers/pubkeys, send the pong message
 if !sentMessage {
-pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey)
+pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey())
 buffer := pong.ConstructPongMessage()
 err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, host.ConstructP2pMessage(byte(0), buffer))
 if err != nil {
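Worth noting in the two hunks above: the pong ticker drops from 10s to 3s, and the pong is only broadcast once the peer and public-key counts stop changing between ticks (the sentMessage flag prevents resending). A compressed sketch of that stabilization gate, with a fast ticker and simulated counts standing in for real peer discovery:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	counts := []int{1, 3, 5, 5, 5}                // simulated peer counts per tick
	tick := time.NewTicker(10 * time.Millisecond) // 3s in the real SendPongMessage
	defer tick.Stop()

	prev, sent := -1, false
	for _, n := range counts {
		<-tick.C
		if n != prev {
			prev, sent = n, false // still churning: hold off and keep polling
			continue
		}
		if !sent {
			fmt.Println("peer set stable at", n, "- sending pong once")
			sent = true // don't resend until the count changes again
		}
	}
}
```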
@@ -482,13 +483,11 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
 node.AddPeers(peers)
 }
-node.Consensus.Leader.PubKey = &bls.PublicKey{}
-err = node.Consensus.Leader.PubKey.Deserialize(pong.LeaderPubKey)
+err = node.Consensus.SetLeaderPubKey(pong.LeaderPubKey)
 if err != nil {
 utils.GetLogInstance().Error("Unmarshal Consensus Leader PubKey Failed", "error", err)
 }
-node.DRand.Leader.PubKey = &bls.PublicKey{}
-err = node.DRand.Leader.PubKey.Deserialize(pong.LeaderPubKey)
+err = node.DRand.SetLeaderPubKey(pong.LeaderPubKey)
 if err != nil {
 utils.GetLogInstance().Error("Unmarshal DRand Leader PubKey Failed", "error", err)
 }

@@ -509,7 +508,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
 publicKeys = append(publicKeys, &key)
 }
-// utils.GetLogInstance().Debug("[pongMessageHandler]", "#keys", len(publicKeys), "#peers", len(peers))
+utils.GetLogInstance().Debug("[pongMessageHandler]", "#keys", len(publicKeys), "#peers", len(peers))
 if node.State == NodeWaitToJoin {
 node.State = NodeReadyForConsensus

@@ -524,5 +523,5 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
 data["peer"] = p2p.GroupAction{Name: node.MyShardGroupID, Action: p2p.ActionPause}
 node.serviceManager.TakeAction(&service.Action{Action: service.Notify, ServiceType: service.PeerDiscovery, Params: data})
-return node.Consensus.UpdatePublicKeys(publicKeys)
+return node.Consensus.UpdatePublicKeys(publicKeys) + node.DRand.UpdatePublicKeys(publicKeys)
 }

node/node_newblock.go

@@ -30,10 +30,11 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan
 select {
 case <-readySignal:
 time.Sleep(100 * time.Millisecond) // Delay a bit so the validator is caught up (test-only).
-case <-time.After(200 * time.Second):
+case <-time.After(300 * time.Second):
 node.Consensus.ResetState()
 timeoutCount++
 utils.GetLogInstance().Debug("Consensus timeout, retry!", "count", timeoutCount, "node", node)
+// FIXME: retry is not working; there is no retry logic here. It will only wait for a new transaction.
 case <-stopChan:
 return
 }
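The consensus-ready wait is a plain Go select over three channels: the ready signal, a long timeout (raised here from 200s to 300s), and a stop channel; as the new FIXME notes, the timeout branch only resets state rather than actually retrying. A minimal sketch of the same shape:

```go
package main

import (
	"fmt"
	"time"
)

// waitOnce mirrors the select in WaitForConsensusReady: block until the
// ready signal fires, the (long) timeout elapses, or stop delivers.
func waitOnce(ready, stop <-chan struct{}, timeout time.Duration) string {
	select {
	case <-ready:
		return "ready"
	case <-time.After(timeout): // 300s in the real node
		return "timeout" // note: no actual retry happens here (see FIXME)
	case <-stop:
		return "stopped"
	}
}

func main() {
	ready, stop := make(chan struct{}), make(chan struct{})
	go func() { ready <- struct{}{} }()
	fmt.Println(waitOnce(ready, stop, time.Second))
}
```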

node/node_test.go

@@ -94,7 +94,7 @@ func TestAddPeers(t *testing.T) {
 t.Fatalf("newhost failure: %v", err)
 }
 consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
-dRand := drand.New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
+dRand := drand.New(host, "0", []p2p.Peer{leader, validator}, leader, nil, true)
 node := New(host, consensus, nil)
 node.DRand = dRand

node/service_setup.go

@@ -71,8 +71,6 @@ func (node *Node) setupForBeaconValidator() {
 node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
 // Register networkinfo service.
 node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
-// Register randomness service
-node.serviceManager.RegisterService(service.Randomness, randomness.New(node.DRand))
 }

 func (node *Node) setupForNewNode() {
