diff --git a/.travis.yml b/.travis.yml index 42e529457..120c9a6a6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,4 +9,4 @@ install: | cd $HOME/gopath/src/harmony-benchmark notifications: slack: - secure: F6PRLJ7fIVpvZAEAIikE9u53CwklytznkNSJPuzB7cxLttAejls8ou3jsFFUkdTyyyy+QKFP9Bj+IFcAI9dq3CDYp7MnTQx/ajn8TC0xBwKW3gEKYoptIQJkXzvdwp9OanpU78QynRe1oRLefkN8qrL0zWLvoqcaHSYwjh1kzDCbO9G13aiI5CiFdW7jKwBKEPlleze4so0UdAaaGxyXUiSsmvHrOwF72ElnahKLy9DzmIgT8PdQkf1BmV3yaH3VQoU4fyu8t5P/jy8NYIXomcpo+pavgUBfXwd3mPf1KkAoB2RXdGCkORzHNZ6h+63ZVDzi1t8FWAbuNmvJ/Hq07IScGcsjZCEdy+79NSSBHThUejnHnqD4QdcXdhr30PoWBX85+phwPEVehAtddzgLyzivGByKHQMfWcetvPRww7Im+MpJW3OssgHlPxbRCE0uFtSDYgD9UoKevuZzFnME3U1rT9xqTweqOzxdsGRpvU/fLbh0PQCJnDQJjOB054l6ZTu9mtt+ogbYpOhB2PO6qLJZ64cn7M5/wLMiswRWooT9DeN+5a7n5MRo1i/TWFDFWBFYQVWTYdiTaU+oZdSZsu25TMboVraHPw3HE5tKIGgqOBLLd187HbE5ZeR62bzUnlN1wK1tphfgVa4ztmL+NY9nTgHMKki7LiNIL1p7M68= + secure: RPB3ThYIGuDUidvaWfOA7Hc9x1bDfd5+Y10r7xwY+NGCN3zW86s/GNLpLutI0MWTV9e2CJupHvz5clp8Ktle/tVjLhs6jHQnNV7U8PTWKkL5By6IFVAHN12unMQn/m0RPwqMfdubajXoV51XhbFA/iow/0fqwsd61VdPIuBrlQjy9z7kyVnRLNoGvYjDqKEkJfYVb3qFNFLzD0F7Y2AgxnezIRjsTLgHzR4owLJYqVMhvTYIV9/vSf1w4UUPzhHyZRESl6bri+a1+g7GxE32OtNwq68xxVeeJcrO/MbjAHHW9V6BW1MjJfYzD5T+7JHIfZOjV2WgzJ7uCkVYztfq+02yOCSWsLNxFVojIDhVFEhhJ6Vd2Zf1otolS7j0svK/qNmShID9q9NAasaI105GsQgtaSPAUGd88J/vyX2ndG1nDOvxmgOo10tZFOnPHW7JnWMybk3PLza8o1ujA7X3JFdvDA8BPP9h6MVP4N7doCQ/n4Crts53HvEWlvcv5sBNu61WYlSTBzf1qNwBKMyN2E0rNubsxKmW8B6jLdWYdlx57nyTRPraNKGE1fnUW5nWRZGax3F1tQRwEfpQMk22qgeUK0RYWsPgHFaPciKCA3dJX7t1k/ib9pyR4nc9SZnYw54KMhkAXPIVQ0iy0EpTAH1DNYV6v8zXCwjl+BdkhlY= diff --git a/README.md b/README.md index 4184cbb1d..4a0618ac0 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ git clone git@github.com:simple-rules/harmony-benchmark.git ## Usage ``` -./deploy.sh local_iplist.txt +./deploy.sh local_config.txt ./send_txn.sh ``` diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 2cb8de967..786d47b9b 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -7,6 +7,7 @@ import ( "harmony-benchmark/blockchain" "harmony-benchmark/node" "harmony-benchmark/p2p" + "log" "math/rand" "os" "strings" @@ -22,14 +23,14 @@ func newRandTransaction() blockchain.Transaction { return tx } -func getPeers(Ip, Port, iplist string) []p2p.Peer { - file, _ := os.Open(iplist) +func getValidators(config string) []p2p.Peer { + file, _ := os.Open(config) fscanner := bufio.NewScanner(file) var peerList []p2p.Peer for fscanner.Scan() { p := strings.Split(fscanner.Text(), " ") ip, port, status := p[0], p[1], p[2] - if status == "leader" || ip == Ip && port == Port { + if status == "leader" { continue } peer := p2p.Peer{Port: port, Ip: ip} @@ -37,15 +38,39 @@ func getPeers(Ip, Port, iplist string) []p2p.Peer { } return peerList } + +func getLeaders(config *[][]string) []p2p.Peer { + var peerList []p2p.Peer + for _, node := range *config { + ip, port, status := node[0], node[1], node[2] + if status == "leader" { + peerList = append(peerList, p2p.Peer{Ip: ip, Port: port}) + } + } + return peerList +} + +func readConfigFile(configFile string) [][]string { + file, _ := os.Open(configFile) + fscanner := bufio.NewScanner(file) + + result := [][]string{} + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + result = append(result, p) + } + return result +} + func main() { + configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") + flag.Parse() + config := readConfigFile(*configFile) - ip := flag.String("ip", "127.0.0.1", "IP of the leader") - port := flag.String("port", "9000", "port of the leader.") - ipfile := flag.String("ipfile", "local_iplist.txt", "file containing all ip addresses") - //getLeader to get ip,port and get totaltime I want to run start := time.Now() totalTime := 60.0 txs := make([]blockchain.Transaction, 10) + leaders := getLeaders(&config) for true { t := time.Now() if t.Sub(start).Seconds() >= totalTime { @@ -57,13 +82,11 @@ func main() { } msg := node.ConstructTransactionListMessage(txs) - p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) + log.Printf("[Generator] Sending txs to %d leader[s]\n", len(leaders)) + p2p.BroadcastMessage(leaders, msg) time.Sleep(1 * time.Second) // 10 transactions per second } msg := node.ConstructStopMessage() - var leaderPeer p2p.Peer - leaderPeer.Ip = *ip - leaderPeer.Port = *port - peers := append(getPeers(*ip, *port, *ipfile), leaderPeer) + peers := append(getValidators(*configFile), leaders...) p2p.BroadcastMessage(peers, msg) } diff --git a/aws-scripts/parse_json.py b/aws-scripts/parse_json.py index 8e8eb643b..196b10fa5 100644 --- a/aws-scripts/parse_json.py +++ b/aws-scripts/parse_json.py @@ -9,7 +9,7 @@ def get_public_ip(all_reservations): all_public_ip_addresses.append(instance_information['PublicIpAddress']) return all_public_ip_addresses -def make_peers_list(all_reservations,port="9001",filename="ipList.txt"): +def make_peers_list(all_reservations,port="9001",filename="config.txt"): p = get_public_ip(all_reservations) f = open(filename,"w") for i in range(len(p)): diff --git a/aws-scripts/run_instances.sh b/aws-scripts/run_instances.sh index b326edd34..bd4b7f59f 100755 --- a/aws-scripts/run_instances.sh +++ b/aws-scripts/run_instances.sh @@ -1,3 +1,3 @@ #!/bin/bash -x cd /home/ec2-user/projects/src/harmony-benchmark -./deploy_linux.sh local_iplist2.txt \ No newline at end of file +./deploy_linux.sh local_config2.txt \ No newline at end of file diff --git a/benchmark_main.go b/benchmark_main.go index ba30661f0..0c82234a3 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -1,24 +1,31 @@ package main import ( + "bufio" "flag" "harmony-benchmark/consensus" - "harmony-benchmark/p2p" "harmony-benchmark/node" + "harmony-benchmark/p2p" "log" "os" - "bufio" "strings" ) -func getLeader(iplist string) p2p.Peer { - file, _ := os.Open(iplist) - fscanner := bufio.NewScanner(file) +func getShardId(myIp, myPort string, config *[][]string) string { + for _, node := range *config { + ip, port, shardId := node[0], node[1], node[3] + if ip == myIp && port == myPort { + return shardId + } + } + return "N/A" +} + +func getLeader(myShardId string, config *[][]string) p2p.Peer { var leaderPeer p2p.Peer - for fscanner.Scan() { - p := strings.Split(fscanner.Text(), " ") - ip, port, status := p[0], p[1], p[2] - if status == "leader" { + for _, node := range *config { + ip, port, status, shardId := node[0], node[1], node[2], node[3] + if status == "leader" && myShardId == shardId { leaderPeer.Ip = ip leaderPeer.Port = port } @@ -26,14 +33,11 @@ func getLeader(iplist string) p2p.Peer { return leaderPeer } -func getPeers(Ip, Port, iplist string) []p2p.Peer { - file, _ := os.Open(iplist) - fscanner := bufio.NewScanner(file) +func getPeers(myIp, myPort, myShardId string, config *[][]string) []p2p.Peer { var peerList []p2p.Peer - for fscanner.Scan() { - p := strings.Split(fscanner.Text(), " ") - ip, port, status := p[0], p[1], p[2] - if status == "leader" || ip == Ip && port == Port { + for _, node := range *config { + ip, port, status, shardId := node[0], node[1], node[2], node[3] + if status == "leader" || ip == myIp && port == myPort || myShardId != shardId { continue } peer := p2p.Peer{Port: port, Ip: ip} @@ -42,13 +46,31 @@ func getPeers(Ip, Port, iplist string) []p2p.Peer { return peerList } +func readConfigFile(configFile string) [][]string { + file, _ := os.Open(configFile) + fscanner := bufio.NewScanner(file) + + result := [][]string{} + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + result = append(result, p) + } + return result +} + func main() { ip := flag.String("ip", "127.0.0.1", "IP of the node") port := flag.String("port", "9000", "port of the node.") - ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses") + configFile := flag.String("config_file", "config.txt", "file containing all ip addresses") flag.Parse() - consensus := consensus.NewConsensus(*ip, *port, getPeers(*ip, *port, *ipfile), getLeader(*ipfile)) + config := readConfigFile(*configFile) + shardId := getShardId(*ip, *port, &config) + peers := getPeers(*ip, *port, shardId, &config) + leader := getLeader(shardId, &config) + + consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) + var nodeStatus string if consensus.IsLeader { nodeStatus = "leader" @@ -57,7 +79,7 @@ func main() { } log.Println("======================================") - log.Printf("This node is a %s node listening on ip: %s and port: %s\n", nodeStatus, *ip, *port) + log.Printf("This node is a %s node in shard %s listening on ip: %s and port: %s\n", nodeStatus, shardId, *ip, *port) log.Println("======================================") node := node.NewNode(&consensus) diff --git a/blockchain/utxopool.go b/blockchain/utxopool.go index cb198c054..e79e88f62 100644 --- a/blockchain/utxopool.go +++ b/blockchain/utxopool.go @@ -4,6 +4,10 @@ import ( "encoding/hex" ) +const ( + MaxNumberOfTransactions = 100 +) + // UTXOPool stores transactions and balance associated with each address. type UTXOPool struct { // Mapping from address to a map of transaction id to a map of the index of output @@ -57,6 +61,80 @@ func (utxopool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool { return true } +// VerifyOneTransactionAndUpdate verifies if a list of transactions valid. +func (utxopool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool { + spentTXOs := make(map[string]map[string]map[int]bool) + txID := hex.EncodeToString(tx.ID) + inTotal := 0 + // Calculate the sum of TxInput + for _, in := range tx.TxInput { + inTxID := hex.EncodeToString(in.TxID) + index := in.TxOutputIndex + // Check if the transaction with the addres is spent or not. + if val, ok := utxopool.utxo[in.Address][inTxID][index]; ok { + if spentTXOs[in.Address][inTxID][index] { + return false + } + inTotal += val + } else { + return false + } + // Mark the transactions with the address and index spent. + if _, ok := spentTXOs[in.Address]; !ok { + spentTXOs[in.Address] = make(map[string]map[int]bool) + } + if _, ok := spentTXOs[in.Address][inTxID]; !ok { + spentTXOs[in.Address][inTxID] = make(map[int]bool) + } + spentTXOs[in.Address][inTxID][index] = true + } + + outTotal := 0 + // Calculate the sum of TxOutput + for index, out := range tx.TxOutput { + if val, ok := spentTXOs[out.Address][txID][index]; ok { + return false + } + outTotal += out.Value + } + if inTotal != outTotal { + return false + } + return true +} + +func (utxopool *UTXOPool) Update(tx* Transaction) { + if utxopool != nil { + txID := hex.EncodeToString(tx.ID) + + // Remove + for _, in := range tx.TxInput { + inTxID := hex.EncodeToString(in.TxID) + delete(utxopool.utxo[in.Address][inTxID], in.TxOutputIndex) + } + + // Update + for index, out := range tx.TxOutput { + if _, ok := utxopool.utxo[out.Address]; !ok { + utxopool.utxo[out.Address] = make(map[string]map[int]int) + utxopool.utxo[out.Address][txID] = make(map[int]int) + } + if _, ok := utxopool.utxo[out.Address][txID]; !ok { + utxopool.utxo[out.Address][txID] = make(map[int]int) + } + utxopool.utxo[out.Address][txID][index] = out.Value + } + } +} + +func (utxopool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool { + if utxopool.VerifyOneTransaction(tx) { + utxopool.Update(tx) + return true + } + retur false +} + // VerifyAndUpdate verifies a list of transactions and update utxoPool. func (utxopool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool { if utxopool.VerifyTransactions(transactions) { @@ -111,3 +189,15 @@ func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool { tx := bc.blocks[0].Transactions[0] return CreateUTXOPoolFromTransaction(tx) } + +// SelectTransactionsForNewBlock returns a list of index of valid transactions for the new block. +func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transaction) (selected, unselected []*Transaction) { + selected, unselected = []*Transaction{}, []*Transaction{} + for id, tx := range transactions { + if len(selected) < MaxNumberOfTransactions && utxoPool.VerifyOneTransactionAndUpdate(tx) { + append(selected, tx) + } else { + append(unselected, tx) + } + } +} diff --git a/message/message.go b/common/message.go similarity index 92% rename from message/message.go rename to common/message.go index c951379c3..2f7b61328 100644 --- a/message/message.go +++ b/common/message.go @@ -1,4 +1,4 @@ -package message +package common import ( "errors" @@ -26,7 +26,7 @@ n - 2 bytes - actual message payload const NODE_TYPE_BYTES = 1 const ACTION_TYPE_BYTES = 1 -// The category of messages +// The CATEGORY of messages type MessageCategory byte const ( @@ -35,7 +35,8 @@ const ( // TODO: add more types ) -// The specific types of message under committee category + +// The specific types of message under COMMITTEE category type CommitteeMessageType byte const ( @@ -43,7 +44,7 @@ const ( // TODO: add more types ) -// The specific types of message under node category +// The specific types of message under NODE category type NodeMessageType byte const ( @@ -52,6 +53,7 @@ const ( // TODO: add more types ) + // Get the message category from the p2p message content func GetMessageCategory(message []byte) (MessageCategory, error) { if len(message) < NODE_TYPE_BYTES { diff --git a/consensus/consensus.go b/consensus/consensus.go index 0680fb7a9..2864b360b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -4,6 +4,7 @@ package consensus // consensus import ( "fmt" "harmony-benchmark/message" + "harmony-benchmark/common" "harmony-benchmark/p2p" "log" "regexp" @@ -32,9 +33,11 @@ type Consensus struct { // Consensus Id (View Id) - 4 byte consensusId uint32 // Blockhash - 32 byte - blockHash []byte + blockHash [32]byte // BlockHeader to run consensus on blockHeader []byte + // Shard Id which this node belongs to + ShardId uint32 // Signal channel for starting a new consensus process ReadySignal chan int @@ -77,7 +80,7 @@ func (state ConsensusState) String() string { } // Create a new Consensus object -func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus { +func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) Consensus { // The first Ip, port passed will be leader. consensus := Consensus{} peer := p2p.Peer{Port: port, Ip: ip} @@ -100,6 +103,11 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus log.Fatal(err) } consensus.consensusId = 0 + myShardId, err := strconv.Atoi(shardId) + if err != nil { + panic("Unparseable shard Id" + shardId) + } + consensus.ShardId = uint32(myShardId) // For now use socket address as 16 byte Id // TODO: populate with correct Id @@ -115,8 +123,8 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus }() } - consensus.msgCategory = byte(message.COMMITTEE) - consensus.actionType = byte(message.CONSENSUS) + consensus.msgCategory = byte(common.COMMITTEE) + consensus.actionType = byte(common.CONSENSUS) return consensus } @@ -126,7 +134,6 @@ func (consensus *Consensus) ResetState() { consensus.commits = make(map[string]string) consensus.responses = make(map[string]string) } - // Returns ID of this consensus func (consensus *Consensus) GetIdentityString() string { var duty string diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 1f3eb4732..26a794500 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -10,6 +10,8 @@ import ( "fmt" "harmony-benchmark/blockchain" "harmony-benchmark/p2p" + "crypto/sha256" + "strings" ) var mutex = &sync.Mutex{} @@ -37,7 +39,8 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { log.Print(err) } - consensus.Logf("Received and processing message: %s\n", msgType) + + log.Printf("[Leader-%d] Received and processing message: %s\n", consensus.ShardId, msgType) switch msgType { case ANNOUNCE: consensus.Logf("Unexpected message type: %s", msgType) @@ -64,7 +67,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { // prepare message and broadcast to validators // Construct new block //newBlock := constructNewBlock() - consensus.blockHash = newBlock.Hash + copy(newBlock.Hash[:32], consensus.blockHash[:]) msgToSend, err := consensus.constructAnnounceMessage() if err != nil { @@ -88,7 +91,7 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { if len(consensus.blockHash) != 32 { return buffer.Bytes(), errors.New(fmt.Sprintf("Block Hash size is %d bytes", len(consensus.blockHash))) } - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:]) // 2 byte leader id twoBytes := make([]byte, 2) @@ -111,14 +114,9 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes()), nil } -// TODO: fill in this function -func constructNewBlock() []byte { - return make([]byte, 200) -} - -// TODO: fill in this function -func getBlockHash(block []byte) []byte { - return make([]byte, 32) +// Get the hash of a block's byte stream +func getBlockHash(block []byte) [32]byte { + return sha256.Sum256(block) } // TODO: fill in this function @@ -126,9 +124,10 @@ func getBlockHeader() []byte { return make([]byte, 200) } -// TODO: fill in this function func signMessage(message []byte) []byte { - return make([]byte, 64) + // TODO: implement real ECC signature + mockSignature := sha256.Sum256(message) + return append(mockSignature[:], mockSignature[:]...) } func (consensus *Consensus) processCommitMessage(payload []byte) { @@ -199,7 +198,7 @@ func (consensus Consensus) constructChallengeMessage() []byte { buffer.Write(fourBytes) // 32 byte block hash - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:]) // 2 byte leader id twoBytes := make([]byte, 2) @@ -207,10 +206,10 @@ func (consensus Consensus) constructChallengeMessage() []byte { buffer.Write(twoBytes) // 33 byte aggregated commit - buffer.Write(getAggregatedCommit()) + buffer.Write(getAggregatedCommit(consensus.commits)) // 33 byte aggregated key - buffer.Write(getAggregatedKey()) + buffer.Write(getAggregatedKey(consensus.commits)) // 32 byte challenge buffer.Write(getChallenge()) @@ -222,18 +221,30 @@ func (consensus Consensus) constructChallengeMessage() []byte { return consensus.ConstructConsensusMessage(CHALLENGE, buffer.Bytes()) } -// TODO: fill in this function -func getAggregatedCommit() []byte { - return make([]byte, 33) +func getAggregatedCommit(commits map[string]string) []byte { + // TODO: implement actual commit aggregation + var commitArray []string + for _, val := range commits { + commitArray = append(commitArray, val) + } + var commit [32]byte + commit = sha256.Sum256([]byte(strings.Join(commitArray, ""))) + return append(commit[:], byte(0)) } -// TODO: fill in this function -func getAggregatedKey() []byte { - return make([]byte, 33) +func getAggregatedKey(commits map[string]string) []byte { + // TODO: implement actual key aggregation + var commitArray []string + for key := range commits { + commitArray = append(commitArray, key) + } + var commit [32]byte + commit = sha256.Sum256([]byte(strings.Join(commitArray, ""))) + return append(commit[:], byte(0)) } -// TODO: fill in this function func getChallenge() []byte { + // TODO: implement actual challenge data return make([]byte, 32) } @@ -287,8 +298,11 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { // Set state to FINISHED consensus.state = FINISHED // TODO: do followups on the consensus - consensus.Logf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators)) + + log.Printf("[Shard %d] HOORAY!!! CONSENSUS REACHED AMONG %d NODES WITH CONSENSUS ID %d!!!\n", consensus.ShardId, len(consensus.validators), consensus.consensusId) + consensus.ResetState() + consensus.consensusId++ consensus.ReadySignal <- 1 } // TODO: composes new block and broadcast the new block to validators diff --git a/consensus/consensus_leader_test.go b/consensus/consensus_leader_test.go new file mode 100644 index 000000000..f500c8d25 --- /dev/null +++ b/consensus/consensus_leader_test.go @@ -0,0 +1,34 @@ +package consensus + +import ( + "harmony-benchmark/p2p" + "testing" +) + +func TestConstructAnnounceMessage(test *testing.T) { + header := getBlockHeader() + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg, err := consensus.constructAnnounceMessage() + + if err != nil { + test.Error("Annouce message is not constructed successfully") + } + if len(msg) != 1+1+1+4+32+2+4+64+len(header) { + test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) + } +} + +func TestConstructChallengeMessage(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg := consensus.constructChallengeMessage() + + if len(msg) != 1+1+1+4+32+2+33+33+32+64 { + test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) + } +} diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go new file mode 100644 index 000000000..fba6c55c1 --- /dev/null +++ b/consensus/consensus_test.go @@ -0,0 +1,36 @@ +package consensus + +import ( + "harmony-benchmark/common" + "harmony-benchmark/p2p" + "testing" +) + +func TestNewConsensus(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + if consensus.consensusId != 0 { + test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusId) + } + + if consensus.IsLeader != true { + test.Error("Consensus should belong to a leader") + } + + if consensus.ReadySignal == nil { + test.Error("Consensus ReadySignal should be initialized") + } + + if consensus.actionType != byte(common.CONSENSUS) { + test.Error("Consensus actionType should be CONSENSUS") + } + + if consensus.msgCategory != byte(common.COMMITTEE) { + test.Error("Consensus msgCategory should be COMMITTEE") + } + + if consensus.leader != leader { + test.Error("Consensus Leader is set to wrong Peer") + } +} diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 81f75a5fe..afa21c14d 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -71,9 +71,12 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { _ = blockHeaderSize _ = signature - consensus.blockHash = blockHash + copy(blockHash[:32], consensus.blockHash[:]) // verify block data - + if consensusId != consensus.consensusId { + log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId) + return + } // sign block // TODO: return the signature(commit) to leader @@ -95,7 +98,7 @@ func (consensus Consensus) constructCommitMessage() []byte { buffer.Write(fourBytes) // 32 byte block hash - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:]) // 2 byte validator id twoBytes := make([]byte, 2) @@ -160,6 +163,10 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { _ = signature // verify block data and the aggregated signatures + if consensusId != consensus.consensusId { + log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId) + return + } // sign the message @@ -170,6 +177,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { // Set state to RESPONSE_DONE consensus.state = RESPONSE_DONE + consensus.consensusId++ } // Construct the response message to send to leader (assumption the consensus data is already verified) @@ -182,7 +190,7 @@ func (consensus Consensus) constructResponseMessage() []byte { buffer.Write(fourBytes) // 32 byte block hash - buffer.Write(consensus.blockHash) + buffer.Write(consensus.blockHash[:32]) // 2 byte validator id twoBytes := make([]byte, 2) diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go new file mode 100644 index 000000000..a8e2b0e7e --- /dev/null +++ b/consensus/consensus_validator_test.go @@ -0,0 +1,30 @@ +package consensus + +import ( + "harmony-benchmark/p2p" + "testing" +) + +func TestConstructCommitMessage(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg := consensus.constructCommitMessage() + + if len(msg) != 1+1+1+4+32+2+33+64 { + test.Errorf("Commit message is not constructed in the correct size: %d", len(msg)) + } +} + +func TestConstructResponseMessage(test *testing.T) { + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} + consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus.blockHash = getBlockHash(make([]byte, 10)) + msg := consensus.constructResponseMessage() + + if len(msg) != 1+1+1+4+32+2+32+64 { + test.Errorf("Response message is not constructed in the correct size: %d", len(msg)) + } +} diff --git a/deploy.sh b/deploy.sh index ed2139748..71148722f 100755 --- a/deploy.sh +++ b/deploy.sh @@ -1,9 +1,9 @@ ./kill_node.sh -ipfile=$1 +config=$1 while IFS='' read -r line || [[ -n "$line" ]]; do IFS=' ' read ip port mode <<< $line - #echo $ip $port $mode $ipfile - go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile& -done < $ipfile + #echo $ip $port $mode $config + go run ./benchmark_main.go -ip $ip -port $port -config_file $config& +done < $config -go run ./aws-code/transaction_generator.go -ipfile $ipfile \ No newline at end of file +go run ./aws-code/transaction_generator.go -config_file $config \ No newline at end of file diff --git a/deploy_linux.sh b/deploy_linux.sh index 655a1cad3..d6841a92d 100755 --- a/deploy_linux.sh +++ b/deploy_linux.sh @@ -14,9 +14,9 @@ echo "Inside deploy linux" echo $GOPATH echo "Inside deploy linux line 2" -ipfile=$1 +config=$1 while read ip port mode; do - #echo $ip $port $mode $ipfile - go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile& -done < $ipfile -go run ./aws-code/transaction_generator.go -ipfile $ipfile + #echo $ip $port $mode $config + go run ./benchmark_main.go -ip $ip -port $port -config_file $config& +done < $config +go run ./aws-code/transaction_generator.go -config_file $config diff --git a/local_config.txt b/local_config.txt new file mode 100644 index 000000000..e17db776d --- /dev/null +++ b/local_config.txt @@ -0,0 +1,101 @@ +127.0.0.1 9001 validator 0 +127.0.0.1 9002 validator 0 +127.0.0.1 9003 validator 0 +127.0.0.1 9004 validator 0 +127.0.0.1 9005 validator 0 +127.0.0.1 9006 validator 0 +127.0.0.1 9007 validator 0 +127.0.0.1 9008 validator 0 +127.0.0.1 9009 validator 0 +127.0.0.1 9010 validator 0 +127.0.0.1 9011 validator 0 +127.0.0.1 9012 validator 0 +127.0.0.1 9013 validator 0 +127.0.0.1 9014 validator 0 +127.0.0.1 9015 validator 0 +127.0.0.1 9016 validator 0 +127.0.0.1 9017 validator 0 +127.0.0.1 9018 validator 0 +127.0.0.1 9019 validator 0 +127.0.0.1 9020 validator 0 +127.0.0.1 9021 validator 0 +127.0.0.1 9022 validator 0 +127.0.0.1 9023 validator 0 +127.0.0.1 9024 validator 0 +127.0.0.1 9025 validator 0 +127.0.0.1 9026 validator 0 +127.0.0.1 9027 validator 0 +127.0.0.1 9028 validator 0 +127.0.0.1 9029 validator 0 +127.0.0.1 9030 validator 0 +127.0.0.1 9031 validator 0 +127.0.0.1 9032 validator 0 +127.0.0.1 9033 validator 0 +127.0.0.1 9034 validator 0 +127.0.0.1 9035 validator 0 +127.0.0.1 9036 validator 0 +127.0.0.1 9037 validator 0 +127.0.0.1 9038 validator 0 +127.0.0.1 9039 validator 0 +127.0.0.1 9040 validator 0 +127.0.0.1 9041 validator 0 +127.0.0.1 9042 validator 0 +127.0.0.1 9043 validator 0 +127.0.0.1 9044 validator 0 +127.0.0.1 9045 validator 0 +127.0.0.1 9046 validator 0 +127.0.0.1 9047 validator 0 +127.0.0.1 9048 validator 0 +127.0.0.1 9049 validator 0 +127.0.0.1 9050 validator 0 +127.0.0.1 9051 validator 0 +127.0.0.1 9052 validator 0 +127.0.0.1 9053 validator 0 +127.0.0.1 9054 validator 0 +127.0.0.1 9055 validator 0 +127.0.0.1 9056 validator 0 +127.0.0.1 9057 validator 0 +127.0.0.1 9058 validator 0 +127.0.0.1 9059 validator 0 +127.0.0.1 9060 validator 0 +127.0.0.1 9061 validator 0 +127.0.0.1 9062 validator 0 +127.0.0.1 9063 validator 0 +127.0.0.1 9064 validator 0 +127.0.0.1 9065 validator 0 +127.0.0.1 9066 validator 0 +127.0.0.1 9067 validator 0 +127.0.0.1 9068 validator 0 +127.0.0.1 9069 validator 0 +127.0.0.1 9070 validator 0 +127.0.0.1 9071 validator 0 +127.0.0.1 9072 validator 0 +127.0.0.1 9073 validator 0 +127.0.0.1 9074 validator 0 +127.0.0.1 9075 validator 0 +127.0.0.1 9076 validator 0 +127.0.0.1 9077 validator 0 +127.0.0.1 9078 validator 0 +127.0.0.1 9079 validator 0 +127.0.0.1 9080 validator 0 +127.0.0.1 9081 validator 0 +127.0.0.1 9082 validator 0 +127.0.0.1 9083 validator 0 +127.0.0.1 9084 validator 0 +127.0.0.1 9085 validator 0 +127.0.0.1 9086 validator 0 +127.0.0.1 9087 validator 0 +127.0.0.1 9088 validator 0 +127.0.0.1 9089 validator 0 +127.0.0.1 9090 validator 0 +127.0.0.1 9091 validator 0 +127.0.0.1 9092 validator 0 +127.0.0.1 9093 validator 0 +127.0.0.1 9094 validator 0 +127.0.0.1 9095 validator 0 +127.0.0.1 9096 validator 0 +127.0.0.1 9097 validator 0 +127.0.0.1 9098 validator 0 +127.0.0.1 9099 validator 0 +127.0.0.1 9100 validator 0 +127.0.0.1 9000 leader 0 diff --git a/local_config2.txt b/local_config2.txt new file mode 100644 index 000000000..2cf8e2b7f --- /dev/null +++ b/local_config2.txt @@ -0,0 +1,11 @@ +127.0.0.1 9001 validator 0 +127.0.0.1 9002 validator 0 +127.0.0.1 9003 validator 0 +127.0.0.1 9004 validator 0 +127.0.0.1 9005 validator 0 +127.0.0.1 9006 validator 0 +127.0.0.1 9007 validator 0 +127.0.0.1 9008 validator 0 +127.0.0.1 9009 validator 0 +127.0.0.1 9010 validator 0 +127.0.0.1 9000 leader 0 diff --git a/local_config_shards.txt b/local_config_shards.txt new file mode 100644 index 000000000..9d1c3c3ae --- /dev/null +++ b/local_config_shards.txt @@ -0,0 +1,22 @@ +127.0.0.1 9010 validator 0 +127.0.0.1 9011 validator 0 +127.0.0.1 9012 validator 0 +127.0.0.1 9013 validator 0 +127.0.0.1 9014 validator 0 +127.0.0.1 9015 validator 0 +127.0.0.1 9016 validator 0 +127.0.0.1 9017 validator 0 +127.0.0.1 9018 validator 0 +127.0.0.1 9019 validator 0 +127.0.0.1 9020 validator 1 +127.0.0.1 9021 validator 1 +127.0.0.1 9022 validator 1 +127.0.0.1 9023 validator 1 +127.0.0.1 9024 validator 1 +127.0.0.1 9025 validator 1 +127.0.0.1 9026 validator 1 +127.0.0.1 9027 validator 1 +127.0.0.1 9028 validator 1 +127.0.0.1 9029 validator 1 +127.0.0.1 9000 leader 0 +127.0.0.1 9001 leader 1 diff --git a/local_iplist.txt b/local_iplist.txt deleted file mode 100644 index 9f5d026fe..000000000 --- a/local_iplist.txt +++ /dev/null @@ -1,101 +0,0 @@ -127.0.0.1 9001 validator -127.0.0.1 9002 validator -127.0.0.1 9003 validator -127.0.0.1 9004 validator -127.0.0.1 9005 validator -127.0.0.1 9006 validator -127.0.0.1 9007 validator -127.0.0.1 9008 validator -127.0.0.1 9009 validator -127.0.0.1 9010 validator -127.0.0.1 9011 validator -127.0.0.1 9012 validator -127.0.0.1 9013 validator -127.0.0.1 9014 validator -127.0.0.1 9015 validator -127.0.0.1 9016 validator -127.0.0.1 9017 validator -127.0.0.1 9018 validator -127.0.0.1 9019 validator -127.0.0.1 9020 validator -127.0.0.1 9021 validator -127.0.0.1 9022 validator -127.0.0.1 9023 validator -127.0.0.1 9024 validator -127.0.0.1 9025 validator -127.0.0.1 9026 validator -127.0.0.1 9027 validator -127.0.0.1 9028 validator -127.0.0.1 9029 validator -127.0.0.1 9030 validator -127.0.0.1 9031 validator -127.0.0.1 9032 validator -127.0.0.1 9033 validator -127.0.0.1 9034 validator -127.0.0.1 9035 validator -127.0.0.1 9036 validator -127.0.0.1 9037 validator -127.0.0.1 9038 validator -127.0.0.1 9039 validator -127.0.0.1 9040 validator -127.0.0.1 9041 validator -127.0.0.1 9042 validator -127.0.0.1 9043 validator -127.0.0.1 9044 validator -127.0.0.1 9045 validator -127.0.0.1 9046 validator -127.0.0.1 9047 validator -127.0.0.1 9048 validator -127.0.0.1 9049 validator -127.0.0.1 9050 validator -127.0.0.1 9051 validator -127.0.0.1 9052 validator -127.0.0.1 9053 validator -127.0.0.1 9054 validator -127.0.0.1 9055 validator -127.0.0.1 9056 validator -127.0.0.1 9057 validator -127.0.0.1 9058 validator -127.0.0.1 9059 validator -127.0.0.1 9060 validator -127.0.0.1 9061 validator -127.0.0.1 9062 validator -127.0.0.1 9063 validator -127.0.0.1 9064 validator -127.0.0.1 9065 validator -127.0.0.1 9066 validator -127.0.0.1 9067 validator -127.0.0.1 9068 validator -127.0.0.1 9069 validator -127.0.0.1 9070 validator -127.0.0.1 9071 validator -127.0.0.1 9072 validator -127.0.0.1 9073 validator -127.0.0.1 9074 validator -127.0.0.1 9075 validator -127.0.0.1 9076 validator -127.0.0.1 9077 validator -127.0.0.1 9078 validator -127.0.0.1 9079 validator -127.0.0.1 9080 validator -127.0.0.1 9081 validator -127.0.0.1 9082 validator -127.0.0.1 9083 validator -127.0.0.1 9084 validator -127.0.0.1 9085 validator -127.0.0.1 9086 validator -127.0.0.1 9087 validator -127.0.0.1 9088 validator -127.0.0.1 9089 validator -127.0.0.1 9090 validator -127.0.0.1 9091 validator -127.0.0.1 9092 validator -127.0.0.1 9093 validator -127.0.0.1 9094 validator -127.0.0.1 9095 validator -127.0.0.1 9096 validator -127.0.0.1 9097 validator -127.0.0.1 9098 validator -127.0.0.1 9099 validator -127.0.0.1 9100 validator -127.0.0.1 9000 leader diff --git a/local_iplist2.txt b/local_iplist2.txt deleted file mode 100644 index 6da5dfc4c..000000000 --- a/local_iplist2.txt +++ /dev/null @@ -1,11 +0,0 @@ -127.0.0.1 9001 validator -127.0.0.1 9002 validator -127.0.0.1 9003 validator -127.0.0.1 9004 validator -127.0.0.1 9005 validator -127.0.0.1 9006 validator -127.0.0.1 9007 validator -127.0.0.1 9008 validator -127.0.0.1 9009 validator -127.0.0.1 9010 validator -127.0.0.1 9000 leader diff --git a/node/message.go b/node/message.go index 0a071bb31..3910eb85c 100644 --- a/node/message.go +++ b/node/message.go @@ -4,35 +4,47 @@ import ( "bytes" "encoding/gob" "harmony-benchmark/blockchain" - "harmony-benchmark/message" + "harmony-benchmark/common" ) +// The types of messages used for NODE/TRANSACTION type TransactionMessageType int - const ( SEND TransactionMessageType = iota + REQUEST ) +// The types of messages used for NODE/CONTROL type ControlMessageType int - const ( STOP ControlMessageType = iota ) //ConstructTransactionListMessage constructs serialized transactions func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)}) - byteBuffer.WriteByte(byte(message.TRANSACTION)) + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.TRANSACTION)) byteBuffer.WriteByte(byte(SEND)) encoder := gob.NewEncoder(byteBuffer) encoder.Encode(transactions) return byteBuffer.Bytes() } +//ConstructTransactionListMessage constructs serialized transactions +func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.TRANSACTION)) + byteBuffer.WriteByte(byte(REQUEST)) + for _, txId := range transactionIds { + byteBuffer.Write(txId) + } + return byteBuffer.Bytes() +} + //ConstructStopMessage is STOP message func ConstructStopMessage() []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)}) - byteBuffer.WriteByte(byte(message.CONTROL)) + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.CONTROL)) byteBuffer.WriteByte(byte(STOP)) return byteBuffer.Bytes() } diff --git a/node/node.go b/node/node.go index f8b06beef..7e6cc2861 100644 --- a/node/node.go +++ b/node/node.go @@ -1,16 +1,16 @@ package node import ( - "bytes" - "encoding/gob" "harmony-benchmark/blockchain" "harmony-benchmark/consensus" - "harmony-benchmark/message" + "harmony-benchmark/common" "harmony-benchmark/p2p" "log" "net" "os" "time" + "bytes" + "encoding/gob" ) // A node represents a program (machine) participating in the network @@ -56,49 +56,41 @@ func (node *Node) NodeHandler(conn net.Conn) { return } - msgCategory, err := message.GetMessageCategory(content) + msgCategory, err := common.GetMessageCategory(content) if err != nil { node.Logf("Read node type failed:%s", err) return } - msgType, err := message.GetMessageType(content) + msgType, err := common.GetMessageType(content) if err != nil { node.Logf("Read action type failed:%s", err) return } - msgPayload, err := message.GetMessagePayload(content) + msgPayload, err := common.GetMessagePayload(content) if err != nil { node.Logf("Read message payload failed:%s", err) return } switch msgCategory { - case message.COMMITTEE: - actionType := message.CommitteeMessageType(msgType) + case common.COMMITTEE: + actionType := common.CommitteeMessageType(msgType) switch actionType { - case message.CONSENSUS: + case common.CONSENSUS: if consensus.IsLeader { consensus.ProcessMessageLeader(msgPayload) } else { consensus.ProcessMessageValidator(msgPayload) } } - case message.NODE: - actionType := message.NodeMessageType(msgType) + case common.NODE: + actionType := common.NodeMessageType(msgType) switch actionType { - case message.TRANSACTION: - txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type - - txList := new([]blockchain.Transaction) - err := txDecoder.Decode(&txList) - if err != nil { - node.Logln("Failed deserializing transaction list") - } - node.pendingTransactions = append(node.pendingTransactions, *txList...) - node.Logln(len(node.pendingTransactions)) - case message.CONTROL: + case common.TRANSACTION: + node.transactionMessageHandler(msgPayload) + case common.CONTROL: controlType := msgPayload[0] if ControlMessageType(controlType) == STOP { node.Logln("Stopping Node") @@ -109,6 +101,52 @@ func (node *Node) NodeHandler(conn net.Conn) { } } +func (node *Node) transactionMessageHandler(msgPayload []byte) { + txMessageType := TransactionMessageType(msgPayload[0]) + + switch txMessageType { + case SEND: + txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type + + txList := new([]blockchain.Transaction) + err := txDecoder.Decode(&txList) + if err != nil { + log.Println("Failed deserializing transaction list") + } + node.pendingTransactions = append(node.pendingTransactions, *txList...) + case REQUEST: + reader := bytes.NewBuffer(msgPayload[1:]) + var txIds map[[32]byte]bool + txId := make([]byte, 32) // 32 byte hash Id + for { + _, err := reader.Read(txId) + if err != nil { + break + } + + txIds[getFixedByteTxId(txId)] = true + } + + var txToReturn []blockchain.Transaction + for _, tx := range node.pendingTransactions { + if txIds[getFixedByteTxId(tx.ID)] { + txToReturn = append(txToReturn, tx) + } + } + + // TODO: return the transaction list to requester + } +} + +// Copy the txId byte slice over to 32 byte array so the map can key on it +func getFixedByteTxId(txId []byte) [32]byte { + var id [32]byte + for i := range id { + id[i] = txId[i] + } + return id +} + func (node *Node) WaitForConsensusReady(readySignal chan int) { for { // keep waiting for consensus ready <-readySignal diff --git a/p2p/message.go b/p2p/message.go index 6b8766eec..e04e4876b 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -48,7 +48,6 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { } // TODO: check on msgType and take actions accordingly - //// Read 4 bytes for message size fourBytes := make([]byte, 4) n, err := r.Read(fourBytes)