Add collective signature verification and temporarily bump the sig threshold to every nodes

pull/61/head
Rongjian Lan 6 years ago
parent 88ee34821c
commit 09676aafb9
  1. 4
      configr/main.go
  2. 15
      consensus/consensus.go
  3. 29
      consensus/consensus_leader.go
  4. 4
      consensus/consensus_leader_msg.go
  5. 84
      consensus/consensus_validator.go
  6. 4
      utils/distribution_config.go

@ -112,11 +112,11 @@ func (configr *Configr) GetShardID(ip, port string) string {
return "N/A"
}
// GetPeers Gets the peer list of the node corresponding to this ip and port
// GetPeers Gets the validator list
func (configr *Configr) GetPeers(ip, port, shardID string) []p2p.Peer {
var peerList []p2p.Peer
for _, entry := range configr.config {
if entry.Role != "validator" || entry.IP == ip && entry.Port == port || entry.ShardID != shardID {
if entry.Role != "validator" || entry.ShardID != shardID {
continue
}
// Get public key deterministically based on ip and port

@ -23,6 +23,9 @@ type Consensus struct {
commitments map[uint16]kyber.Point
aggregatedCommitment kyber.Point
// Challenges
challenge [32]byte
// Commits collected from validators.
bitmap *crypto.Mask
// Responses collected from validators
@ -31,6 +34,9 @@ type Consensus struct {
validators map[uint16]p2p.Peer
// Leader
leader p2p.Peer
// Public keys of the committee including leader and validators
publicKeys []kyber.Point
// private/public keys of current node
priKey kyber.Scalar
pubKey kyber.Point
@ -101,15 +107,16 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
}
// Initialize cosign bitmap
allPublics := make([]kyber.Point, 0)
allPublicKeys := make([]kyber.Point, 0)
for _, validatorPeer := range consensus.validators {
allPublics = append(allPublics, validatorPeer.PubKey)
allPublicKeys = append(allPublicKeys, validatorPeer.PubKey)
}
allPublics = append(allPublics, leader.PubKey)
mask, err := crypto.NewMask(crypto.Ed25519Curve, allPublics, consensus.leader.PubKey)
allPublicKeys = append(allPublicKeys, leader.PubKey)
mask, err := crypto.NewMask(crypto.Ed25519Curve, allPublicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create commitment mask")
}
consensus.publicKeys = allPublicKeys
consensus.bitmap = mask
// For now use socket address as 16 byte Id

@ -152,11 +152,24 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
return
}
if len(consensus.commitments) >= (2*(len(consensus.validators)+1))/3+1 && consensus.state < CHALLENGE_DONE {
consensus.Log.Debug("Enough commitments received with signatures", "numOfSignatures", len(consensus.commitments))
if len(consensus.commitments) >= len(consensus.validators)+1 && consensus.state < CHALLENGE_DONE {
consensus.Log.Debug("Enough commitments received with signatures", "num", len(consensus.commitments))
// Broadcast challenge
msgToSend := consensus.constructChallengeMessage(proto_consensus.CHALLENGE)
// Add leader's response
challengeScalar := crypto.Ed25519Curve.Scalar()
challengeScalar.UnmarshalBinary(consensus.challenge[:])
response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret, challengeScalar)
if err == nil {
consensus.responses[consensus.nodeId] = response
consensus.bitmap.SetKey(consensus.pubKey, true)
} else {
log.Warn("Failed to generate response", "err", err)
}
// Broadcast challenge message
p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend)
// Set state to CHALLENGE_DONE
@ -230,20 +243,22 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
}
//consensus.Log.Debug("RECEIVED RESPONSE", "consensusId", consensusId)
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 && consensus.state != FINISHED {
if len(consensus.responses) >= len(consensus.validators)+1 && consensus.state != FINISHED {
consensus.mutex.Lock()
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 && consensus.state != FINISHED {
if len(consensus.responses) >= len(consensus.validators)+1 && consensus.state != FINISHED {
consensus.Log.Debug("Enough responses received with signatures", "num", len(consensus.responses))
// Aggregate responses
responses := make([]kyber.Scalar, 0)
responses := []kyber.Scalar{}
for _, val := range consensus.responses {
responses = append(responses, val)
}
aggResponse, err := crypto.AggregateResponses(crypto.Ed25519Curve, responses)
aggregatedResponse, err := crypto.AggregateResponses(crypto.Ed25519Curve, responses)
if err != nil {
log.Error("Failed to aggregate responses")
return
}
collectiveSigAndBitmap, err := crypto.Sign(crypto.Ed25519Curve, consensus.aggregatedCommitment, aggResponse, consensus.bitmap)
collectiveSigAndBitmap, err := crypto.Sign(crypto.Ed25519Curve, consensus.aggregatedCommitment, aggregatedResponse, consensus.bitmap)
if err != nil {
log.Error("Failed to create collective signature")

@ -65,7 +65,9 @@ func (consensus *Consensus) constructChallengeMessage(msgType proto_consensus.Me
buffer.Write(getAggregatedKey(consensus.bitmap))
// 32 byte challenge
buffer.Write(getChallenge(aggCommitment, consensus.bitmap.AggregatePublic, buffer.Bytes()[:36])) // message contains consensus id and block hash for now.
challenge := getChallenge(aggCommitment, consensus.bitmap.AggregatePublic, buffer.Bytes()[:36])
buffer.Write(challenge) // message contains consensus id and block hash for now.
copy(consensus.challenge[:], challenge)
consensus.aggregatedCommitment = aggCommitment
// 64 byte of signature on previous data

@ -32,6 +32,8 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
consensus.processAnnounceMessage(payload)
case proto_consensus.CHALLENGE:
consensus.processChallengeMessage(payload)
case proto_consensus.COLLECTIVE_SIG:
consensus.processCollectiveSigMessage(payload)
default:
consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
}
@ -230,7 +232,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret, receivedChallenge)
if err != nil {
log.Info("Failed to generate response", "err", err)
log.Warn("Failed to generate response", "err", err)
return
}
msgToSend := consensus.constructResponseMessage(proto_consensus.RESPONSE, response)
@ -275,3 +277,83 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
}
consensus.mutex.Unlock()
}
// Processes the collective signature message sent from the leader
func (consensus *Consensus) processCollectiveSigMessage(payload []byte) {
//#### Read payload data
offset := 0
// 4 byte consensus id
consensusId := binary.BigEndian.Uint32(payload[offset : offset+4])
offset += 4
// 32 byte block hash
blockHash := payload[offset : offset+32]
offset += 32
// 2 byte leader id
leaderId := binary.BigEndian.Uint16(payload[offset : offset+2])
offset += 2
// 64 byte of collective signature
collectiveSig := payload[offset : offset+64]
offset += 64
// N byte of bitmap
n := len(payload) - offset - 64 // the number means 64 signature
bitmap := payload[offset : offset+n]
offset += n
// 64 byte of signature on previous data
signature := payload[offset : offset+64]
offset += 64
//#### END: Read payload data
copy(consensus.blockHash[:], blockHash[:])
// Verify block data
// check leader Id
myLeaderId := utils.GetUniqueIdFromPeer(consensus.leader)
if leaderId != myLeaderId {
consensus.Log.Warn("Received message from wrong leader", "myLeaderId", myLeaderId, "receivedLeaderId", leaderId, "consensus", consensus)
return
}
// Verify signature
if schnorr.Verify(crypto.Ed25519Curve, consensus.leader.PubKey, payload[:offset-64], signature) != nil {
consensus.Log.Warn("Received message with invalid signature", "leaderKey", consensus.leader.PubKey, "consensus", consensus)
return
}
// Verify collective signature
err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy(len(consensus.publicKeys)/2))
if err != nil {
consensus.Log.Warn("Failed to verify the collective sig message", "consensusId", consensusId, "err", err)
}
// Add attack model of IncorrectResponse.
if attack.GetInstance().IncorrectResponse() {
consensus.Log.Warn("IncorrectResponse attacked")
return
}
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return
}
// check block hash
if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 {
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
return
}
secret, msgToSend := consensus.constructCommitMessage(proto_consensus.FINAL_COMMIT)
// Store the commitment secret
consensus.secret = secret
p2p.SendMessage(consensus.leader, msgToSend)
// Set state to COMMIT_DONE
consensus.state = FINAL_COMMIT_DONE
}

@ -116,11 +116,11 @@ func (config *DistributionConfig) GetShardID(ip, port string) string {
return "N/A"
}
// GetPeers Gets the peer list of the node corresponding to this ip and port
// GetPeers Gets the validator list
func (config *DistributionConfig) GetPeers(ip, port, shardID string) []p2p.Peer {
var peerList []p2p.Peer
for _, entry := range config.config {
if entry.Role != "validator" || entry.IP == ip && entry.Port == port || entry.ShardID != shardID {
if entry.Role != "validator" || entry.ShardID != shardID {
continue
}
// Get public key deterministically based on ip and port

Loading…
Cancel
Save