diff --git a/benchmark.go b/benchmark.go index 1a99fa2ff..ef19246dd 100644 --- a/benchmark.go +++ b/benchmark.go @@ -203,7 +203,7 @@ func main() { // Temporary testing code, to be removed. currentNode.AddTestingAddresses(10000) - currentNode.State = node.WAIT + currentNode.State = node.NodeWaitToJoin if consensus.IsLeader { if *accountModel { diff --git a/consensus/consensus.go b/consensus/consensus.go index bd5136685..8b986a173 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -38,7 +38,7 @@ type Consensus struct { // FIXME: should use PubKey of p2p.Peer as the hashkey // However, we have assumed uint16 in consensus/consensus_leader.go:136 // we won't change it now - validators map[uint16]p2p.Peer + validators sync.Map // key is uint16, value is p2p.Peer // Minimal number of peers in the shard // If the number of validators is less than minPeers, the consensus won't start @@ -113,13 +113,12 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * consensus.commitments = &map[uint16]kyber.Point{} consensus.finalCommitments = &map[uint16]kyber.Point{} - consensus.validators = make(map[uint16]p2p.Peer) consensus.responses = &map[uint16]kyber.Scalar{} consensus.finalResponses = &map[uint16]kyber.Scalar{} consensus.leader = leader for _, peer := range peers { - consensus.validators[utils.GetUniqueIdFromPeer(peer)] = peer + consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) } // Initialize cosign bitmap @@ -185,9 +184,15 @@ func (consensus *Consensus) signMessage(message []byte) []byte { // GetValidatorPeers returns list of validator peers. func (consensus *Consensus) GetValidatorPeers() []p2p.Peer { validatorPeers := make([]p2p.Peer, 0) - for _, validatorPeer := range consensus.validators { - validatorPeers = append(validatorPeers, validatorPeer) - } + + consensus.validators.Range(func(k, v interface{}) bool { + if peer, ok := v.(p2p.Peer); ok { + validatorPeers = append(validatorPeers, peer) + return true + } + return false + }) + return validatorPeers } @@ -223,14 +228,37 @@ func (consensus *Consensus) String() string { duty, consensus.priKey.String(), consensus.ShardID, consensus.nodeID, consensus.state) } +// AddPeers will add new peers into the validator map of the consensus +// and add the public keys func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { count := 0 for _, peer := range peers { - _, ok := consensus.validators[utils.GetUniqueIdFromPeer(peer)] + _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) if !ok { - consensus.validators[utils.GetUniqueIdFromPeer(peer)] = peer + consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) + consensus.publicKeys = append(consensus.publicKeys, peer.PubKey) count++ } } + if count > 0 { + // regenerate bitmaps + mask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) + if err != nil { + panic("Failed to create mask") + } + finalMask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) + if err != nil { + panic("Failed to create final mask") + } + consensus.bitmap = mask + consensus.finalBitmap = finalMask + } return count } + +// RemovePeers will remove the peers from the validator list and publicKeys +// It will be called when leader/node lost connection to peers +func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { + // TODO (lc) we need to have a corresponding RemovePeers function + return 0 +} diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index f1ecd7647..f9904430d 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -31,7 +31,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) newBlock := <-blockChannel if !consensus.HasEnoughValidators() { - consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.validators)) + consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.publicKeys)) time.Sleep(500 * time.Millisecond) continue } @@ -157,11 +157,17 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta offset += 64 // Verify signature - value, ok := consensus.validators[validatorID] + v, ok := consensus.validators.Load(validatorID) if !ok { consensus.Log.Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus) return } + value, ok := v.(p2p.Peer) + if !ok { + consensus.Log.Warn("Invalid validator", "validatorID", validatorID, "consensus", consensus) + return + } + if schnorr.Verify(crypto.Ed25519Curve, value.PubKey, payload[:offset-64], signature) != nil { consensus.Log.Warn("Received message with invalid signature", "validatorKey", consensus.leader.PubKey, "consensus", consensus) return @@ -298,11 +304,17 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S } // Verify signature - value, ok := consensus.validators[validatorID] + v, ok := consensus.validators.Load(validatorID) if !ok { consensus.Log.Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus) return } + value, ok := v.(p2p.Peer) + if !ok { + consensus.Log.Warn("Invalid validator", "validatorID", validatorID, "consensus", consensus) + return + } + if schnorr.Verify(crypto.Ed25519Curve, value.PubKey, payload[:offset-64], signature) != nil { consensus.Log.Warn("Received message with invalid signature", "validatorKey", consensus.leader.PubKey, "consensus", consensus) return @@ -469,7 +481,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) { "key": consensus.pubKey.String(), "tps": tps, "txCount": numOfTxs, - "nodeCount": len(consensus.validators) + 1, + "nodeCount": len(consensus.publicKeys) + 1, "latestBlockHash": hex.EncodeToString(consensus.blockHash[:]), "latestTxHashes": txHashes, "blockLatency": int(timeElapsed / time.Millisecond), @@ -478,7 +490,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) { } func (consensus *Consensus) HasEnoughValidators() bool { - if len(consensus.validators) < consensus.MinPeers { + if len(consensus.publicKeys) < consensus.MinPeers { return false } return true diff --git a/deploy.sh b/deploy.sh index 6cdf621ae..7629e8b61 100755 --- a/deploy.sh +++ b/deploy.sh @@ -13,6 +13,7 @@ USAGE: $ME [OPTIONS] config_file_name -p use peer discovery (default: $PEER) -d enable db support (default: $DB) -t toggle txgen (default: $TXGEN) + -D duration txgen run duration (default: $DURATION) This script will build all the binaries and start benchmark and txgen based on the configuration file. @@ -28,13 +29,15 @@ EOU PEER= DB= TXGEN=true +DURATION=60 -while getopts "hpdt" option; do +while getopts "hpdtD:" option; do case $option in h) usage ;; p) PEER='-peer_discovery' ;; d) DB='-db_supported' ;; t) TXGEN=false ;; + D) DURATION=$OPTARG ;; esac done @@ -72,5 +75,5 @@ while IFS='' read -r line || [[ -n "$line" ]]; do done < $config if [ "$TXGEN" == "true" ]; then - ./bin/txgen -config_file $config -log_folder $log_folder + ./bin/txgen -config_file $config -log_folder $log_folder -duration $DURATION fi diff --git a/node/node.go b/node/node.go index 922cdc276..a4dbcbb7e 100644 --- a/node/node.go +++ b/node/node.go @@ -32,10 +32,10 @@ import ( type NodeState byte const ( - INIT NodeState = iota // Node just started, before contacting BeaconChain - WAIT // Node contacted BeaconChain, wait to join Shard - JOIN // Node joined Shard, ready for consensus - OFFLINE // Node is offline + NodeInit NodeState = iota // Node just started, before contacting BeaconChain + NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard + NodeJoinedShard // Node joined Shard, ready for consensus + NodeOffline // Node is offline ) type NetworkNode struct { @@ -245,7 +245,7 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { } // Logger node.log = log.New() - node.State = INIT + node.State = NodeInit return &node } @@ -275,7 +275,7 @@ func (node *Node) JoinShard(leader p2p.Peer) { // try to join the shard, with 10 minutes time-out backoff := p2p.NewExpBackoff(1*time.Second, 10*time.Minute, 2) - for node.State == WAIT { + for node.State == NodeWaitToJoin { backoff.Sleep() ping := proto_node.NewPingMessage(node.SelfPeer) buffer := ping.ConstructPingMessage() diff --git a/node/node_handler.go b/node/node_handler.go index 78c1fcb36..fd3430c84 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -555,7 +555,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) { return } // node.log.Info("Pong", "Msg", pong) - node.State = JOIN + node.State = NodeJoinedShard peers := make([]p2p.Peer, 0)