Merge pull request #90 from harmony-one/lc4pr

HAR-5: update concensus.publicKeys
pull/92/head
Leo Chen 6 years ago committed by GitHub
commit e180e68ce0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      benchmark.go
  2. 44
      consensus/consensus.go
  3. 22
      consensus/consensus_leader.go
  4. 7
      deploy.sh
  5. 12
      node/node.go
  6. 2
      node/node_handler.go

@ -203,7 +203,7 @@ func main() {
// Temporary testing code, to be removed. // Temporary testing code, to be removed.
currentNode.AddTestingAddresses(10000) currentNode.AddTestingAddresses(10000)
currentNode.State = node.WAIT currentNode.State = node.NodeWaitToJoin
if consensus.IsLeader { if consensus.IsLeader {
if *accountModel { if *accountModel {

@ -38,7 +38,7 @@ type Consensus struct {
// FIXME: should use PubKey of p2p.Peer as the hashkey // FIXME: should use PubKey of p2p.Peer as the hashkey
// However, we have assumed uint16 in consensus/consensus_leader.go:136 // However, we have assumed uint16 in consensus/consensus_leader.go:136
// we won't change it now // we won't change it now
validators map[uint16]p2p.Peer validators sync.Map // key is uint16, value is p2p.Peer
// Minimal number of peers in the shard // Minimal number of peers in the shard
// If the number of validators is less than minPeers, the consensus won't start // If the number of validators is less than minPeers, the consensus won't start
@ -113,13 +113,12 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
consensus.commitments = &map[uint16]kyber.Point{} consensus.commitments = &map[uint16]kyber.Point{}
consensus.finalCommitments = &map[uint16]kyber.Point{} consensus.finalCommitments = &map[uint16]kyber.Point{}
consensus.validators = make(map[uint16]p2p.Peer)
consensus.responses = &map[uint16]kyber.Scalar{} consensus.responses = &map[uint16]kyber.Scalar{}
consensus.finalResponses = &map[uint16]kyber.Scalar{} consensus.finalResponses = &map[uint16]kyber.Scalar{}
consensus.leader = leader consensus.leader = leader
for _, peer := range peers { for _, peer := range peers {
consensus.validators[utils.GetUniqueIdFromPeer(peer)] = peer consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
} }
// Initialize cosign bitmap // Initialize cosign bitmap
@ -185,9 +184,15 @@ func (consensus *Consensus) signMessage(message []byte) []byte {
// GetValidatorPeers returns list of validator peers. // GetValidatorPeers returns list of validator peers.
func (consensus *Consensus) GetValidatorPeers() []p2p.Peer { func (consensus *Consensus) GetValidatorPeers() []p2p.Peer {
validatorPeers := make([]p2p.Peer, 0) validatorPeers := make([]p2p.Peer, 0)
for _, validatorPeer := range consensus.validators {
validatorPeers = append(validatorPeers, validatorPeer) consensus.validators.Range(func(k, v interface{}) bool {
} if peer, ok := v.(p2p.Peer); ok {
validatorPeers = append(validatorPeers, peer)
return true
}
return false
})
return validatorPeers return validatorPeers
} }
@ -223,14 +228,37 @@ func (consensus *Consensus) String() string {
duty, consensus.priKey.String(), consensus.ShardID, consensus.nodeID, consensus.state) duty, consensus.priKey.String(), consensus.ShardID, consensus.nodeID, consensus.state)
} }
// AddPeers will add new peers into the validator map of the consensus
// and add the public keys
func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
count := 0 count := 0
for _, peer := range peers { for _, peer := range peers {
_, ok := consensus.validators[utils.GetUniqueIdFromPeer(peer)] _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer))
if !ok { if !ok {
consensus.validators[utils.GetUniqueIdFromPeer(peer)] = peer consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
consensus.publicKeys = append(consensus.publicKeys, peer.PubKey)
count++ count++
} }
} }
if count > 0 {
// regenerate bitmaps
mask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create mask")
}
finalMask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create final mask")
}
consensus.bitmap = mask
consensus.finalBitmap = finalMask
}
return count return count
} }
// RemovePeers will remove the peers from the validator list and publicKeys
// It will be called when leader/node lost connection to peers
func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// TODO (lc) we need to have a corresponding RemovePeers function
return 0
}

