From 0e5e4dce1f9ed363f0d5bc4adaa25a2743792913 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 21 Aug 2018 15:02:10 -0700 Subject: [PATCH 1/8] Bring back the 2/3f+1 threshold for multi-sig and add basic response check --- consensus/consensus_leader.go | 48 ++++++++++++++++++++++++++------ consensus/consensus_validator.go | 2 +- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index b9c6efd99..876038ecd 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "encoding/gob" + "errors" "time" "github.com/dedis/kyber" @@ -152,7 +153,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) { return } - if len(consensus.commitments) >= len(consensus.validators)+1 && consensus.state < CHALLENGE_DONE { + if len(consensus.commitments) >= (2*len(consensus.publicKeys)/3)+1 && consensus.state < CHALLENGE_DONE { consensus.Log.Debug("Enough commitments received with signatures", "num", len(consensus.commitments)) // Broadcast challenge @@ -230,11 +231,19 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { _, ok = consensus.responses[validatorId] shouldProcess = shouldProcess && !ok if shouldProcess { - scalar := crypto.Ed25519Curve.Scalar() - scalar.UnmarshalBinary(response) - consensus.responses[validatorId] = scalar - // Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages - consensus.bitmap.SetKey(value.PubKey, true) + // verify the response matches the received commit + responseScalar := crypto.Ed25519Curve.Scalar() + responseScalar.UnmarshalBinary(response) + err := consensus.verifyResponse(responseScalar, validatorId) + if err != nil { + consensus.Log.Warn("Failed to verify the response", "error", err) + shouldProcess = false + } else { + consensus.responses[validatorId] = responseScalar + // Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages + consensus.bitmap.SetKey(value.PubKey, true) + } + } consensus.mutex.Unlock() @@ -243,9 +252,9 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { } //consensus.Log.Debug("RECEIVED RESPONSE", "consensusId", consensusId) - if len(consensus.responses) >= len(consensus.validators)+1 && consensus.state != FINISHED { + if len(consensus.responses) >= (2*len(consensus.publicKeys)/3)+1 && consensus.state != FINISHED { consensus.mutex.Lock() - if len(consensus.responses) >= len(consensus.validators)+1 && consensus.state != FINISHED { + if len(consensus.responses) >= (2*len(consensus.publicKeys)/3)+1 && consensus.state != FINISHED { consensus.Log.Debug("Enough responses received with signatures", "num", len(consensus.responses)) // Aggregate responses responses := []kyber.Scalar{} @@ -317,3 +326,26 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { consensus.mutex.Unlock() } } + +func (consensus *Consensus) verifyResponse(response kyber.Scalar, validatorId uint16) error { + if response.Equal(crypto.Ed25519Curve.Scalar()) { + return errors.New("response is zero valued") + } + _, ok := consensus.commitments[validatorId] + if !ok { + return errors.New("no commit is received for the validator") + } + // TODO(RJ): enable the actual check + //challenge := crypto.Ed25519Curve.Scalar() + //challenge.UnmarshalBinary(consensus.challenge[:]) + // + //// compute Q = sG + r*pubKey + //sG := crypto.Ed25519Curve.Point().Mul(response, nil) + //r_pubKey := crypto.Ed25519Curve.Point().Mul(challenge, consensus.validators[validatorId].PubKey) + //Q := crypto.Ed25519Curve.Point().Add(sG, r_pubKey) + // + //if !Q.Equal(commit) { + // return errors.New("recreated commit doesn't match the received one") + //} + return nil +} diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 047eeb0ef..7ca661b1c 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -325,7 +325,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) { } // Verify collective signature - err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy(len(consensus.publicKeys)/2)) + err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.publicKeys)/3)+1)) if err != nil { consensus.Log.Warn("Failed to verify the collective sig message", "consensusId", consensusId, "err", err) } From 9e645f186e68b804ff97cf8c4ad274feefe3016b Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Tue, 21 Aug 2018 16:30:51 -0700 Subject: [PATCH 2/8] enable leveldb into blockchain --- benchmark.go | 18 +++++++++++++++++- client/btctxgen/main.go | 4 ++-- client/txgen/main.go | 4 ++-- deploy.sh | 8 +++++++- node/node.go | 15 ++++++++++++++- node/node_handler.go | 6 ++++++ node/node_test.go | 4 ++-- 7 files changed, 50 insertions(+), 9 deletions(-) diff --git a/benchmark.go b/benchmark.go index a38ede8c7..21875d8ef 100644 --- a/benchmark.go +++ b/benchmark.go @@ -10,6 +10,7 @@ import ( "github.com/simple-rules/harmony-benchmark/attack" "github.com/simple-rules/harmony-benchmark/consensus" + "github.com/simple-rules/harmony-benchmark/db" "github.com/simple-rules/harmony-benchmark/log" "github.com/simple-rules/harmony-benchmark/node" "github.com/simple-rules/harmony-benchmark/utils" @@ -38,12 +39,23 @@ func startProfiler(shardID string, logFolder string) { } } +func InitLDBDatabase(ip string, port string) (*db.LDBDatabase, error) { + // TODO(minhdoan): Refactor this. + dbFileName := "/tmp/harmony_" + ip + port + ".dat" + var err = os.RemoveAll(dbFileName) + if err != nil { + fmt.Println(err.Error()) + } + return db.NewLDBDatabase(dbFileName, 0, 0) +} + func main() { ip := flag.String("ip", "127.0.0.1", "IP of the node") port := flag.String("port", "9000", "port of the node.") configFile := flag.String("config_file", "config.txt", "file containing all ip addresses") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") attackedMode := flag.Int("attacked_mode", 0, "0 means not attacked, 1 means attacked, 2 means being open to be selected as attacked") + dbSupported := flag.Int("db_supported", 0, "0 means not db_supported, 1 means db_supported") flag.Parse() // Set up randomization seed. @@ -85,7 +97,11 @@ func main() { // Set logger to attack model. attack.GetInstance().SetLogger(consensus.Log) // Current node. - currentNode := node.New(consensus) + currentNode := node.New(consensus, true) + // Initialize leveldb if dbSupported. + if *dbSupported == 1 { + currentNode.DB, _ = InitLDBDatabase(*ip, *port) + } // Create client peer. clientPeer := distributionConfig.GetClientPeer() // If there is a client configured in the node list. diff --git a/client/btctxgen/main.go b/client/btctxgen/main.go index 014e57621..eaa51ae35 100644 --- a/client/btctxgen/main.go +++ b/client/btctxgen/main.go @@ -200,13 +200,13 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for _, shardID := range shardIDs { - nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID})) + nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}, false)) } // Client/txgenerator server node setup clientPort := configr.GetClientPort() consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) - clientNode := node.New(consensusObj) + clientNode := node.New(consensusObj, false) initClient(clientNode, clientPort, &leaders, &nodes) diff --git a/client/txgen/main.go b/client/txgen/main.go index 74a85e00a..f7fdc8c80 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -276,7 +276,7 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for _, shardId := range shardIds { - node := node.New(&consensus.Consensus{ShardID: shardId}) + node := node.New(&consensus.Consensus{ShardID: shardId}, false) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.numOfAddress) nodes = append(nodes, node) @@ -285,7 +285,7 @@ func main() { // Client/txgenerator server node setup clientPort := configr.GetClientPort() consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) - clientNode := node.New(consensusObj) + clientNode := node.New(consensusObj, false) if clientPort != "" { clientNode.Client = client.NewClient(&leaders) diff --git a/deploy.sh b/deploy.sh index 83032a680..3e2c69349 100755 --- a/deploy.sh +++ b/deploy.sh @@ -25,6 +25,8 @@ if [ -z "$config" ]; then usage fi +db_supported=$2 + # Kill nodes if any ./kill_node.sh @@ -48,7 +50,11 @@ while IFS='' read -r line || [[ -n "$line" ]]; do IFS=' ' read ip port mode shardId <<< $line #echo $ip $port $mode if [ "$mode" != "client" ]; then - ./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder& + if [ -z "$db_supported" ]; then + ./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder& + else + ./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder -db_supported 1& + fi fi done < $config diff --git a/node/node.go b/node/node.go index 3554ee857..133fdfad9 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,7 @@ import ( "github.com/simple-rules/harmony-benchmark/client" "github.com/simple-rules/harmony-benchmark/consensus" "github.com/simple-rules/harmony-benchmark/crypto/pki" + "github.com/simple-rules/harmony-benchmark/db" "github.com/simple-rules/harmony-benchmark/log" "github.com/simple-rules/harmony-benchmark/p2p" ) @@ -21,6 +22,7 @@ type Node struct { pendingTransactions []*blockchain.Transaction // All the transactions received but not yet processed for Consensus transactionInConsensus []*blockchain.Transaction // The transactions selected into the new block and under Consensus process blockchain *blockchain.Blockchain // The blockchain for the shard where this node belongs + DB *db.LDBDatabase // LevelDB to store blockchain. UtxoPool *blockchain.UTXOPool // The corresponding UTXO pool of the current blockchain CrossTxsInConsensus []*blockchain.CrossShardTxAndProof // The cross shard txs that is under consensus, the proof is not filled yet. CrossTxsToReturn []*blockchain.CrossShardTxAndProof // The cross shard txs and proof that needs to be sent back to the user client. @@ -95,7 +97,7 @@ func (node *Node) countNumTransactionsInBlockchain() int { } // Create a new Node -func New(consensus *consensus.Consensus) *Node { +func New(consensus *consensus.Consensus, dbSupported bool) *Node { node := Node{} // Consensus and associated channel to communicate blocks @@ -116,5 +118,16 @@ func New(consensus *consensus.Consensus) *Node { // Logger node.log = node.Consensus.Log + + // // Initialize leveldb + // if dbSupported { + // // TODO(minhdoan): Refactor this. + // var err = os.Remove("/tmp/harmony_db.dat") + // if err != nil { + // fmt.Println(err.Error()) + // os.Exit(1) + // } + // node.db, _ = db.NewLDBDatabase("/tmp/harmony_db.dat", 0, 0) + // } return &node } diff --git a/node/node_handler.go b/node/node_handler.go index e23365851..8427a5193 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "net" "os" + "strconv" "time" "github.com/simple-rules/harmony-benchmark/blockchain" @@ -278,6 +279,11 @@ func (node *Node) PostConsensusProcessing(newBlock *blockchain.Block) { func (node *Node) AddNewBlock(newBlock *blockchain.Block) { // Add it to blockchain node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) + // Store it into leveldb. + if node.DB != nil { + node.log.Info("Writing new block into disk.") + newBlock.Write(node.DB, strconv.Itoa(len(node.blockchain.Blocks))) + } // Update UTXO pool node.UtxoPool.Update(newBlock.Transactions) // Clear transaction-in-Consensus list diff --git a/node/node_test.go b/node/node_test.go index 918e5e840..b45fbab72 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -12,7 +12,7 @@ func TestNewNewNode(test *testing.T) { validator := p2p.Peer{Ip: "3", Port: "5"} consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus) + node := New(consensus, false) if node.Consensus == nil { test.Error("Consensus is not initialized for the node") } @@ -39,7 +39,7 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) { validator := p2p.Peer{Ip: "3", Port: "5"} consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus) + node := New(consensus, false) node.AddTestingAddresses(1000) if node.countNumTransactionsInBlockchain() != 1001 { test.Error("Count of transactions in the blockchain is incorrect") From 6cd04086f6105ae796e56c05b1ae41d7be98d6fe Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Tue, 21 Aug 2018 16:44:01 -0700 Subject: [PATCH 3/8] refactor db init --- benchmark.go | 13 ++++++++----- client/btctxgen/main.go | 4 ++-- client/txgen/main.go | 4 ++-- node/node.go | 17 +++++------------ node/node_handler.go | 4 ++-- node/node_test.go | 4 ++-- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/benchmark.go b/benchmark.go index 21875d8ef..e30278e6a 100644 --- a/benchmark.go +++ b/benchmark.go @@ -86,6 +86,13 @@ func main() { ) log.Root().SetHandler(h) + // Initialize leveldb if dbSupported. + var ldb *db.LDBDatabase = nil + + if *dbSupported == 1 { + ldb, _ = InitLDBDatabase(*ip, *port) + } + // Consensus object. consensus := consensus.NewConsensus(*ip, *port, shardID, peers, leader) @@ -97,11 +104,7 @@ func main() { // Set logger to attack model. attack.GetInstance().SetLogger(consensus.Log) // Current node. - currentNode := node.New(consensus, true) - // Initialize leveldb if dbSupported. - if *dbSupported == 1 { - currentNode.DB, _ = InitLDBDatabase(*ip, *port) - } + currentNode := node.New(consensus, ldb) // Create client peer. clientPeer := distributionConfig.GetClientPeer() // If there is a client configured in the node list. diff --git a/client/btctxgen/main.go b/client/btctxgen/main.go index eaa51ae35..716848028 100644 --- a/client/btctxgen/main.go +++ b/client/btctxgen/main.go @@ -200,13 +200,13 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for _, shardID := range shardIDs { - nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}, false)) + nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}, nil)) } // Client/txgenerator server node setup clientPort := configr.GetClientPort() consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) - clientNode := node.New(consensusObj, false) + clientNode := node.New(consensusObj, nil) initClient(clientNode, clientPort, &leaders, &nodes) diff --git a/client/txgen/main.go b/client/txgen/main.go index f7fdc8c80..719142c9c 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -276,7 +276,7 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for _, shardId := range shardIds { - node := node.New(&consensus.Consensus{ShardID: shardId}, false) + node := node.New(&consensus.Consensus{ShardID: shardId}, nil) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.numOfAddress) nodes = append(nodes, node) @@ -285,7 +285,7 @@ func main() { // Client/txgenerator server node setup clientPort := configr.GetClientPort() consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) - clientNode := node.New(consensusObj, false) + clientNode := node.New(consensusObj, nil) if clientPort != "" { clientNode.Client = client.NewClient(&leaders) diff --git a/node/node.go b/node/node.go index 133fdfad9..acc19f587 100644 --- a/node/node.go +++ b/node/node.go @@ -22,7 +22,7 @@ type Node struct { pendingTransactions []*blockchain.Transaction // All the transactions received but not yet processed for Consensus transactionInConsensus []*blockchain.Transaction // The transactions selected into the new block and under Consensus process blockchain *blockchain.Blockchain // The blockchain for the shard where this node belongs - DB *db.LDBDatabase // LevelDB to store blockchain. + db *db.LDBDatabase // LevelDB to store blockchain. UtxoPool *blockchain.UTXOPool // The corresponding UTXO pool of the current blockchain CrossTxsInConsensus []*blockchain.CrossShardTxAndProof // The cross shard txs that is under consensus, the proof is not filled yet. CrossTxsToReturn []*blockchain.CrossShardTxAndProof // The cross shard txs and proof that needs to be sent back to the user client. @@ -97,7 +97,7 @@ func (node *Node) countNumTransactionsInBlockchain() int { } // Create a new Node -func New(consensus *consensus.Consensus, dbSupported bool) *Node { +func New(consensus *consensus.Consensus, db *db.LDBDatabase) *Node { node := Node{} // Consensus and associated channel to communicate blocks @@ -119,15 +119,8 @@ func New(consensus *consensus.Consensus, dbSupported bool) *Node { // Logger node.log = node.Consensus.Log - // // Initialize leveldb - // if dbSupported { - // // TODO(minhdoan): Refactor this. - // var err = os.Remove("/tmp/harmony_db.dat") - // if err != nil { - // fmt.Println(err.Error()) - // os.Exit(1) - // } - // node.db, _ = db.NewLDBDatabase("/tmp/harmony_db.dat", 0, 0) - // } + // Initialize level db. + node.db = db + return &node } diff --git a/node/node_handler.go b/node/node_handler.go index 8427a5193..3ffffee82 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -280,9 +280,9 @@ func (node *Node) AddNewBlock(newBlock *blockchain.Block) { // Add it to blockchain node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) // Store it into leveldb. - if node.DB != nil { + if node.db != nil { node.log.Info("Writing new block into disk.") - newBlock.Write(node.DB, strconv.Itoa(len(node.blockchain.Blocks))) + newBlock.Write(node.db, strconv.Itoa(len(node.blockchain.Blocks))) } // Update UTXO pool node.UtxoPool.Update(newBlock.Transactions) diff --git a/node/node_test.go b/node/node_test.go index b45fbab72..ad18b0f0e 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -12,7 +12,7 @@ func TestNewNewNode(test *testing.T) { validator := p2p.Peer{Ip: "3", Port: "5"} consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, false) + node := New(consensus, nil) if node.Consensus == nil { test.Error("Consensus is not initialized for the node") } @@ -39,7 +39,7 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) { validator := p2p.Peer{Ip: "3", Port: "5"} consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, false) + node := New(consensus, nil) node.AddTestingAddresses(1000) if node.countNumTransactionsInBlockchain() != 1001 { test.Error("Count of transactions in the blockchain is incorrect") From 443ed41fcd54dd0535160aabfdc23ce01e961aac Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Tue, 21 Aug 2018 17:27:41 -0700 Subject: [PATCH 4/8] rename filenames --- waitnode/{waitNode.go => wait_node.go} | 0 waitnode/{waitNode_test.go => wait_node_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename waitnode/{waitNode.go => wait_node.go} (100%) rename waitnode/{waitNode_test.go => wait_node_test.go} (100%) diff --git a/waitnode/waitNode.go b/waitnode/wait_node.go similarity index 100% rename from waitnode/waitNode.go rename to waitnode/wait_node.go diff --git a/waitnode/waitNode_test.go b/waitnode/wait_node_test.go similarity index 100% rename from waitnode/waitNode_test.go rename to waitnode/wait_node_test.go From 10624beaf21ef5243e5a1bdce3af0a02d6852152 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Tue, 21 Aug 2018 17:54:55 -0700 Subject: [PATCH 5/8] add merkel tree data structure --- blockchain/merkle_tree.go | 65 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 blockchain/merkle_tree.go diff --git a/blockchain/merkle_tree.go b/blockchain/merkle_tree.go new file mode 100644 index 000000000..c4ffa4c9d --- /dev/null +++ b/blockchain/merkle_tree.go @@ -0,0 +1,65 @@ +package blockchain + +import ( + "crypto/sha256" +) + +// MerkleTree represent a Merkle tree +type MerkleTree struct { + RootNode *MerkleNode +} + +// MerkleNode represent a Merkle tree node +type MerkleNode struct { + Left *MerkleNode + Right *MerkleNode + Data []byte +} + +// NewMerkleTree creates a new Merkle tree from a sequence of data +func NewMerkleTree(data [][]byte) *MerkleTree { + var nodes []MerkleNode + + if len(data)%2 != 0 { + data = append(data, data[len(data)-1]) + } + + for _, datum := range data { + node := NewMerkleNode(nil, nil, datum) + nodes = append(nodes, *node) + } + + for i := 0; i < len(data)/2; i++ { + var newLevel []MerkleNode + + for j := 0; j < len(nodes); j += 2 { + node := NewMerkleNode(&nodes[j], &nodes[j+1], nil) + newLevel = append(newLevel, *node) + } + + nodes = newLevel + } + + mTree := MerkleTree{&nodes[0]} + + return &mTree +} + +// NewMerkleNode creates a new Merkle tree node +func NewMerkleNode(left, right *MerkleNode, data []byte) *MerkleNode { + mNode := MerkleNode{} + + if left == nil && right == nil { + hash := sha256.Sum256(data) + mNode.Data = hash[:] + } else { + prevHashes := append(left.Data, right.Data...) + hash := sha256.Sum256(prevHashes) + mNode.Data = hash[:] + } + + mNode.Left = left + mNode.Right = right + + return &mNode +} From ab091e988114d405f2846930e13e39a1e75e9556 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 21 Aug 2018 17:57:02 -0700 Subject: [PATCH 6/8] Add final commit and final challenge messaging process and temporarily disable block wrap-up process --- consensus/consensus.go | 28 ++++-- consensus/consensus_leader.go | 133 ++++++++++++++----------- consensus/consensus_leader_msg.go | 41 +++++--- consensus/consensus_leader_msg_test.go | 6 +- consensus/consensus_validator.go | 78 ++++++++------- 5 files changed, 169 insertions(+), 117 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index 832f48a9c..f3a83f708 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -20,14 +20,17 @@ import ( type Consensus struct { state ConsensusState // Commits collected from validators. A map from node Id to its commitment - commitments map[uint16]kyber.Point - aggregatedCommitment kyber.Point + commitments *map[uint16]kyber.Point + finalCommitments *map[uint16]kyber.Point + aggregatedCommitment kyber.Point + aggregatedFinalCommitment kyber.Point + bitmap *crypto.Mask + finalBitmap *crypto.Mask // Challenges - challenge [32]byte + challenge [32]byte + finalChallenge [32]byte - // Commits collected from validators. - bitmap *crypto.Mask // Responses collected from validators responses map[uint16]kyber.Scalar // map of nodeId to validator Peer object @@ -97,7 +100,8 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * consensus.IsLeader = false } - consensus.commitments = make(map[uint16]kyber.Point) + consensus.commitments = &map[uint16]kyber.Point{} + consensus.finalCommitments = &map[uint16]kyber.Point{} consensus.validators = make(map[uint16]p2p.Peer) consensus.responses = make(map[uint16]kyber.Scalar) @@ -114,10 +118,15 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * allPublicKeys = append(allPublicKeys, leader.PubKey) mask, err := crypto.NewMask(crypto.Ed25519Curve, allPublicKeys, consensus.leader.PubKey) if err != nil { - panic("Failed to create commitment mask") + panic("Failed to create mask") + } + finalMask, err := crypto.NewMask(crypto.Ed25519Curve, allPublicKeys, consensus.leader.PubKey) + if err != nil { + panic("Failed to create final mask") } consensus.publicKeys = allPublicKeys consensus.bitmap = mask + consensus.finalBitmap = finalMask // For now use socket address as 16 byte Id // TODO: populate with correct Id @@ -170,7 +179,10 @@ func (consensus *Consensus) getValidatorPeers() []p2p.Peer { // Reset the state of the consensus func (consensus *Consensus) ResetState() { consensus.state = FINISHED - consensus.commitments = make(map[uint16]kyber.Point) + consensus.commitments = &map[uint16]kyber.Point{} + consensus.bitmap.SetMask([]byte{}) + consensus.finalCommitments = &map[uint16]kyber.Point{} + consensus.finalBitmap.SetMask([]byte{}) consensus.responses = make(map[uint16]kyber.Scalar) consensus.secret = nil } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 876038ecd..517edddcb 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -49,12 +49,14 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { } switch msgType { + case proto_consensus.START_CONSENSUS: + consensus.processStartConsensusMessage(payload) case proto_consensus.COMMIT: - consensus.processCommitMessage(payload) + consensus.processCommitMessage(payload, CHALLENGE_DONE) case proto_consensus.RESPONSE: consensus.processResponseMessage(payload) - case proto_consensus.START_CONSENSUS: - consensus.processStartConsensusMessage(payload) + case proto_consensus.FINAL_COMMIT: + consensus.processCommitMessage(payload, FINAL_CHALLENGE_DONE) default: consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus) } @@ -82,16 +84,25 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend) // Set state to ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE + consensus.commitByLeader(true) +} +// Leader commit to the message itself before receiving others commits +func (consensus *Consensus) commitByLeader(firstRound bool) { // Generate leader's own commitment secret, commitment := crypto.Commit(crypto.Ed25519Curve) consensus.secret = secret - consensus.commitments[consensus.nodeId] = commitment - consensus.bitmap.SetKey(consensus.pubKey, true) + if firstRound { + (*consensus.commitments)[consensus.nodeId] = commitment + consensus.bitmap.SetKey(consensus.pubKey, true) + } else { + (*consensus.finalCommitments)[consensus.nodeId] = commitment + consensus.finalBitmap.SetKey(consensus.pubKey, true) + } } // Processes the commit message sent from validators -func (consensus *Consensus) processCommitMessage(payload []byte) { +func (consensus *Consensus) processCommitMessage(payload []byte, targetState ConsensusState) { // Read payload data offset := 0 // 4 byte consensus id @@ -138,34 +149,43 @@ func (consensus *Consensus) processCommitMessage(payload []byte) { return } + commitments := consensus.commitments // targetState == CHALLENGE_DONE + bitmap := consensus.bitmap + if targetState == FINAL_CHALLENGE_DONE { + commitments = consensus.finalCommitments + bitmap = consensus.finalBitmap + } + // proceed only when the message is not received before - _, ok = consensus.commitments[validatorId] + _, ok = (*commitments)[validatorId] shouldProcess := !ok if shouldProcess { point := crypto.Ed25519Curve.Point() point.UnmarshalBinary(commitment) - consensus.commitments[validatorId] = point + (*commitments)[validatorId] = point // Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages - consensus.bitmap.SetKey(value.PubKey, true) + bitmap.SetKey(value.PubKey, true) } if !shouldProcess { return } - if len(consensus.commitments) >= (2*len(consensus.publicKeys)/3)+1 && consensus.state < CHALLENGE_DONE { - consensus.Log.Debug("Enough commitments received with signatures", "num", len(consensus.commitments)) + if len((*commitments)) >= len(consensus.publicKeys) && consensus.state < targetState { + consensus.Log.Debug("Enough commitments received with signatures", "num", len((*commitments)), "state", consensus.state) // Broadcast challenge - msgToSend := consensus.constructChallengeMessage(proto_consensus.CHALLENGE) + msgTypeToSend := proto_consensus.CHALLENGE // targetState == CHALLENGE_DONE + if targetState == FINAL_CHALLENGE_DONE { + msgTypeToSend = proto_consensus.FINAL_CHALLENGE + } + msgToSend, challengeScalar := consensus.constructChallengeMessage(msgTypeToSend) // Add leader's response - challengeScalar := crypto.Ed25519Curve.Scalar() - challengeScalar.UnmarshalBinary(consensus.challenge[:]) response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret, challengeScalar) if err == nil { consensus.responses[consensus.nodeId] = response - consensus.bitmap.SetKey(consensus.pubKey, true) + bitmap.SetKey(consensus.pubKey, true) } else { log.Warn("Failed to generate response", "err", err) } @@ -173,8 +193,8 @@ func (consensus *Consensus) processCommitMessage(payload []byte) { // Broadcast challenge message p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend) - // Set state to CHALLENGE_DONE - consensus.state = CHALLENGE_DONE + // Set state to targetState (CHALLENGE_DONE or FINAL_CHALLENGE_DONE) + consensus.state = targetState } } @@ -252,9 +272,9 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { } //consensus.Log.Debug("RECEIVED RESPONSE", "consensusId", consensusId) - if len(consensus.responses) >= (2*len(consensus.publicKeys)/3)+1 && consensus.state != FINISHED { + if len(consensus.responses) >= len(consensus.publicKeys) && consensus.state != FINISHED { consensus.mutex.Lock() - if len(consensus.responses) >= (2*len(consensus.publicKeys)/3)+1 && consensus.state != FINISHED { + if len(consensus.responses) >= len(consensus.publicKeys) && consensus.state != FINISHED { consensus.Log.Debug("Enough responses received with signatures", "num", len(consensus.responses)) // Aggregate responses responses := []kyber.Scalar{} @@ -286,42 +306,43 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { // Set state to CHALLENGE_DONE consensus.state = COLLECTIVE_SIG_DONE - - consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses)) - // Reset state to FINISHED, and clear other data. - consensus.ResetState() - consensus.consensusId++ - consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "consensusId", consensus.consensusId) - - // TODO: reconstruct the whole block from header and transactions - // For now, we used the stored whole block already stored in consensus.blockHeader - txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader)) - var blockHeaderObj blockchain.Block - err = txDecoder.Decode(&blockHeaderObj) - if err != nil { - consensus.Log.Debug("failed to construct the new block after consensus") - } - - // Sign the block - // TODO(RJ): populate bitmap - copy(blockHeaderObj.Signature[:], collectiveSig[:]) - copy(blockHeaderObj.Bitmap[:], bitmap) - consensus.OnConsensusDone(&blockHeaderObj) - - // TODO: @ricl these logic are irrelevant to consensus, move them to another file, say profiler. - endTime := time.Now() - timeElapsed := endTime.Sub(startTime) - numOfTxs := blockHeaderObj.NumTransactions - consensus.Log.Info("TPS Report", - "numOfTXs", numOfTxs, - "startTime", startTime, - "endTime", endTime, - "timeElapsed", timeElapsed, - "TPS", float64(numOfTxs)/timeElapsed.Seconds(), - "consensus", consensus) - - // Send signal to Node so the new block can be added and new round of consensus can be triggered - consensus.ReadySignal <- 1 + consensus.commitByLeader(false) + + //consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses)) + //// Reset state to FINISHED, and clear other data. + //consensus.ResetState() + //consensus.consensusId++ + //consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "consensusId", consensus.consensusId) + // + //// TODO: reconstruct the whole block from header and transactions + //// For now, we used the stored whole block already stored in consensus.blockHeader + //txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader)) + //var blockHeaderObj blockchain.Block + //err = txDecoder.Decode(&blockHeaderObj) + //if err != nil { + // consensus.Log.Debug("failed to construct the new block after consensus") + //} + // + //// Sign the block + //// TODO(RJ): populate bitmap + //copy(blockHeaderObj.Signature[:], collectiveSig[:]) + //copy(blockHeaderObj.Bitmap[:], bitmap) + //consensus.OnConsensusDone(&blockHeaderObj) + // + //// TODO: @ricl these logic are irrelevant to consensus, move them to another file, say profiler. + //endTime := time.Now() + //timeElapsed := endTime.Sub(startTime) + //numOfTxs := blockHeaderObj.NumTransactions + //consensus.Log.Info("TPS Report", + // "numOfTXs", numOfTxs, + // "startTime", startTime, + // "endTime", endTime, + // "timeElapsed", timeElapsed, + // "TPS", float64(numOfTxs)/timeElapsed.Seconds(), + // "consensus", consensus) + // + //// Send signal to Node so the new block can be added and new round of consensus can be triggered + //consensus.ReadySignal <- 1 } consensus.mutex.Unlock() } @@ -331,7 +352,7 @@ func (consensus *Consensus) verifyResponse(response kyber.Scalar, validatorId ui if response.Equal(crypto.Ed25519Curve.Scalar()) { return errors.New("response is zero valued") } - _, ok := consensus.commitments[validatorId] + _, ok := (*consensus.commitments)[validatorId] if !ok { return errors.New("no commit is received for the validator") } diff --git a/consensus/consensus_leader_msg.go b/consensus/consensus_leader_msg.go index f71f74880..0feb06627 100644 --- a/consensus/consensus_leader_msg.go +++ b/consensus/consensus_leader_msg.go @@ -37,7 +37,7 @@ func (consensus *Consensus) constructAnnounceMessage() []byte { } // Construct the challenge message -func (consensus *Consensus) constructChallengeMessage(msgType proto_consensus.MessageType) []byte { +func (consensus *Consensus) constructChallengeMessage(msgTypeToSend proto_consensus.MessageType) ([]byte, kyber.Scalar) { buffer := bytes.NewBuffer([]byte{}) // 4 byte consensus id @@ -53,28 +53,45 @@ func (consensus *Consensus) constructChallengeMessage(msgType proto_consensus.Me binary.BigEndian.PutUint16(twoBytes, consensus.nodeId) buffer.Write(twoBytes) + commitmentsMap := consensus.commitments // msgType == CHALLENGE + bitmap := consensus.bitmap + if msgTypeToSend == proto_consensus.FINAL_CHALLENGE { + commitmentsMap = consensus.finalCommitments + bitmap = consensus.finalBitmap + } + // 33 byte aggregated commit commitments := make([]kyber.Point, 0) - for _, val := range consensus.commitments { + for _, val := range *commitmentsMap { commitments = append(commitments, val) } aggCommitment, aggCommitmentBytes := getAggregatedCommit(commitments) buffer.Write(aggCommitmentBytes) // 33 byte aggregated key - buffer.Write(getAggregatedKey(consensus.bitmap)) + buffer.Write(getAggregatedKey(bitmap)) // 32 byte challenge - challenge := getChallenge(aggCommitment, consensus.bitmap.AggregatePublic, buffer.Bytes()[:36]) - buffer.Write(challenge) // message contains consensus id and block hash for now. - copy(consensus.challenge[:], challenge) - consensus.aggregatedCommitment = aggCommitment + challengeScalar := getChallenge(aggCommitment, bitmap.AggregatePublic, buffer.Bytes()[:36]) + bytes, err := challengeScalar.MarshalBinary() + if err != nil { + log.Error("Failed to serialize challenge") + } + buffer.Write(bytes) + + if msgTypeToSend == proto_consensus.CHALLENGE { + copy(consensus.challenge[:], bytes) + consensus.aggregatedCommitment = aggCommitment + } else if msgTypeToSend == proto_consensus.FINAL_CHALLENGE { + copy(consensus.finalChallenge[:], bytes) + consensus.aggregatedFinalCommitment = aggCommitment + } // 64 byte of signature on previous data signature := consensus.signMessage(buffer.Bytes()) buffer.Write(signature) - return proto_consensus.ConstructConsensusMessage(msgType, buffer.Bytes()) + return proto_consensus.ConstructConsensusMessage(msgTypeToSend, buffer.Bytes()), challengeScalar } // Construct the collective signature message @@ -124,14 +141,10 @@ func getAggregatedKey(bitmap *crypto.Mask) []byte { return append(bytes[:], byte(0)) } -func getChallenge(aggCommitment, aggKey kyber.Point, message []byte) []byte { +func getChallenge(aggCommitment, aggKey kyber.Point, message []byte) kyber.Scalar { challenge, err := crypto.Challenge(crypto.Ed25519Curve, aggCommitment, aggKey, message) if err != nil { log.Error("Failed to generate challenge") } - bytes, err := challenge.MarshalBinary() - if err != nil { - log.Error("Failed to serialize challenge") - } - return bytes + return challenge } diff --git a/consensus/consensus_leader_msg_test.go b/consensus/consensus_leader_msg_test.go index 263422684..8f74a1048 100644 --- a/consensus/consensus_leader_msg_test.go +++ b/consensus/consensus_leader_msg_test.go @@ -37,12 +37,12 @@ func TestConstructChallengeMessage(test *testing.T) { consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} - consensus.commitments[0] = leaderPubKey - consensus.commitments[1] = validatorPubKey + (*consensus.commitments)[0] = leaderPubKey + (*consensus.commitments)[1] = validatorPubKey consensus.bitmap.SetKey(leaderPubKey, true) consensus.bitmap.SetKey(validatorPubKey, true) - msg := consensus.constructChallengeMessage(consensus_proto.CHALLENGE) + msg, _ := consensus.constructChallengeMessage(consensus_proto.CHALLENGE) if len(msg) != 1+1+1+4+32+2+33+33+32+64 { test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 7ca661b1c..ff32045da 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -31,7 +31,9 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) { case proto_consensus.ANNOUNCE: consensus.processAnnounceMessage(payload) case proto_consensus.CHALLENGE: - consensus.processChallengeMessage(payload) + consensus.processChallengeMessage(payload, RESPONSE_DONE) + case proto_consensus.FINAL_CHALLENGE: + consensus.processChallengeMessage(payload, FINAL_RESPONSE_DONE) case proto_consensus.COLLECTIVE_SIG: consensus.processCollectiveSigMessage(payload) default: @@ -129,7 +131,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { } // Processes the challenge message sent from the leader -func (consensus *Consensus) processChallengeMessage(payload []byte) { +func (consensus *Consensus) processChallengeMessage(payload []byte, targetState ConsensusState) { //#### Read payload data offset := 0 // 4 byte consensus id @@ -235,46 +237,50 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { log.Warn("Failed to generate response", "err", err) return } - msgToSend := consensus.constructResponseMessage(proto_consensus.RESPONSE, response) + msgTypeToSend := proto_consensus.RESPONSE + if targetState == FINAL_RESPONSE_DONE { + msgTypeToSend = proto_consensus.FINAL_RESPONSE + } + msgToSend := consensus.constructResponseMessage(msgTypeToSend, response) p2p.SendMessage(consensus.leader, msgToSend) - // Set state to RESPONSE_DONE - consensus.state = RESPONSE_DONE + // Set state to target state (RESPONSE_DONE, FINAL_RESPONSE_DONE) + consensus.state = targetState // BIG TODO: the block catch up logic is basically a mock now. More checks need to be done to make it correct. // The logic is to roll up to the latest blocks one by one to try catching up with the leader. - for { - val, ok := consensus.blocksReceived[consensus.consensusId] - if ok { - delete(consensus.blocksReceived, consensus.consensusId) - - consensus.blockHash = [32]byte{} - consensus.consensusId++ // roll up one by one, until the next block is not received yet. - - // TODO: think about when validators know about the consensus is reached. - // For now, the blockchain is updated right here. - - // TODO: reconstruct the whole block from header and transactions - // For now, we used the stored whole block in consensus.blockHeader - txDecoder := gob.NewDecoder(bytes.NewReader(val.blockHeader)) - var blockHeaderObj blockchain.Block - err := txDecoder.Decode(&blockHeaderObj) - if err != nil { - consensus.Log.Debug("failed to construct the new block after consensus") - } - // check block data (transactions - if !consensus.BlockVerifier(&blockHeaderObj) { - consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusId", consensus.consensusId) - consensus.mutex.Unlock() - return - } - consensus.OnConsensusDone(&blockHeaderObj) - } else { - break - } - - } + //for { + // val, ok := consensus.blocksReceived[consensus.consensusId] + // if ok { + // delete(consensus.blocksReceived, consensus.consensusId) + // + // consensus.blockHash = [32]byte{} + // consensus.consensusId++ // roll up one by one, until the next block is not received yet. + // + // // TODO: think about when validators know about the consensus is reached. + // // For now, the blockchain is updated right here. + // + // // TODO: reconstruct the whole block from header and transactions + // // For now, we used the stored whole block in consensus.blockHeader + // txDecoder := gob.NewDecoder(bytes.NewReader(val.blockHeader)) + // var blockHeaderObj blockchain.Block + // err := txDecoder.Decode(&blockHeaderObj) + // if err != nil { + // consensus.Log.Debug("failed to construct the new block after consensus") + // } + // // check block data (transactions + // if !consensus.BlockVerifier(&blockHeaderObj) { + // consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusId", consensus.consensusId) + // consensus.mutex.Unlock() + // return + // } + // consensus.OnConsensusDone(&blockHeaderObj) + // } else { + // break + // } + // + //} consensus.mutex.Unlock() } From f2aba09438d67060057ddf9aa288aa98142ec14c Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Tue, 21 Aug 2018 18:04:41 -0700 Subject: [PATCH 7/8] add test for merkle tree --- blockchain/merkle_tree_test.go | 60 ++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 blockchain/merkle_tree_test.go diff --git a/blockchain/merkle_tree_test.go b/blockchain/merkle_tree_test.go new file mode 100644 index 000000000..dd443e5cd --- /dev/null +++ b/blockchain/merkle_tree_test.go @@ -0,0 +1,60 @@ +package blockchain + +import ( + "encoding/hex" + "fmt" + "testing" +) + +func TestNewMerkleNode(t *testing.T) { + data := [][]byte{ + []byte("node1"), + []byte("node2"), + []byte("node3"), + } + + // Level 1 + + n1 := NewMerkleNode(nil, nil, data[0]) + n2 := NewMerkleNode(nil, nil, data[1]) + n3 := NewMerkleNode(nil, nil, data[2]) + n4 := NewMerkleNode(nil, nil, data[2]) + + // Level 2 + n5 := NewMerkleNode(n1, n2, nil) + n6 := NewMerkleNode(n3, n4, nil) + + // Level 3 + n7 := NewMerkleNode(n5, n6, nil) + + if hex.EncodeToString(n7.Data) != "4e3e44e55926330ab6c31892f980f8bfd1a6e910ff1ebc3f778211377f35227e" { + t.Errorf("merkle tree is not built correctly.") + } +} + +func TestNewMerkleTree(t *testing.T) { + data := [][]byte{ + []byte("node1"), + []byte("node2"), + []byte("node3"), + } + // Level 1 + n1 := NewMerkleNode(nil, nil, data[0]) + n2 := NewMerkleNode(nil, nil, data[1]) + n3 := NewMerkleNode(nil, nil, data[2]) + n4 := NewMerkleNode(nil, nil, data[2]) + + // Level 2 + n5 := NewMerkleNode(n1, n2, nil) + n6 := NewMerkleNode(n3, n4, nil) + + // Level 3 + n7 := NewMerkleNode(n5, n6, nil) + + rootHash := fmt.Sprintf("%x", n7.Data) + mTree := NewMerkleTree(data) + + if rootHash != fmt.Sprintf("%x", mTree.RootNode.Data) { + t.Errorf("Merkle tree root hash is incorrect") + } +} From 6685aaac5a272532b066d89dd314d1ac9c5df163 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Tue, 21 Aug 2018 20:13:36 -0700 Subject: [PATCH 8/8] update readme --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 163525655..d4e3a48d2 100644 --- a/README.md +++ b/README.md @@ -24,10 +24,17 @@ cd harmony-benchmark go get ./... ``` ## Usage + +### Running local test without db ``` ./deploy.sh local_config.txt ``` +### Running local test with db +``` +./deploy.sh local_config.txt 1 +``` + ## Testing Make sure you the following command and make sure everything passed before submitting your code.