From 271106289e64aa3132ba13f4c645f67c5e50d3c7 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 17 Jun 2018 17:01:10 -0700 Subject: [PATCH 01/13] fix comments --- consensus/consensus_leader.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 10472166d..85f2127f7 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -14,6 +14,7 @@ import ( var mutex = &sync.Mutex{} +// WaitForNewBlock waits for a new block. func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { for { // keep waiting for new blocks newBlock := <-blockChannel @@ -24,7 +25,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) } } -// Leader's consensus message dispatcher +// ProcessMessageLeader is the leader's consensus message dispatcher func (consensus *Consensus) ProcessMessageLeader(message []byte) { msgType, err := GetConsensusMessageType(message) if err != nil { From 0498d8705933878e86ff3050a2b53d52f4370b03 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 11:24:50 -0700 Subject: [PATCH 02/13] Add consensus view Id and checks --- consensus/consensus_leader.go | 1 + consensus/consensus_validator.go | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 85f2127f7..59c93f21f 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -289,6 +289,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { // TODO: do followups on the consensus log.Printf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators)) consensus.ResetState() + consensus.consensusId++ consensus.ReadySignal <- 1 } // TODO: composes new block and broadcast the new block to validators diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 82e71196c..d8d0f1ad5 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -73,7 +73,10 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { consensus.blockHash = blockHash // verify block data - + if consensusId != consensus.consensusId { + log.Println("Received message with consensus Id: %s. My consensus Id: %s", consensusId, consensus.consensusId) + return + } // sign block // TODO: return the signature(commit) to leader @@ -160,6 +163,10 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { _ = signature // verify block data and the aggregated signatures + if consensusId != consensus.consensusId { + log.Println("Received message with consensus Id: %s. My consensus Id: %s", consensusId, consensus.consensusId) + return + } // sign the message @@ -170,6 +177,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { // Set state to RESPONSE_DONE consensus.state = RESPONSE_DONE + consensus.consensusId++ } // Construct the response message to send to leader (assumption the consensus data is already verified) From c66f7be673d9eea08efdfa8c771c8880ac5cdcbb Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 11:41:56 -0700 Subject: [PATCH 03/13] Fix log formating --- consensus/consensus_validator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index d8d0f1ad5..c34d343f3 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -74,7 +74,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { consensus.blockHash = blockHash // verify block data if consensusId != consensus.consensusId { - log.Println("Received message with consensus Id: %s. My consensus Id: %s", consensusId, consensus.consensusId) + log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId) return } // sign block @@ -164,7 +164,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { // verify block data and the aggregated signatures if consensusId != consensus.consensusId { - log.Println("Received message with consensus Id: %s. My consensus Id: %s", consensusId, consensus.consensusId) + log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId) return } From ae693698eb4bf5fd48e7f3be956c075c4da28351 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 12:03:26 -0700 Subject: [PATCH 04/13] Add test for consensus.go --- consensus/consensus_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 consensus/consensus_test.go diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go new file mode 100644 index 000000000..232242cde --- /dev/null +++ b/consensus/consensus_test.go @@ -0,0 +1,35 @@ +package consensus + +import ( + "testing" + "harmony-benchmark/p2p" + "harmony-benchmark/message" +) +func TestNewConsensus(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port:"2"} + validator := p2p.Peer{Ip: "3", Port:"5"} + consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + if consensus.consensusId != 0 { + test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusId) + } + + if consensus.IsLeader != true { + test.Error("Consensus should belong to a leader") + } + + if consensus.ReadySignal == nil { + test.Error("Consensus ReadySignal should be initialized") + } + + if consensus.actionType != byte(message.CONSENSUS) { + test.Error("Consensus actionType should be CONSENSUS") + } + + if consensus.msgCategory != byte(message.COMMITTEE) { + test.Error("Consensus msgCategory should be COMMITTEE") + } + + if consensus.leader != leader { + test.Error("Consensus Leader is set to wrong Peer") + } +} \ No newline at end of file From 81518201900d158d511efeb52151a1268f6d14c0 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 12:20:09 -0700 Subject: [PATCH 05/13] Move travis notification to travis-ci channel --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 42e529457..120c9a6a6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,4 +9,4 @@ install: | cd $HOME/gopath/src/harmony-benchmark notifications: slack: - secure: F6PRLJ7fIVpvZAEAIikE9u53CwklytznkNSJPuzB7cxLttAejls8ou3jsFFUkdTyyyy+QKFP9Bj+IFcAI9dq3CDYp7MnTQx/ajn8TC0xBwKW3gEKYoptIQJkXzvdwp9OanpU78QynRe1oRLefkN8qrL0zWLvoqcaHSYwjh1kzDCbO9G13aiI5CiFdW7jKwBKEPlleze4so0UdAaaGxyXUiSsmvHrOwF72ElnahKLy9DzmIgT8PdQkf1BmV3yaH3VQoU4fyu8t5P/jy8NYIXomcpo+pavgUBfXwd3mPf1KkAoB2RXdGCkORzHNZ6h+63ZVDzi1t8FWAbuNmvJ/Hq07IScGcsjZCEdy+79NSSBHThUejnHnqD4QdcXdhr30PoWBX85+phwPEVehAtddzgLyzivGByKHQMfWcetvPRww7Im+MpJW3OssgHlPxbRCE0uFtSDYgD9UoKevuZzFnME3U1rT9xqTweqOzxdsGRpvU/fLbh0PQCJnDQJjOB054l6ZTu9mtt+ogbYpOhB2PO6qLJZ64cn7M5/wLMiswRWooT9DeN+5a7n5MRo1i/TWFDFWBFYQVWTYdiTaU+oZdSZsu25TMboVraHPw3HE5tKIGgqOBLLd187HbE5ZeR62bzUnlN1wK1tphfgVa4ztmL+NY9nTgHMKki7LiNIL1p7M68= + secure: RPB3ThYIGuDUidvaWfOA7Hc9x1bDfd5+Y10r7xwY+NGCN3zW86s/GNLpLutI0MWTV9e2CJupHvz5clp8Ktle/tVjLhs6jHQnNV7U8PTWKkL5By6IFVAHN12unMQn/m0RPwqMfdubajXoV51XhbFA/iow/0fqwsd61VdPIuBrlQjy9z7kyVnRLNoGvYjDqKEkJfYVb3qFNFLzD0F7Y2AgxnezIRjsTLgHzR4owLJYqVMhvTYIV9/vSf1w4UUPzhHyZRESl6bri+a1+g7GxE32OtNwq68xxVeeJcrO/MbjAHHW9V6BW1MjJfYzD5T+7JHIfZOjV2WgzJ7uCkVYztfq+02yOCSWsLNxFVojIDhVFEhhJ6Vd2Zf1otolS7j0svK/qNmShID9q9NAasaI105GsQgtaSPAUGd88J/vyX2ndG1nDOvxmgOo10tZFOnPHW7JnWMybk3PLza8o1ujA7X3JFdvDA8BPP9h6MVP4N7doCQ/n4Crts53HvEWlvcv5sBNu61WYlSTBzf1qNwBKMyN2E0rNubsxKmW8B6jLdWYdlx57nyTRPraNKGE1fnUW5nWRZGax3F1tQRwEfpQMk22qgeUK0RYWsPgHFaPciKCA3dJX7t1k/ib9pyR4nc9SZnYw54KMhkAXPIVQ0iy0EpTAH1DNYV6v8zXCwjl+BdkhlY= From 4419aa15df550e987e3e84e5d4a8da7c59b6c97c Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 12:47:41 -0700 Subject: [PATCH 06/13] Add test for consensus leader/validator --- consensus/consensus_leader_test.go | 34 +++++++++++++++++++++++++++ consensus/consensus_validator_test.go | 30 +++++++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 consensus/consensus_leader_test.go create mode 100644 consensus/consensus_validator_test.go diff --git a/consensus/consensus_leader_test.go b/consensus/consensus_leader_test.go new file mode 100644 index 000000000..4233509ad --- /dev/null +++ b/consensus/consensus_leader_test.go @@ -0,0 +1,34 @@ +package consensus + +import ( + "testing" + "harmony-benchmark/p2p" +) + +func TestConstructAnnounceMessage(test *testing.T) { + header := getBlockHeader() + leader := p2p.Peer{Ip: "1", Port:"2"} + validator := p2p.Peer{Ip: "3", Port:"5"} + consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg, err := consensus.constructAnnounceMessage() + + if err != nil { + test.Error("Annouce message is not constructed successfully") + } + if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 4 + 64 + len(header) { + test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) + } +} + +func TestConstructChallengeMessage(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port:"2"} + validator := p2p.Peer{Ip: "3", Port:"5"} + consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg := consensus.constructChallengeMessage() + + if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 33 + 33 + 32 + 64 { + test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) + } +} \ No newline at end of file diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go new file mode 100644 index 000000000..3728145b5 --- /dev/null +++ b/consensus/consensus_validator_test.go @@ -0,0 +1,30 @@ +package consensus + +import ( + "testing" + "harmony-benchmark/p2p" +) + +func TestConstructCommitMessage(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port:"2"} + validator := p2p.Peer{Ip: "3", Port:"5"} + consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg := consensus.constructCommitMessage() + + if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 33 + 64 { + test.Errorf("Commit message is not constructed in the correct size: %d", len(msg)) + } +} + +func TestConstructResponseMessage(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port:"2"} + validator := p2p.Peer{Ip: "3", Port:"5"} + consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg := consensus.constructResponseMessage() + + if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 32 + 64 { + test.Errorf("Response message is not constructed in the correct size: %d", len(msg)) + } +} \ No newline at end of file From c7d9c6843ad2d7ce1f381ebc2261128c82be3d99 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 14:59:00 -0700 Subject: [PATCH 07/13] Add shardId into consensus and rename ipList file into config file --- README.md | 2 +- aws-code/transaction_generator.go | 8 +- aws-scripts/parse_json.py | 2 +- aws-scripts/run_instances.sh | 2 +- benchmark_main.go | 54 ++++++++++---- consensus/consensus.go | 9 ++- consensus/consensus_leader_test.go | 4 +- consensus/consensus_test.go | 2 +- consensus/consensus_validator_test.go | 4 +- deploy.sh | 10 +-- deploy_linux.sh | 10 +-- local_config.txt | 101 ++++++++++++++++++++++++++ local_config2.txt | 11 +++ local_iplist.txt | 101 -------------------------- local_iplist2.txt | 11 --- 15 files changed, 180 insertions(+), 151 deletions(-) create mode 100644 local_config.txt create mode 100644 local_config2.txt delete mode 100644 local_iplist.txt delete mode 100644 local_iplist2.txt diff --git a/README.md b/README.md index 4184cbb1d..4a0618ac0 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ git clone git@github.com:simple-rules/harmony-benchmark.git ## Usage ``` -./deploy.sh local_iplist.txt +./deploy.sh local_config.txt ./send_txn.sh ``` diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 2cb8de967..9aea7a5f5 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -22,8 +22,8 @@ func newRandTransaction() blockchain.Transaction { return tx } -func getPeers(Ip, Port, iplist string) []p2p.Peer { - file, _ := os.Open(iplist) +func getPeers(Ip, Port, config string) []p2p.Peer { + file, _ := os.Open(config) fscanner := bufio.NewScanner(file) var peerList []p2p.Peer for fscanner.Scan() { @@ -41,7 +41,7 @@ func main() { ip := flag.String("ip", "127.0.0.1", "IP of the leader") port := flag.String("port", "9000", "port of the leader.") - ipfile := flag.String("ipfile", "local_iplist.txt", "file containing all ip addresses") + configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") //getLeader to get ip,port and get totaltime I want to run start := time.Now() totalTime := 60.0 @@ -64,6 +64,6 @@ func main() { var leaderPeer p2p.Peer leaderPeer.Ip = *ip leaderPeer.Port = *port - peers := append(getPeers(*ip, *port, *ipfile), leaderPeer) + peers := append(getPeers(*ip, *port, *configFile), leaderPeer) p2p.BroadcastMessage(peers, msg) } diff --git a/aws-scripts/parse_json.py b/aws-scripts/parse_json.py index 8e8eb643b..196b10fa5 100644 --- a/aws-scripts/parse_json.py +++ b/aws-scripts/parse_json.py @@ -9,7 +9,7 @@ def get_public_ip(all_reservations): all_public_ip_addresses.append(instance_information['PublicIpAddress']) return all_public_ip_addresses -def make_peers_list(all_reservations,port="9001",filename="ipList.txt"): +def make_peers_list(all_reservations,port="9001",filename="config.txt"): p = get_public_ip(all_reservations) f = open(filename,"w") for i in range(len(p)): diff --git a/aws-scripts/run_instances.sh b/aws-scripts/run_instances.sh index b326edd34..bd4b7f59f 100755 --- a/aws-scripts/run_instances.sh +++ b/aws-scripts/run_instances.sh @@ -1,3 +1,3 @@ #!/bin/bash -x cd /home/ec2-user/projects/src/harmony-benchmark -./deploy_linux.sh local_iplist2.txt \ No newline at end of file +./deploy_linux.sh local_config2.txt \ No newline at end of file diff --git a/benchmark_main.go b/benchmark_main.go index ba30661f0..2652697c5 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -11,14 +11,21 @@ import ( "strings" ) -func getLeader(iplist string) p2p.Peer { - file, _ := os.Open(iplist) - fscanner := bufio.NewScanner(file) +func getShardId(myIp, myPort string, config *[][]string) string { + for _, node := range *config { + ip, port, shardId := node[0], node[1], node[3] + if ip == myIp && port == myPort { + return shardId + } + } + return "N/A" +} + +func getLeader(myShardId string, config *[][]string) p2p.Peer { var leaderPeer p2p.Peer - for fscanner.Scan() { - p := strings.Split(fscanner.Text(), " ") - ip, port, status := p[0], p[1], p[2] - if status == "leader" { + for _, node := range *config { + ip, port, status, shardId := node[0], node[1], node[2], node[3] + if status == "leader" && myShardId == shardId{ leaderPeer.Ip = ip leaderPeer.Port = port } @@ -26,14 +33,11 @@ func getLeader(iplist string) p2p.Peer { return leaderPeer } -func getPeers(Ip, Port, iplist string) []p2p.Peer { - file, _ := os.Open(iplist) - fscanner := bufio.NewScanner(file) +func getPeers(myIp, myPort, myShardId string, config *[][]string) []p2p.Peer { var peerList []p2p.Peer - for fscanner.Scan() { - p := strings.Split(fscanner.Text(), " ") - ip, port, status := p[0], p[1], p[2] - if status == "leader" || ip == Ip && port == Port { + for _, node := range *config { + ip, port, status := node[0], node[1], node[2] + if status == "leader" || ip == myIp && port == myPort { continue } peer := p2p.Peer{Port: port, Ip: ip} @@ -42,13 +46,31 @@ func getPeers(Ip, Port, iplist string) []p2p.Peer { return peerList } +func readConfigFile(configFile string) [][]string{ + file, _ := os.Open(configFile) + fscanner := bufio.NewScanner(file) + + result := [][]string{} + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + result = append(result, p) + } + return result +} + func main() { ip := flag.String("ip", "127.0.0.1", "IP of the node") port := flag.String("port", "9000", "port of the node.") - ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses") + configFile := flag.String("config_file", "config.txt", "file containing all ip addresses") flag.Parse() - consensus := consensus.NewConsensus(*ip, *port, getPeers(*ip, *port, *ipfile), getLeader(*ipfile)) + config := readConfigFile(*configFile) + shardId := getShardId(*ip, *port, &config) + peers := getPeers(*ip, *port, shardId, &config) + leader := getLeader(shardId, &config) + + consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) + var nodeStatus string if consensus.IsLeader { nodeStatus = "leader" diff --git a/consensus/consensus.go b/consensus/consensus.go index a3ee43197..497048a02 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -34,6 +34,8 @@ type Consensus struct { blockHash []byte // BlockHeader to run consensus on blockHeader []byte + // Shard Id which this node belongs to + ShardId uint32 // Signal channel for starting a new consensus process ReadySignal chan int @@ -76,7 +78,7 @@ func (state ConsensusState) String() string { } // Create a new Consensus object -func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus { +func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) Consensus { // The first Ip, port passed will be leader. consensus := Consensus{} peer := p2p.Peer{Port: port, Ip: ip} @@ -99,6 +101,11 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus log.Fatal(err) } consensus.consensusId = 0 + myShardId, err := strconv.Atoi(shardId) + if err != nil { + panic("Unparseable shard Id" + shardId) + } + consensus.ShardId = uint32(myShardId) // For now use socket address as 16 byte Id // TODO: populate with correct Id diff --git a/consensus/consensus_leader_test.go b/consensus/consensus_leader_test.go index 4233509ad..9dee33b27 100644 --- a/consensus/consensus_leader_test.go +++ b/consensus/consensus_leader_test.go @@ -9,7 +9,7 @@ func TestConstructAnnounceMessage(test *testing.T) { header := getBlockHeader() leader := p2p.Peer{Ip: "1", Port:"2"} validator := p2p.Peer{Ip: "3", Port:"5"} - consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg, err := consensus.constructAnnounceMessage() @@ -24,7 +24,7 @@ func TestConstructAnnounceMessage(test *testing.T) { func TestConstructChallengeMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port:"2"} validator := p2p.Peer{Ip: "3", Port:"5"} - consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg := consensus.constructChallengeMessage() diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 232242cde..72e68641f 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -8,7 +8,7 @@ import ( func TestNewConsensus(test *testing.T) { leader := p2p.Peer{Ip: "1", Port:"2"} validator := p2p.Peer{Ip: "3", Port:"5"} - consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) if consensus.consensusId != 0 { test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusId) } diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go index 3728145b5..0fc65d3d4 100644 --- a/consensus/consensus_validator_test.go +++ b/consensus/consensus_validator_test.go @@ -8,7 +8,7 @@ import ( func TestConstructCommitMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port:"2"} validator := p2p.Peer{Ip: "3", Port:"5"} - consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg := consensus.constructCommitMessage() @@ -20,7 +20,7 @@ func TestConstructCommitMessage(test *testing.T) { func TestConstructResponseMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port:"2"} validator := p2p.Peer{Ip: "3", Port:"5"} - consensus := NewConsensus("1", "2", []p2p.Peer{leader, validator}, leader) + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg := consensus.constructResponseMessage() diff --git a/deploy.sh b/deploy.sh index ed2139748..71148722f 100755 --- a/deploy.sh +++ b/deploy.sh @@ -1,9 +1,9 @@ ./kill_node.sh -ipfile=$1 +config=$1 while IFS='' read -r line || [[ -n "$line" ]]; do IFS=' ' read ip port mode <<< $line - #echo $ip $port $mode $ipfile - go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile& -done < $ipfile + #echo $ip $port $mode $config + go run ./benchmark_main.go -ip $ip -port $port -config_file $config& +done < $config -go run ./aws-code/transaction_generator.go -ipfile $ipfile \ No newline at end of file +go run ./aws-code/transaction_generator.go -config_file $config \ No newline at end of file diff --git a/deploy_linux.sh b/deploy_linux.sh index 655a1cad3..d6841a92d 100755 --- a/deploy_linux.sh +++ b/deploy_linux.sh @@ -14,9 +14,9 @@ echo "Inside deploy linux" echo $GOPATH echo "Inside deploy linux line 2" -ipfile=$1 +config=$1 while read ip port mode; do - #echo $ip $port $mode $ipfile - go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile& -done < $ipfile -go run ./aws-code/transaction_generator.go -ipfile $ipfile + #echo $ip $port $mode $config + go run ./benchmark_main.go -ip $ip -port $port -config_file $config& +done < $config +go run ./aws-code/transaction_generator.go -config_file $config diff --git a/local_config.txt b/local_config.txt new file mode 100644 index 000000000..e17db776d --- /dev/null +++ b/local_config.txt @@ -0,0 +1,101 @@ +127.0.0.1 9001 validator 0 +127.0.0.1 9002 validator 0 +127.0.0.1 9003 validator 0 +127.0.0.1 9004 validator 0 +127.0.0.1 9005 validator 0 +127.0.0.1 9006 validator 0 +127.0.0.1 9007 validator 0 +127.0.0.1 9008 validator 0 +127.0.0.1 9009 validator 0 +127.0.0.1 9010 validator 0 +127.0.0.1 9011 validator 0 +127.0.0.1 9012 validator 0 +127.0.0.1 9013 validator 0 +127.0.0.1 9014 validator 0 +127.0.0.1 9015 validator 0 +127.0.0.1 9016 validator 0 +127.0.0.1 9017 validator 0 +127.0.0.1 9018 validator 0 +127.0.0.1 9019 validator 0 +127.0.0.1 9020 validator 0 +127.0.0.1 9021 validator 0 +127.0.0.1 9022 validator 0 +127.0.0.1 9023 validator 0 +127.0.0.1 9024 validator 0 +127.0.0.1 9025 validator 0 +127.0.0.1 9026 validator 0 +127.0.0.1 9027 validator 0 +127.0.0.1 9028 validator 0 +127.0.0.1 9029 validator 0 +127.0.0.1 9030 validator 0 +127.0.0.1 9031 validator 0 +127.0.0.1 9032 validator 0 +127.0.0.1 9033 validator 0 +127.0.0.1 9034 validator 0 +127.0.0.1 9035 validator 0 +127.0.0.1 9036 validator 0 +127.0.0.1 9037 validator 0 +127.0.0.1 9038 validator 0 +127.0.0.1 9039 validator 0 +127.0.0.1 9040 validator 0 +127.0.0.1 9041 validator 0 +127.0.0.1 9042 validator 0 +127.0.0.1 9043 validator 0 +127.0.0.1 9044 validator 0 +127.0.0.1 9045 validator 0 +127.0.0.1 9046 validator 0 +127.0.0.1 9047 validator 0 +127.0.0.1 9048 validator 0 +127.0.0.1 9049 validator 0 +127.0.0.1 9050 validator 0 +127.0.0.1 9051 validator 0 +127.0.0.1 9052 validator 0 +127.0.0.1 9053 validator 0 +127.0.0.1 9054 validator 0 +127.0.0.1 9055 validator 0 +127.0.0.1 9056 validator 0 +127.0.0.1 9057 validator 0 +127.0.0.1 9058 validator 0 +127.0.0.1 9059 validator 0 +127.0.0.1 9060 validator 0 +127.0.0.1 9061 validator 0 +127.0.0.1 9062 validator 0 +127.0.0.1 9063 validator 0 +127.0.0.1 9064 validator 0 +127.0.0.1 9065 validator 0 +127.0.0.1 9066 validator 0 +127.0.0.1 9067 validator 0 +127.0.0.1 9068 validator 0 +127.0.0.1 9069 validator 0 +127.0.0.1 9070 validator 0 +127.0.0.1 9071 validator 0 +127.0.0.1 9072 validator 0 +127.0.0.1 9073 validator 0 +127.0.0.1 9074 validator 0 +127.0.0.1 9075 validator 0 +127.0.0.1 9076 validator 0 +127.0.0.1 9077 validator 0 +127.0.0.1 9078 validator 0 +127.0.0.1 9079 validator 0 +127.0.0.1 9080 validator 0 +127.0.0.1 9081 validator 0 +127.0.0.1 9082 validator 0 +127.0.0.1 9083 validator 0 +127.0.0.1 9084 validator 0 +127.0.0.1 9085 validator 0 +127.0.0.1 9086 validator 0 +127.0.0.1 9087 validator 0 +127.0.0.1 9088 validator 0 +127.0.0.1 9089 validator 0 +127.0.0.1 9090 validator 0 +127.0.0.1 9091 validator 0 +127.0.0.1 9092 validator 0 +127.0.0.1 9093 validator 0 +127.0.0.1 9094 validator 0 +127.0.0.1 9095 validator 0 +127.0.0.1 9096 validator 0 +127.0.0.1 9097 validator 0 +127.0.0.1 9098 validator 0 +127.0.0.1 9099 validator 0 +127.0.0.1 9100 validator 0 +127.0.0.1 9000 leader 0 diff --git a/local_config2.txt b/local_config2.txt new file mode 100644 index 000000000..2cf8e2b7f --- /dev/null +++ b/local_config2.txt @@ -0,0 +1,11 @@ +127.0.0.1 9001 validator 0 +127.0.0.1 9002 validator 0 +127.0.0.1 9003 validator 0 +127.0.0.1 9004 validator 0 +127.0.0.1 9005 validator 0 +127.0.0.1 9006 validator 0 +127.0.0.1 9007 validator 0 +127.0.0.1 9008 validator 0 +127.0.0.1 9009 validator 0 +127.0.0.1 9010 validator 0 +127.0.0.1 9000 leader 0 diff --git a/local_iplist.txt b/local_iplist.txt deleted file mode 100644 index 9f5d026fe..000000000 --- a/local_iplist.txt +++ /dev/null @@ -1,101 +0,0 @@ -127.0.0.1 9001 validator -127.0.0.1 9002 validator -127.0.0.1 9003 validator -127.0.0.1 9004 validator -127.0.0.1 9005 validator -127.0.0.1 9006 validator -127.0.0.1 9007 validator -127.0.0.1 9008 validator -127.0.0.1 9009 validator -127.0.0.1 9010 validator -127.0.0.1 9011 validator -127.0.0.1 9012 validator -127.0.0.1 9013 validator -127.0.0.1 9014 validator -127.0.0.1 9015 validator -127.0.0.1 9016 validator -127.0.0.1 9017 validator -127.0.0.1 9018 validator -127.0.0.1 9019 validator -127.0.0.1 9020 validator -127.0.0.1 9021 validator -127.0.0.1 9022 validator -127.0.0.1 9023 validator -127.0.0.1 9024 validator -127.0.0.1 9025 validator -127.0.0.1 9026 validator -127.0.0.1 9027 validator -127.0.0.1 9028 validator -127.0.0.1 9029 validator -127.0.0.1 9030 validator -127.0.0.1 9031 validator -127.0.0.1 9032 validator -127.0.0.1 9033 validator -127.0.0.1 9034 validator -127.0.0.1 9035 validator -127.0.0.1 9036 validator -127.0.0.1 9037 validator -127.0.0.1 9038 validator -127.0.0.1 9039 validator -127.0.0.1 9040 validator -127.0.0.1 9041 validator -127.0.0.1 9042 validator -127.0.0.1 9043 validator -127.0.0.1 9044 validator -127.0.0.1 9045 validator -127.0.0.1 9046 validator -127.0.0.1 9047 validator -127.0.0.1 9048 validator -127.0.0.1 9049 validator -127.0.0.1 9050 validator -127.0.0.1 9051 validator -127.0.0.1 9052 validator -127.0.0.1 9053 validator -127.0.0.1 9054 validator -127.0.0.1 9055 validator -127.0.0.1 9056 validator -127.0.0.1 9057 validator -127.0.0.1 9058 validator -127.0.0.1 9059 validator -127.0.0.1 9060 validator -127.0.0.1 9061 validator -127.0.0.1 9062 validator -127.0.0.1 9063 validator -127.0.0.1 9064 validator -127.0.0.1 9065 validator -127.0.0.1 9066 validator -127.0.0.1 9067 validator -127.0.0.1 9068 validator -127.0.0.1 9069 validator -127.0.0.1 9070 validator -127.0.0.1 9071 validator -127.0.0.1 9072 validator -127.0.0.1 9073 validator -127.0.0.1 9074 validator -127.0.0.1 9075 validator -127.0.0.1 9076 validator -127.0.0.1 9077 validator -127.0.0.1 9078 validator -127.0.0.1 9079 validator -127.0.0.1 9080 validator -127.0.0.1 9081 validator -127.0.0.1 9082 validator -127.0.0.1 9083 validator -127.0.0.1 9084 validator -127.0.0.1 9085 validator -127.0.0.1 9086 validator -127.0.0.1 9087 validator -127.0.0.1 9088 validator -127.0.0.1 9089 validator -127.0.0.1 9090 validator -127.0.0.1 9091 validator -127.0.0.1 9092 validator -127.0.0.1 9093 validator -127.0.0.1 9094 validator -127.0.0.1 9095 validator -127.0.0.1 9096 validator -127.0.0.1 9097 validator -127.0.0.1 9098 validator -127.0.0.1 9099 validator -127.0.0.1 9100 validator -127.0.0.1 9000 leader diff --git a/local_iplist2.txt b/local_iplist2.txt deleted file mode 100644 index 6da5dfc4c..000000000 --- a/local_iplist2.txt +++ /dev/null @@ -1,11 +0,0 @@ -127.0.0.1 9001 validator -127.0.0.1 9002 validator -127.0.0.1 9003 validator -127.0.0.1 9004 validator -127.0.0.1 9005 validator -127.0.0.1 9006 validator -127.0.0.1 9007 validator -127.0.0.1 9008 validator -127.0.0.1 9009 validator -127.0.0.1 9010 validator -127.0.0.1 9000 leader From 6da433661478833f44a583b4f7b1f330631ce14c Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 15:55:31 -0700 Subject: [PATCH 08/13] Add sharding config with 2 shard; update tx generator accordingly --- aws-code/transaction_generator.go | 45 +++++++++++++++++++++++-------- benchmark_main.go | 6 ++--- consensus/consensus_leader.go | 4 +-- local_config_shards.txt | 22 +++++++++++++++ 4 files changed, 61 insertions(+), 16 deletions(-) create mode 100644 local_config_shards.txt diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 9aea7a5f5..e46301f1e 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -11,6 +11,7 @@ import ( "os" "strings" "time" + "log" ) func newRandTransaction() blockchain.Transaction { @@ -22,14 +23,14 @@ func newRandTransaction() blockchain.Transaction { return tx } -func getPeers(Ip, Port, config string) []p2p.Peer { +func getValidators(config string) []p2p.Peer { file, _ := os.Open(config) fscanner := bufio.NewScanner(file) var peerList []p2p.Peer for fscanner.Scan() { p := strings.Split(fscanner.Text(), " ") ip, port, status := p[0], p[1], p[2] - if status == "leader" || ip == Ip && port == Port { + if status == "leader" { continue } peer := p2p.Peer{Port: port, Ip: ip} @@ -37,15 +38,39 @@ func getPeers(Ip, Port, config string) []p2p.Peer { } return peerList } -func main() { - ip := flag.String("ip", "127.0.0.1", "IP of the leader") - port := flag.String("port", "9000", "port of the leader.") +func getLeaders(config *[][]string) []p2p.Peer { + var peerList []p2p.Peer + for _, node := range *config { + ip, port, status := node[0], node[1], node[2] + if status == "leader" { + peerList = append(peerList, p2p.Peer{Ip: ip, Port: port}) + } + } + return peerList +} + +func readConfigFile(configFile string) [][]string{ + file, _ := os.Open(configFile) + fscanner := bufio.NewScanner(file) + + result := [][]string{} + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + result = append(result, p) + } + return result +} + +func main() { configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") - //getLeader to get ip,port and get totaltime I want to run + flag.Parse() + config := readConfigFile(*configFile) + start := time.Now() totalTime := 60.0 txs := make([]blockchain.Transaction, 10) + leaders := getLeaders(&config) for true { t := time.Now() if t.Sub(start).Seconds() >= totalTime { @@ -57,13 +82,11 @@ func main() { } msg := node.ConstructTransactionListMessage(txs) - p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) + log.Printf("[Generator] Sending txs to %d leader[s]\n", len(leaders)) + p2p.BroadcastMessage(leaders, msg) time.Sleep(1 * time.Second) // 10 transactions per second } msg := node.ConstructStopMessage() - var leaderPeer p2p.Peer - leaderPeer.Ip = *ip - leaderPeer.Port = *port - peers := append(getPeers(*ip, *port, *configFile), leaderPeer) + peers := append(getValidators(*configFile), leaders...) p2p.BroadcastMessage(peers, msg) } diff --git a/benchmark_main.go b/benchmark_main.go index 2652697c5..5b6c1b70d 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -36,8 +36,8 @@ func getLeader(myShardId string, config *[][]string) p2p.Peer { func getPeers(myIp, myPort, myShardId string, config *[][]string) []p2p.Peer { var peerList []p2p.Peer for _, node := range *config { - ip, port, status := node[0], node[1], node[2] - if status == "leader" || ip == myIp && port == myPort { + ip, port, status, shardId := node[0], node[1], node[2], node[3] + if status == "leader" || ip == myIp && port == myPort || myShardId != shardId { continue } peer := p2p.Peer{Port: port, Ip: ip} @@ -79,7 +79,7 @@ func main() { } log.Println("======================================") - log.Printf("This node is a %s node listening on ip: %s and port: %s\n", nodeStatus, *ip, *port) + log.Printf("This node is a %s node in shard %s listening on ip: %s and port: %s\n", nodeStatus, shardId, *ip, *port) log.Println("======================================") node := node.NewNode(&consensus) diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 59c93f21f..5b707ce86 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -37,7 +37,7 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { log.Print(err) } - log.Printf("[Leader] Received and processing message: %s\n", msgType) + log.Printf("[Leader-%d] Received and processing message: %s\n", consensus.ShardId, msgType) switch msgType { case ANNOUNCE: log.Printf("Unexpected message type: %s", msgType) @@ -287,7 +287,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { // Set state to FINISHED consensus.state = FINISHED // TODO: do followups on the consensus - log.Printf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators)) + log.Printf("[Shard %d] HOORAY!!! CONSENSUS REACHED AMONG %d NODES WITH CONSENSUS ID %d!!!\n", consensus.ShardId, len(consensus.validators), consensus.consensusId) consensus.ResetState() consensus.consensusId++ consensus.ReadySignal <- 1 diff --git a/local_config_shards.txt b/local_config_shards.txt new file mode 100644 index 000000000..9d1c3c3ae --- /dev/null +++ b/local_config_shards.txt @@ -0,0 +1,22 @@ +127.0.0.1 9010 validator 0 +127.0.0.1 9011 validator 0 +127.0.0.1 9012 validator 0 +127.0.0.1 9013 validator 0 +127.0.0.1 9014 validator 0 +127.0.0.1 9015 validator 0 +127.0.0.1 9016 validator 0 +127.0.0.1 9017 validator 0 +127.0.0.1 9018 validator 0 +127.0.0.1 9019 validator 0 +127.0.0.1 9020 validator 1 +127.0.0.1 9021 validator 1 +127.0.0.1 9022 validator 1 +127.0.0.1 9023 validator 1 +127.0.0.1 9024 validator 1 +127.0.0.1 9025 validator 1 +127.0.0.1 9026 validator 1 +127.0.0.1 9027 validator 1 +127.0.0.1 9028 validator 1 +127.0.0.1 9029 validator 1 +127.0.0.1 9000 leader 0 +127.0.0.1 9001 leader 1 From d771fe7df9fe03c9f196def9750b562b70c0dbbd Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 16:03:20 -0700 Subject: [PATCH 09/13] Format existing code according to gofmt --- aws-code/transaction_generator.go | 4 ++-- benchmark_main.go | 8 ++++---- consensus/consensus.go | 9 ++++----- consensus/consensus_leader_test.go | 16 ++++++++-------- consensus/consensus_test.go | 11 ++++++----- consensus/consensus_validator_test.go | 16 ++++++++-------- p2p/message.go | 1 - 7 files changed, 32 insertions(+), 33 deletions(-) diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index e46301f1e..786d47b9b 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -7,11 +7,11 @@ import ( "harmony-benchmark/blockchain" "harmony-benchmark/node" "harmony-benchmark/p2p" + "log" "math/rand" "os" "strings" "time" - "log" ) func newRandTransaction() blockchain.Transaction { @@ -50,7 +50,7 @@ func getLeaders(config *[][]string) []p2p.Peer { return peerList } -func readConfigFile(configFile string) [][]string{ +func readConfigFile(configFile string) [][]string { file, _ := os.Open(configFile) fscanner := bufio.NewScanner(file) diff --git a/benchmark_main.go b/benchmark_main.go index 5b6c1b70d..0c82234a3 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -1,13 +1,13 @@ package main import ( + "bufio" "flag" "harmony-benchmark/consensus" - "harmony-benchmark/p2p" "harmony-benchmark/node" + "harmony-benchmark/p2p" "log" "os" - "bufio" "strings" ) @@ -25,7 +25,7 @@ func getLeader(myShardId string, config *[][]string) p2p.Peer { var leaderPeer p2p.Peer for _, node := range *config { ip, port, status, shardId := node[0], node[1], node[2], node[3] - if status == "leader" && myShardId == shardId{ + if status == "leader" && myShardId == shardId { leaderPeer.Ip = ip leaderPeer.Port = port } @@ -46,7 +46,7 @@ func getPeers(myIp, myPort, myShardId string, config *[][]string) []p2p.Peer { return peerList } -func readConfigFile(configFile string) [][]string{ +func readConfigFile(configFile string) [][]string { file, _ := os.Open(configFile) fscanner := bufio.NewScanner(file) diff --git a/consensus/consensus.go b/consensus/consensus.go index 497048a02..4850ac892 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -2,11 +2,11 @@ package consensus // consensus import ( + "harmony-benchmark/message" "harmony-benchmark/p2p" - "regexp" "log" + "regexp" "strconv" - "harmony-benchmark/message" ) // Consensus data containing all info related to one consensus process @@ -42,7 +42,7 @@ type Consensus struct { //// Network related fields msgCategory byte - actionType byte + actionType byte } // Consensus state enum for both leader and validator @@ -126,10 +126,9 @@ func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) C return consensus } - // Reset the state of the consensus func (consensus *Consensus) ResetState() { consensus.state = READY consensus.commits = make(map[string]string) consensus.responses = make(map[string]string) -} \ No newline at end of file +} diff --git a/consensus/consensus_leader_test.go b/consensus/consensus_leader_test.go index 9dee33b27..f500c8d25 100644 --- a/consensus/consensus_leader_test.go +++ b/consensus/consensus_leader_test.go @@ -1,14 +1,14 @@ package consensus import ( - "testing" "harmony-benchmark/p2p" + "testing" ) func TestConstructAnnounceMessage(test *testing.T) { header := getBlockHeader() - leader := p2p.Peer{Ip: "1", Port:"2"} - validator := p2p.Peer{Ip: "3", Port:"5"} + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg, err := consensus.constructAnnounceMessage() @@ -16,19 +16,19 @@ func TestConstructAnnounceMessage(test *testing.T) { if err != nil { test.Error("Annouce message is not constructed successfully") } - if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 4 + 64 + len(header) { + if len(msg) != 1+1+1+4+32+2+4+64+len(header) { test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) } } func TestConstructChallengeMessage(test *testing.T) { - leader := p2p.Peer{Ip: "1", Port:"2"} - validator := p2p.Peer{Ip: "3", Port:"5"} + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg := consensus.constructChallengeMessage() - if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 33 + 33 + 32 + 64 { + if len(msg) != 1+1+1+4+32+2+33+33+32+64 { test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) } -} \ No newline at end of file +} diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 72e68641f..01977d44f 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -1,13 +1,14 @@ package consensus import ( - "testing" - "harmony-benchmark/p2p" "harmony-benchmark/message" + "harmony-benchmark/p2p" + "testing" ) + func TestNewConsensus(test *testing.T) { - leader := p2p.Peer{Ip: "1", Port:"2"} - validator := p2p.Peer{Ip: "3", Port:"5"} + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) if consensus.consensusId != 0 { test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusId) @@ -32,4 +33,4 @@ func TestNewConsensus(test *testing.T) { if consensus.leader != leader { test.Error("Consensus Leader is set to wrong Peer") } -} \ No newline at end of file +} diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go index 0fc65d3d4..a8e2b0e7e 100644 --- a/consensus/consensus_validator_test.go +++ b/consensus/consensus_validator_test.go @@ -1,30 +1,30 @@ package consensus import ( - "testing" "harmony-benchmark/p2p" + "testing" ) func TestConstructCommitMessage(test *testing.T) { - leader := p2p.Peer{Ip: "1", Port:"2"} - validator := p2p.Peer{Ip: "3", Port:"5"} + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg := consensus.constructCommitMessage() - if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 33 + 64 { + if len(msg) != 1+1+1+4+32+2+33+64 { test.Errorf("Commit message is not constructed in the correct size: %d", len(msg)) } } func TestConstructResponseMessage(test *testing.T) { - leader := p2p.Peer{Ip: "1", Port:"2"} - validator := p2p.Peer{Ip: "3", Port:"5"} + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = getBlockHash(make([]byte, 10)) msg := consensus.constructResponseMessage() - if len(msg) != 1 + 1 + 1 + 4 + 32 + 2 + 32 + 64 { + if len(msg) != 1+1+1+4+32+2+32+64 { test.Errorf("Response message is not constructed in the correct size: %d", len(msg)) } -} \ No newline at end of file +} diff --git a/p2p/message.go b/p2p/message.go index 6b8766eec..e04e4876b 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -48,7 +48,6 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { } // TODO: check on msgType and take actions accordingly - //// Read 4 bytes for message size fourBytes := make([]byte, 4) n, err := r.Read(fourBytes) From 461b3fa3d151a5d6be7a7605e54815d7517e2997 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 16:45:11 -0700 Subject: [PATCH 10/13] Refactor message/message.go to common/message.go --- {message => common}/message.go | 10 ++++++---- consensus/consensus.go | 6 +++--- consensus/consensus_test.go | 6 +++--- node/message.go | 10 +++++----- node/node.go | 22 +++++++++++----------- 5 files changed, 28 insertions(+), 26 deletions(-) rename {message => common}/message.go (92%) diff --git a/message/message.go b/common/message.go similarity index 92% rename from message/message.go rename to common/message.go index c951379c3..2f7b61328 100644 --- a/message/message.go +++ b/common/message.go @@ -1,4 +1,4 @@ -package message +package common import ( "errors" @@ -26,7 +26,7 @@ n - 2 bytes - actual message payload const NODE_TYPE_BYTES = 1 const ACTION_TYPE_BYTES = 1 -// The category of messages +// The CATEGORY of messages type MessageCategory byte const ( @@ -35,7 +35,8 @@ const ( // TODO: add more types ) -// The specific types of message under committee category + +// The specific types of message under COMMITTEE category type CommitteeMessageType byte const ( @@ -43,7 +44,7 @@ const ( // TODO: add more types ) -// The specific types of message under node category +// The specific types of message under NODE category type NodeMessageType byte const ( @@ -52,6 +53,7 @@ const ( // TODO: add more types ) + // Get the message category from the p2p message content func GetMessageCategory(message []byte) (MessageCategory, error) { if len(message) < NODE_TYPE_BYTES { diff --git a/consensus/consensus.go b/consensus/consensus.go index 4850ac892..8f87ee408 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -2,7 +2,7 @@ package consensus // consensus import ( - "harmony-benchmark/message" + "harmony-benchmark/common" "harmony-benchmark/p2p" "log" "regexp" @@ -121,8 +121,8 @@ func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) C }() } - consensus.msgCategory = byte(message.COMMITTEE) - consensus.actionType = byte(message.CONSENSUS) + consensus.msgCategory = byte(common.COMMITTEE) + consensus.actionType = byte(common.CONSENSUS) return consensus } diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 01977d44f..fba6c55c1 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -1,7 +1,7 @@ package consensus import ( - "harmony-benchmark/message" + "harmony-benchmark/common" "harmony-benchmark/p2p" "testing" ) @@ -22,11 +22,11 @@ func TestNewConsensus(test *testing.T) { test.Error("Consensus ReadySignal should be initialized") } - if consensus.actionType != byte(message.CONSENSUS) { + if consensus.actionType != byte(common.CONSENSUS) { test.Error("Consensus actionType should be CONSENSUS") } - if consensus.msgCategory != byte(message.COMMITTEE) { + if consensus.msgCategory != byte(common.COMMITTEE) { test.Error("Consensus msgCategory should be COMMITTEE") } diff --git a/node/message.go b/node/message.go index 0a071bb31..dc537b1b3 100644 --- a/node/message.go +++ b/node/message.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/gob" "harmony-benchmark/blockchain" - "harmony-benchmark/message" + "harmony-benchmark/common" ) type TransactionMessageType int @@ -21,8 +21,8 @@ const ( //ConstructTransactionListMessage constructs serialized transactions func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)}) - byteBuffer.WriteByte(byte(message.TRANSACTION)) + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.TRANSACTION)) byteBuffer.WriteByte(byte(SEND)) encoder := gob.NewEncoder(byteBuffer) encoder.Encode(transactions) @@ -31,8 +31,8 @@ func ConstructTransactionListMessage(transactions []blockchain.Transaction) []by //ConstructStopMessage is STOP message func ConstructStopMessage() []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)}) - byteBuffer.WriteByte(byte(message.CONTROL)) + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.CONTROL)) byteBuffer.WriteByte(byte(STOP)) return byteBuffer.Bytes() } diff --git a/node/node.go b/node/node.go index f14f8cc8b..062e17a85 100644 --- a/node/node.go +++ b/node/node.go @@ -5,7 +5,7 @@ import ( "encoding/gob" "harmony-benchmark/blockchain" "harmony-benchmark/consensus" - "harmony-benchmark/message" + "harmony-benchmark/common" "harmony-benchmark/p2p" "log" "net" @@ -60,7 +60,7 @@ func (node *Node) NodeHandler(conn net.Conn) { return } - msgCategory, err := message.GetMessageCategory(content) + msgCategory, err := common.GetMessageCategory(content) if err != nil { if consensus.IsLeader { log.Printf("[Leader] Read node type failed:%s", err) @@ -70,7 +70,7 @@ func (node *Node) NodeHandler(conn net.Conn) { return } - msgType, err := message.GetMessageType(content) + msgType, err := common.GetMessageType(content) if err != nil { if consensus.IsLeader { log.Printf("[Leader] Read action type failed:%s", err) @@ -80,7 +80,7 @@ func (node *Node) NodeHandler(conn net.Conn) { return } - msgPayload, err := message.GetMessagePayload(content) + msgPayload, err := common.GetMessagePayload(content) if err != nil { if consensus.IsLeader { log.Printf("[Leader] Read message payload failed:%s", err) @@ -91,20 +91,20 @@ func (node *Node) NodeHandler(conn net.Conn) { } switch msgCategory { - case message.COMMITTEE: - actionType := message.CommitteeMessageType(msgType) + case common.COMMITTEE: + actionType := common.CommitteeMessageType(msgType) switch actionType { - case message.CONSENSUS: + case common.CONSENSUS: if consensus.IsLeader { consensus.ProcessMessageLeader(msgPayload) } else { consensus.ProcessMessageValidator(msgPayload) } } - case message.NODE: - actionType := message.NodeMessageType(msgType) + case common.NODE: + actionType := common.NodeMessageType(msgType) switch actionType { - case message.TRANSACTION: + case common.TRANSACTION: txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type txList := new([]blockchain.Transaction) @@ -114,7 +114,7 @@ func (node *Node) NodeHandler(conn net.Conn) { } node.pendingTransactions = append(node.pendingTransactions, *txList...) log.Println(len(node.pendingTransactions)) - case message.CONTROL: + case common.CONTROL: controlType := msgPayload[0] if ControlMessageType(controlType) == STOP { log.Println("Stopping Node") From bcdb352ae08f4249fd9f03a0998e5b9c4694bdbf Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 17:25:54 -0700 Subject: [PATCH 11/13] Add RequestTransactionsMessage for requesting missing transactions in a node --- node/message.go | 16 ++++++++++++++-- node/node.go | 32 +++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/node/message.go b/node/message.go index dc537b1b3..3910eb85c 100644 --- a/node/message.go +++ b/node/message.go @@ -7,14 +7,15 @@ import ( "harmony-benchmark/common" ) +// The types of messages used for NODE/TRANSACTION type TransactionMessageType int - const ( SEND TransactionMessageType = iota + REQUEST ) +// The types of messages used for NODE/CONTROL type ControlMessageType int - const ( STOP ControlMessageType = iota ) @@ -29,6 +30,17 @@ func ConstructTransactionListMessage(transactions []blockchain.Transaction) []by return byteBuffer.Bytes() } +//ConstructTransactionListMessage constructs serialized transactions +func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.TRANSACTION)) + byteBuffer.WriteByte(byte(REQUEST)) + for _, txId := range transactionIds { + byteBuffer.Write(txId) + } + return byteBuffer.Bytes() +} + //ConstructStopMessage is STOP message func ConstructStopMessage() []byte { byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) diff --git a/node/node.go b/node/node.go index 062e17a85..c9a9e2e93 100644 --- a/node/node.go +++ b/node/node.go @@ -1,8 +1,6 @@ package node import ( - "bytes" - "encoding/gob" "harmony-benchmark/blockchain" "harmony-benchmark/consensus" "harmony-benchmark/common" @@ -11,6 +9,8 @@ import ( "net" "os" "time" + "bytes" + "encoding/gob" ) // A node represents a program (machine) participating in the network @@ -105,15 +105,7 @@ func (node *Node) NodeHandler(conn net.Conn) { actionType := common.NodeMessageType(msgType) switch actionType { case common.TRANSACTION: - txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type - - txList := new([]blockchain.Transaction) - err := txDecoder.Decode(&txList) - if err != nil { - log.Println("Failed deserializing transaction list") - } - node.pendingTransactions = append(node.pendingTransactions, *txList...) - log.Println(len(node.pendingTransactions)) + node.transactionMessageHandler(msgPayload) case common.CONTROL: controlType := msgPayload[0] if ControlMessageType(controlType) == STOP { @@ -125,6 +117,24 @@ func (node *Node) NodeHandler(conn net.Conn) { } } +func (node *Node) transactionMessageHandler(msgPayload []byte) { + txMessageType := TransactionMessageType(msgPayload[0]) + + switch txMessageType { + case SEND: + txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type + + txList := new([]blockchain.Transaction) + err := txDecoder.Decode(&txList) + if err != nil { + log.Println("Failed deserializing transaction list") + } + node.pendingTransactions = append(node.pendingTransactions, *txList...) + case REQUEST: + // TODO: fill in logic to return the request transactions + } +} + func (node *Node) WaitForConsensusReady(readySignal chan int) { for { // keep waiting for consensus ready <-readySignal From 95ca9314315cca83bc257c0892850d286bd8c579 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 17:55:04 -0700 Subject: [PATCH 12/13] Add logic to get the requested transactions --- node/node.go | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/node/node.go b/node/node.go index c9a9e2e93..1f5bd37d7 100644 --- a/node/node.go +++ b/node/node.go @@ -131,8 +131,36 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { } node.pendingTransactions = append(node.pendingTransactions, *txList...) case REQUEST: - // TODO: fill in logic to return the request transactions + reader := bytes.NewBuffer(msgPayload[1:]) + var txIds map[[32]byte]bool + txId := make([]byte, 32) // 32 byte hash Id + for { + _, err := reader.Read(txId) + if err != nil { + break + } + + txIds[getFixedByteTxId(txId)] = true + } + + var txToReturn []blockchain.Transaction + for _, tx := range node.pendingTransactions { + if txIds[getFixedByteTxId(tx.ID)] { + txToReturn = append(txToReturn, tx) + } + } + + // TODO: return the transaction list to requester + } +} + +// Copy the txId byte slice over to 32 byte array so the map can key on it +func getFixedByteTxId(txId []byte) [32]byte { + var id [32]byte + for i := range id { + id[i] = txId[i] } + return id } func (node *Node) WaitForConsensusReady(readySignal chan int) { From 6adafcb8c976b2282f3ed1897884295bac42f71f Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 21:19:09 -0700 Subject: [PATCH 13/13] Fill in multisig mocks --- consensus/consensus.go | 2 +- consensus/consensus_leader.go | 54 +++++++++++++++++++------------- consensus/consensus_validator.go | 6 ++-- 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index 8f87ee408..074a50a3e 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -31,7 +31,7 @@ type Consensus struct { // Consensus Id (View Id) - 4 byte consensusId uint32 // Blockhash - 32 byte - blockHash []byte + blockHash [32]byte // BlockHeader to run consensus on blockHeader []byte // Shard Id which this node belongs to diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 5b707ce86..5714b0689 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -10,6 +10,8 @@ import ( "fmt" "harmony-benchmark/blockchain" "harmony-benchmark/p2p" + "crypto/sha256" + "strings" ) var mutex = &sync.Mutex{} @@ -64,7 +66,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { // prepare message and broadcast to validators // Construct new block //newBlock := constructNewBlock() - consensus.blockHash = newBlock.Hash + copy(newBlock.Hash[:32], consensus.blockHash[:]) msgToSend, err := consensus.constructAnnounceMessage() if err != nil { @@ -88,7 +90,7 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { if len(consensus.blockHash) != 32 { return buffer.Bytes(), errors.New(fmt.Sprintf("Block Hash size is %d bytes", len(consensus.blockHash))) } - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:]) // 2 byte leader id twoBytes := make([]byte, 2) @@ -111,14 +113,9 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes()), nil } -// TODO: fill in this function -func constructNewBlock() []byte { - return make([]byte, 200) -} - -// TODO: fill in this function -func getBlockHash(block []byte) []byte { - return make([]byte, 32) +// Get the hash of a block's byte stream +func getBlockHash(block []byte) [32]byte { + return sha256.Sum256(block) } // TODO: fill in this function @@ -126,9 +123,10 @@ func getBlockHeader() []byte { return make([]byte, 200) } -// TODO: fill in this function func signMessage(message []byte) []byte { - return make([]byte, 64) + // TODO: implement real ECC signature + mockSignature := sha256.Sum256(message) + return append(mockSignature[:], mockSignature[:]...) } func (consensus *Consensus) processCommitMessage(payload []byte) { @@ -199,7 +197,7 @@ func (consensus Consensus) constructChallengeMessage() []byte { buffer.Write(fourBytes) // 32 byte block hash - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:]) // 2 byte leader id twoBytes := make([]byte, 2) @@ -207,10 +205,10 @@ func (consensus Consensus) constructChallengeMessage() []byte { buffer.Write(twoBytes) // 33 byte aggregated commit - buffer.Write(getAggregatedCommit()) + buffer.Write(getAggregatedCommit(consensus.commits)) // 33 byte aggregated key - buffer.Write(getAggregatedKey()) + buffer.Write(getAggregatedKey(consensus.commits)) // 32 byte challenge buffer.Write(getChallenge()) @@ -222,18 +220,30 @@ func (consensus Consensus) constructChallengeMessage() []byte { return consensus.ConstructConsensusMessage(CHALLENGE, buffer.Bytes()) } -// TODO: fill in this function -func getAggregatedCommit() []byte { - return make([]byte, 33) +func getAggregatedCommit(commits map[string]string) []byte { + // TODO: implement actual commit aggregation + var commitArray []string + for _, val := range commits { + commitArray = append(commitArray, val) + } + var commit [32]byte + commit = sha256.Sum256([]byte(strings.Join(commitArray, ""))) + return append(commit[:], byte(0)) } -// TODO: fill in this function -func getAggregatedKey() []byte { - return make([]byte, 33) +func getAggregatedKey(commits map[string]string) []byte { + // TODO: implement actual key aggregation + var commitArray []string + for key := range commits { + commitArray = append(commitArray, key) + } + var commit [32]byte + commit = sha256.Sum256([]byte(strings.Join(commitArray, ""))) + return append(commit[:], byte(0)) } -// TODO: fill in this function func getChallenge() []byte { + // TODO: implement actual challenge data return make([]byte, 32) } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index c34d343f3..e426cacda 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -71,7 +71,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { _ = blockHeaderSize _ = signature - consensus.blockHash = blockHash + copy(blockHash[:32], consensus.blockHash[:]) // verify block data if consensusId != consensus.consensusId { log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId) @@ -98,7 +98,7 @@ func (consensus Consensus) constructCommitMessage() []byte { buffer.Write(fourBytes) // 32 byte block hash - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:]) // 2 byte validator id twoBytes := make([]byte, 2) @@ -190,7 +190,7 @@ func (consensus Consensus) constructResponseMessage() []byte { buffer.Write(fourBytes) // 32 byte block hash - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:32]) // 2 byte validator id twoBytes := make([]byte, 2)