You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
299 lines
9.8 KiB
299 lines
9.8 KiB
// Package consensus implements the Cosi PBFT consensus
|
|
package consensus // consensus
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
|
|
"github.com/dedis/kyber"
|
|
"github.com/dedis/kyber/sign/schnorr"
|
|
"github.com/harmony-one/harmony/blockchain"
|
|
"github.com/harmony-one/harmony/crypto"
|
|
"github.com/harmony-one/harmony/crypto/pki"
|
|
"github.com/harmony-one/harmony/log"
|
|
"github.com/harmony-one/harmony/p2p"
|
|
"github.com/harmony-one/harmony/utils"
|
|
)
|
|
|
|
// Consensus data containing all info related to one round of consensus process
|
|
type Consensus struct {
|
|
state State
|
|
// Commits collected from validators. A map from node Id to its commitment
|
|
commitments *map[uint16]kyber.Point
|
|
finalCommitments *map[uint16]kyber.Point
|
|
aggregatedCommitment kyber.Point
|
|
aggregatedFinalCommitment kyber.Point
|
|
bitmap *crypto.Mask
|
|
finalBitmap *crypto.Mask
|
|
|
|
// Challenges
|
|
challenge [32]byte
|
|
finalChallenge [32]byte
|
|
|
|
// Responses collected from validators
|
|
responses *map[uint16]kyber.Scalar
|
|
finalResponses *map[uint16]kyber.Scalar
|
|
// map of nodeID to validator Peer object
|
|
// FIXME: should use PubKey of p2p.Peer as the hashkey
|
|
// However, we have assumed uint16 in consensus/consensus_leader.go:136
|
|
// we won't change it now
|
|
validators sync.Map // key is uint16, value is p2p.Peer
|
|
|
|
// Minimal number of peers in the shard
|
|
// If the number of validators is less than minPeers, the consensus won't start
|
|
MinPeers int
|
|
|
|
// Leader
|
|
leader p2p.Peer
|
|
// Public keys of the committee including leader and validators
|
|
PublicKeys []kyber.Point
|
|
pubKeyLock sync.Mutex
|
|
|
|
// private/public keys of current node
|
|
priKey kyber.Scalar
|
|
pubKey kyber.Point
|
|
|
|
// Whether I am leader. False means I am validator
|
|
IsLeader bool
|
|
// Leader or validator Id - 2 byte
|
|
nodeID uint16
|
|
// Consensus Id (View Id) - 4 byte
|
|
consensusID uint32
|
|
// Blockhash - 32 byte
|
|
blockHash [32]byte
|
|
// BlockHeader to run consensus on
|
|
blockHeader []byte
|
|
// Array of block hashes.
|
|
blockHashes [][32]byte
|
|
// Shard Id which this node belongs to
|
|
ShardID uint32
|
|
|
|
// global consensus mutex
|
|
mutex sync.Mutex
|
|
|
|
// Validator specific fields
|
|
// Blocks received but not done with consensus yet
|
|
blocksReceived map[uint32]*BlockConsensusStatus
|
|
// Commitment secret
|
|
secret map[uint32]kyber.Scalar
|
|
|
|
// Signal channel for starting a new consensus process
|
|
ReadySignal chan struct{}
|
|
// The verifier func passed from Node object
|
|
BlockVerifier func(*blockchain.Block) bool
|
|
// The post-consensus processing func passed from Node object
|
|
// Called when consensus on a new block is done
|
|
OnConsensusDone func(*blockchain.Block)
|
|
|
|
Log log.Logger
|
|
|
|
uniqueIDInstance *utils.UniqueValidatorID
|
|
}
|
|
|
|
// BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far
|
|
// This is mainly used in the case that this node is lagging behind and needs to catch up.
|
|
// For example, the consensus moved to round N and this node received message(N).
|
|
// However, this node may still not finished with round N-1, so the newly received message(N)
|
|
// should be stored in this temporary structure. In case the round N-1 finishes, it can catch
|
|
// up to the latest state of round N by using this structure.
|
|
type BlockConsensusStatus struct {
|
|
blockHeader []byte // the block header of the block which the consensus is running on
|
|
state State // the latest state of the consensus
|
|
}
|
|
|
|
// New creates a new Consensus object
|
|
func New(selfPeer p2p.Peer, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus {
|
|
consensus := Consensus{}
|
|
|
|
if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP {
|
|
consensus.IsLeader = true
|
|
} else {
|
|
consensus.IsLeader = false
|
|
}
|
|
|
|
consensus.commitments = &map[uint16]kyber.Point{}
|
|
consensus.finalCommitments = &map[uint16]kyber.Point{}
|
|
consensus.responses = &map[uint16]kyber.Scalar{}
|
|
consensus.finalResponses = &map[uint16]kyber.Scalar{}
|
|
|
|
consensus.leader = leader
|
|
for _, peer := range peers {
|
|
consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer)
|
|
}
|
|
|
|
// Initialize cosign bitmap
|
|
allPublicKeys := make([]kyber.Point, 0)
|
|
for _, validatorPeer := range peers {
|
|
allPublicKeys = append(allPublicKeys, validatorPeer.PubKey)
|
|
}
|
|
allPublicKeys = append(allPublicKeys, leader.PubKey)
|
|
mask, err := crypto.NewMask(crypto.Ed25519Curve, allPublicKeys, consensus.leader.PubKey)
|
|
if err != nil {
|
|
panic("Failed to create mask")
|
|
}
|
|
finalMask, err := crypto.NewMask(crypto.Ed25519Curve, allPublicKeys, consensus.leader.PubKey)
|
|
if err != nil {
|
|
panic("Failed to create final mask")
|
|
}
|
|
consensus.PublicKeys = allPublicKeys
|
|
consensus.bitmap = mask
|
|
consensus.finalBitmap = finalMask
|
|
|
|
consensus.secret = map[uint32]kyber.Scalar{}
|
|
|
|
// For now use socket address as 16 byte Id
|
|
// TODO: populate with correct Id
|
|
consensus.nodeID = utils.GetUniqueIDFromPeer(selfPeer)
|
|
|
|
// Set private key for myself so that I can sign messages.
|
|
consensus.priKey = crypto.Ed25519Curve.Scalar().SetInt64(int64(consensus.nodeID))
|
|
consensus.pubKey = pki.GetPublicKeyFromScalar(consensus.priKey)
|
|
consensus.consensusID = 0 // or view Id in the original pbft paper
|
|
|
|
myShardID, err := strconv.Atoi(ShardID)
|
|
if err != nil {
|
|
panic("Unparseable shard Id" + ShardID)
|
|
}
|
|
consensus.ShardID = uint32(myShardID)
|
|
|
|
// For validators to keep track of all blocks received but not yet committed, so as to catch up to latest consensus if lagged behind.
|
|
consensus.blocksReceived = make(map[uint32]*BlockConsensusStatus)
|
|
|
|
if consensus.IsLeader {
|
|
consensus.ReadySignal = make(chan struct{})
|
|
// send a signal to indicate it's ready to run consensus
|
|
// this signal is consumed by node object to create a new block and in turn trigger a new consensus on it
|
|
// this is a goroutine because go channel without buffer will block
|
|
go func() {
|
|
consensus.ReadySignal <- struct{}{}
|
|
}()
|
|
}
|
|
|
|
consensus.Log = log.New()
|
|
consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance()
|
|
|
|
// consensus.Log.Info("New Consensus", "IP", ip, "Port", port, "NodeID", consensus.nodeID, "priKey", consensus.priKey, "pubKey", consensus.pubKey)
|
|
return &consensus
|
|
}
|
|
|
|
func (consensus *Consensus) signMessage(message []byte) []byte {
|
|
signature, err := schnorr.Sign(crypto.Ed25519Curve, consensus.priKey, message)
|
|
if err != nil {
|
|
panic("Failed to sign message with Schnorr signature.")
|
|
}
|
|
return signature
|
|
}
|
|
|
|
// GetValidatorPeers returns list of validator peers.
|
|
func (consensus *Consensus) GetValidatorPeers() []p2p.Peer {
|
|
validatorPeers := make([]p2p.Peer, 0)
|
|
|
|
consensus.validators.Range(func(k, v interface{}) bool {
|
|
if peer, ok := v.(p2p.Peer); ok {
|
|
validatorPeers = append(validatorPeers, peer)
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
|
|
return validatorPeers
|
|
}
|
|
|
|
// ResetState resets the state of the consensus
|
|
func (consensus *Consensus) ResetState() {
|
|
consensus.state = Finished
|
|
consensus.commitments = &map[uint16]kyber.Point{}
|
|
consensus.finalCommitments = &map[uint16]kyber.Point{}
|
|
consensus.responses = &map[uint16]kyber.Scalar{}
|
|
consensus.finalResponses = &map[uint16]kyber.Scalar{}
|
|
|
|
mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey)
|
|
finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey)
|
|
consensus.bitmap = mask
|
|
consensus.finalBitmap = finalMask
|
|
consensus.bitmap.SetMask([]byte{})
|
|
consensus.finalBitmap.SetMask([]byte{})
|
|
|
|
consensus.aggregatedCommitment = nil
|
|
consensus.aggregatedFinalCommitment = nil
|
|
consensus.secret = map[uint32]kyber.Scalar{}
|
|
}
|
|
|
|
// Returns a string representation of this consensus
|
|
func (consensus *Consensus) String() string {
|
|
var duty string
|
|
if consensus.IsLeader {
|
|
duty = "LDR" // leader
|
|
} else {
|
|
duty = "VLD" // validator
|
|
}
|
|
return fmt.Sprintf("[duty:%s, priKey:%s, ShardID:%v, nodeID:%v, state:%s]",
|
|
duty, consensus.priKey.String(), consensus.ShardID, consensus.nodeID, consensus.state)
|
|
}
|
|
|
|
// AddPeers will add new peers into the validator map of the consensus
|
|
// and add the public keys
|
|
func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int {
|
|
count := 0
|
|
|
|
for _, peer := range peers {
|
|
_, ok := consensus.validators.Load(utils.GetUniqueIDFromPeer(*peer))
|
|
if !ok {
|
|
if peer.ValidatorID == -1 {
|
|
peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueID())
|
|
}
|
|
consensus.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer)
|
|
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
|
|
}
|
|
count++
|
|
}
|
|
return count
|
|
}
|
|
|
|
// RemovePeers will remove the peers from the validator list and PublicKeys
|
|
// It will be called when leader/node lost connection to peers
|
|
func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
|
|
// TODO (lc) we need to have a corresponding RemovePeers function
|
|
return 0
|
|
}
|
|
|
|
// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
|
|
func (consensus *Consensus) DebugPrintPublicKeys() {
|
|
for _, k := range consensus.PublicKeys {
|
|
str := fmt.Sprintf("%s", k)
|
|
consensus.Log.Debug("pk:", "string", str)
|
|
}
|
|
|
|
consensus.Log.Debug("PublicKeys:", "#", len(consensus.PublicKeys))
|
|
}
|
|
|
|
// DebugPrintValidators print all validator ip/port/key in string format in Consensus
|
|
func (consensus *Consensus) DebugPrintValidators() {
|
|
count := 0
|
|
consensus.validators.Range(func(k, v interface{}) bool {
|
|
if p, ok := v.(p2p.Peer); ok {
|
|
str2 := fmt.Sprintf("%s", p.PubKey)
|
|
consensus.Log.Debug("validator:", "IP", p.IP, "Port", p.Port, "VID", p.ValidatorID, "Key", str2)
|
|
count++
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
consensus.Log.Debug("Validators", "#", count)
|
|
}
|
|
|
|
// UpdatePublicKeys updates the PublicKeys variable, protected by a mutex
|
|
func (consensus *Consensus) UpdatePublicKeys(pubKeys []kyber.Point) int {
|
|
consensus.pubKeyLock.Lock()
|
|
// consensus.PublicKeys = make([]kyber.Point, len(pubKeys))
|
|
consensus.PublicKeys = append(pubKeys[:0:0], pubKeys...)
|
|
consensus.pubKeyLock.Unlock()
|
|
|
|
return len(consensus.PublicKeys)
|
|
}
|
|
|
|
// GetNodeID returns the nodeID
|
|
func (consensus *Consensus) GetNodeID() uint16 {
|
|
return consensus.nodeID
|
|
}
|
|
|