use mutex to fix race condition when updating PublicKeys

Signed-off-by: Leo Chen <leo@harmony.one>
pull/106/head
Leo Chen 6 years ago
parent 1d7baff8c4
commit fe16532820
  1. 19
      consensus/consensus.go
  2. 7
      consensus/consensus_leader.go
  3. 12
      node/node_handler.go

@ -48,6 +48,7 @@ type Consensus struct {
leader p2p.Peer
// Public keys of the committee including leader and validators
PublicKeys []kyber.Point
pubKeyLock sync.Mutex
// private/public keys of current node
priKey kyber.Scalar
@ -87,7 +88,7 @@ type Consensus struct {
Log log.Logger
uniqueIdInstance *utils.UniqueValidatorId
uniqueIDInstance *utils.UniqueValidatorId
}
// BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far
@ -172,7 +173,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
}
consensus.Log = log.New()
consensus.uniqueIdInstance = utils.GetUniqueValidatorIdInstance()
consensus.uniqueIDInstance = utils.GetUniqueValidatorIdInstance()
return &consensus
}
@ -241,7 +242,7 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
_, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer))
if !ok {
if peer.ValidatorID == -1 {
peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId())
peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueId())
}
consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
@ -277,9 +278,17 @@ func (consensus *Consensus) DebugPrintValidators() {
consensus.Log.Debug("validator:", "IP", p.Ip, "Port", p.Port, "VID", p.ValidatorID, "Key", str2)
count++
return true
} else {
return false
}
return false
})
consensus.Log.Debug("Validators", "#", count)
}
// UpdatePublicKeys replaces the committee's public key list with a copy of
// pubKeys, guarded by pubKeyLock so concurrent readers/writers of
// consensus.PublicKeys do not race. It returns the number of keys installed.
func (consensus *Consensus) UpdatePublicKeys(pubKeys []kyber.Point) int {
	consensus.pubKeyLock.Lock()
	defer consensus.pubKeyLock.Unlock()
	// Full slice expression pubKeys[:0:0] forces append to allocate a fresh
	// backing array, so later mutation of the caller's slice cannot alias
	// consensus.PublicKeys.
	consensus.PublicKeys = append(pubKeys[:0:0], pubKeys...)
	// Read the length while still holding the lock. The original returned
	// len(consensus.PublicKeys) after Unlock(), which itself raced with any
	// concurrent writer — the bug this mutex exists to prevent.
	return len(consensus.PublicKeys)
}

@ -53,9 +53,16 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
// WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus)
backoff := p2p.NewExpBackoff(500*time.Millisecond, 30*time.Second, 2.0)
for { // keep waiting for new blocks
newBlock := <-blockChannel
// TODO: think about potential race condition
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
backoff.Sleep()
}
startTime = time.Now()
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
for consensus.state == Finished {

@ -604,6 +604,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return -1
}
// node.log.Info("Pong", "Msg", pong)
// TODO (lc) state syncing, and wait for all public keys
node.State = NodeJoinedShard
peers := make([]p2p.Peer, 0)
@ -621,15 +622,16 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
continue
}
peers = append(peers, *peer)
}
count := node.AddPeers(peers)
if len(peers) > 0 {
node.AddPeers(peers)
}
// Reset Validator PublicKeys every time we receive PONG message from Leader
// The PublicKeys has to be identical across the shard on every node
// TODO (lc): we need to handle RemovePeer situation
node.Consensus.PublicKeys = make([]kyber.Point, 0)
publicKeys := make([]kyber.Point, 0)
// Create the PubKey from the []byte sent from leader
for _, k := range pong.PubKeys {
@ -639,8 +641,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
node.log.Error("UnmarshalBinary Failed PubKeys", "error", err)
continue
}
node.Consensus.PublicKeys = append(node.Consensus.PublicKeys, key)
publicKeys = append(publicKeys, key)
}
return count
return node.Consensus.UpdatePublicKeys(publicKeys)
}

Loading…
Cancel
Save