@ -31,7 +31,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
newBlock := <-blockChannel newBlock := <-blockChannel
if !consensus.HasEnoughValidators() { if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.validators)) consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.publicKeys))
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
continue continue
} }
@ -157,11 +157,17 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
offset += 64 offset += 64
// Verify signature // Verify signature
value, ok := consensus.validators[validatorID] v, ok := consensus.validators.Load(validatorID)
if !ok { if !ok {
consensus.Log.Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus) consensus.Log.Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus)
return return
} }
value, ok := v.(p2p.Peer)
if !ok {
consensus.Log.Warn("Invalid validator", "validatorID", validatorID, "consensus", consensus)
return
}
if schnorr.Verify(crypto.Ed25519Curve, value.PubKey, payload[:offset-64], signature) != nil { if schnorr.Verify(crypto.Ed25519Curve, value.PubKey, payload[:offset-64], signature) != nil {
consensus.Log.Warn("Received message with invalid signature", "validatorKey", consensus.leader.PubKey, "consensus", consensus) consensus.Log.Warn("Received message with invalid signature", "validatorKey", consensus.leader.PubKey, "consensus", consensus)
return return
@ -298,11 +304,17 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
} }
// Verify signature // Verify signature
value, ok := consensus.validators[validatorID] v, ok := consensus.validators.Load(validatorID)
if !ok { if !ok {
consensus.Log.Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus) consensus.Log.Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus)
return return
} }
value, ok := v.(p2p.Peer)
if !ok {
consensus.Log.Warn("Invalid validator", "validatorID", validatorID, "consensus", consensus)
return
}
if schnorr.Verify(crypto.Ed25519Curve, value.PubKey, payload[:offset-64], signature) != nil { if schnorr.Verify(crypto.Ed25519Curve, value.PubKey, payload[:offset-64], signature) != nil {
consensus.Log.Warn("Received message with invalid signature", "validatorKey", consensus.leader.PubKey, "consensus", consensus) consensus.Log.Warn("Received message with invalid signature", "validatorKey", consensus.leader.PubKey, "consensus", consensus)
return return
@ -469,7 +481,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) {
"key": consensus.pubKey.String(), "key": consensus.pubKey.String(),
"tps": tps, "tps": tps,
"txCount": numOfTxs, "txCount": numOfTxs,
"nodeCount": len(consensus.validators) + 1, "nodeCount": len(consensus.publicKeys) + 1,
"latestBlockHash": hex.EncodeToString(consensus.blockHash[:]), "latestBlockHash": hex.EncodeToString(consensus.blockHash[:]),
"latestTxHashes": txHashes, "latestTxHashes": txHashes,
"blockLatency": int(timeElapsed / time.Millisecond), "blockLatency": int(timeElapsed / time.Millisecond),
@ -478,7 +490,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) {
} }
func (consensus *Consensus) HasEnoughValidators() bool { func (consensus *Consensus) HasEnoughValidators() bool {
if len(consensus.validators) < consensus.MinPeers { if len(consensus.publicKeys) < consensus.MinPeers {
return false return false
} }
return true return true

@ -13,6 +13,7 @@ USAGE: $ME [OPTIONS] config_file_name
-p use peer discovery (default: $PEER) -p use peer discovery (default: $PEER)
-d enable db support (default: $DB) -d enable db support (default: $DB)
-t toggle txgen (default: $TXGEN) -t toggle txgen (default: $TXGEN)
-D duration txgen run duration (default: $DURATION)
This script will build all the binaries and start benchmark and txgen based on the configuration file. This script will build all the binaries and start benchmark and txgen based on the configuration file.
@ -28,13 +29,15 @@ EOU
PEER= PEER=
DB= DB=
TXGEN=true TXGEN=true
DURATION=60
while getopts "hpdt" option; do while getopts "hpdtD:" option; do
case $option in case $option in
h) usage ;; h) usage ;;
p) PEER='-peer_discovery' ;; p) PEER='-peer_discovery' ;;
d) DB='-db_supported' ;; d) DB='-db_supported' ;;
t) TXGEN=false ;; t) TXGEN=false ;;
D) DURATION=$OPTARG ;;
esac esac
done done
@ -72,5 +75,5 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
done < $config done < $config
if [ "$TXGEN" == "true" ]; then if [ "$TXGEN" == "true" ]; then
./bin/txgen -config_file $config -log_folder $log_folder ./bin/txgen -config_file $config -log_folder $log_folder -duration $DURATION
fi fi

@ -32,10 +32,10 @@ import (
type NodeState byte type NodeState byte
const ( const (
INIT NodeState = iota // Node just started, before contacting BeaconChain NodeInit NodeState = iota // Node just started, before contacting BeaconChain
WAIT // Node contacted BeaconChain, wait to join Shard NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
JOIN // Node joined Shard, ready for consensus NodeJoinedShard // Node joined Shard, ready for consensus
OFFLINE // Node is offline NodeOffline // Node is offline
) )
type NetworkNode struct { type NetworkNode struct {
@ -245,7 +245,7 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
} }
// Logger // Logger
node.log = log.New() node.log = log.New()
node.State = INIT node.State = NodeInit
return &node return &node
} }
@ -275,7 +275,7 @@ func (node *Node) JoinShard(leader p2p.Peer) {
// try to join the shard, with 10 minutes time-out // try to join the shard, with 10 minutes time-out
backoff := p2p.NewExpBackoff(1*time.Second, 10*time.Minute, 2) backoff := p2p.NewExpBackoff(1*time.Second, 10*time.Minute, 2)
for node.State == WAIT { for node.State == NodeWaitToJoin {
backoff.Sleep() backoff.Sleep()
ping := proto_node.NewPingMessage(node.SelfPeer) ping := proto_node.NewPingMessage(node.SelfPeer)
buffer := ping.ConstructPingMessage() buffer := ping.ConstructPingMessage()

@ -555,7 +555,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) {
return return
} }
// node.log.Info("Pong", "Msg", pong) // node.log.Info("Pong", "Msg", pong)
node.State = JOIN node.State = NodeJoinedShard
peers := make([]p2p.Peer, 0) peers := make([]p2p.Peer, 0)

Loading…
Cancel
Save