From 855ed6d603cf5359a63f72f187a26d7986c0a804 Mon Sep 17 00:00:00 2001 From: alok Date: Thu, 14 Jun 2018 22:49:57 -0700 Subject: [PATCH 1/2] WIP: adding message to stop running consensus --- aws-code/transaction_generator.go | 17 ++++++++++++----- consensus/consensus_leader.go | 24 +++++++++++++----------- consensus/message.go | 7 +++++-- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 3877439a7..69829338e 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -1,12 +1,12 @@ package main import ( - "harmony-benchmark/blockchain" - "math/rand" - "time" "flag" + "harmony-benchmark/blockchain" "harmony-benchmark/node" "harmony-benchmark/p2p" + "math/rand" + "time" ) func newRandTransaction() blockchain.Transaction { @@ -22,15 +22,22 @@ func main() { ip := flag.String("ip", "127.0.0.1", "IP of the leader") port := flag.String("port", "9000", "port of the leader.") - + start := time.Now() + totalTime = 30000 txs := make([]blockchain.Transaction, 10) for true { + t = time.Now() + if t.Sub(start) >= totalTime { + break + } for i := range txs { txs[i] = newRandTransaction() } msg := node.ConstructTransactionListMessage(txs) p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) - time.Sleep(1 * time.Second) // 10 transactions per second + time.Sleep(1 * time.Second) // 10 transactions per second } + msg := node.ConstructTransactionListMessage(txs) + p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index b87763b8d..62c830dcb 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -4,11 +4,11 @@ import ( "log" "sync" - "harmony-benchmark/p2p" "bytes" "encoding/binary" "errors" "fmt" + "harmony-benchmark/p2p" ) var mutex = &sync.Mutex{} @@ -38,6 +38,8 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { consensus.processResponseMessage(payload) case START_CONSENSUS: consensus.processStartConsensusMessage(msg) + case STOP_CONSENSUS: + consensus.stopConsensusMessage(msg) default: log.Println("Unexpected message type: %s", msgType) } @@ -119,23 +121,23 @@ func (consensus *Consensus) processCommitMessage(payload []byte) { //#### Read payload data offset := 0 // 4 byte consensus id - consensusId := binary.BigEndian.Uint32(payload[offset:offset+4]) + consensusId := binary.BigEndian.Uint32(payload[offset : offset+4]) offset += 4 // 32 byte block hash - blockHash := payload[offset:offset+32] + blockHash := payload[offset : offset+32] offset += 32 // 2 byte validator id - validatorId := string(payload[offset:offset+2]) + validatorId := string(payload[offset : offset+2]) offset += 2 // 33 byte commit - commit := payload[offset:offset+33] + commit := payload[offset : offset+33] offset += 33 // 64 byte of signature on previous data - signature := payload[offset:offset+64] + signature := payload[offset : offset+64] offset += 64 //#### END: Read payload data @@ -225,23 +227,23 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { //#### Read payload data offset := 0 // 4 byte consensus id - consensusId := binary.BigEndian.Uint32(payload[offset:offset+4]) + consensusId := binary.BigEndian.Uint32(payload[offset : offset+4]) offset += 4 // 32 byte block hash - blockHash := payload[offset:offset+32] + blockHash := payload[offset : offset+32] offset += 32 // 2 byte validator id - validatorId := string(payload[offset:offset+2]) + validatorId := string(payload[offset : offset+2]) offset += 2 // 32 byte response - response := payload[offset:offset+32] + response := payload[offset : offset+32] offset += 32 // 64 byte of signature on previous data - signature := payload[offset:offset+64] + signature := payload[offset : offset+64] offset += 64 //#### END: Read payload data diff --git a/consensus/message.go b/consensus/message.go index 26c4e9863..878117b5e 100644 --- a/consensus/message.go +++ b/consensus/message.go @@ -78,6 +78,7 @@ const ( CHALLENGE RESPONSE START_CONSENSUS + STOP_CONSENSUS ) // Returns string name for the MessageType enum @@ -87,9 +88,11 @@ func (msgType MessageType) String() string { "COMMIT", "CHALLENGE", "RESPONSE", - "START_CONSENSUS"} + "START_CONSENSUS" + "STOP_CONSENSUS" + } - if msgType < ANNOUNCE || msgType > START_CONSENSUS { + if msgType < ANNOUNCE || msgType > STOP_CONSENSUS { return "Unknown" } return names[msgType] From 55113dc0a9cfdc7447c0386a8c4c570ee5a13ff6 Mon Sep 17 00:00:00 2001 From: alok Date: Fri, 15 Jun 2018 20:34:37 -0700 Subject: [PATCH 2/2] killing consensus --- aws-code/transaction_generator.go | 36 ++++++++++++++++++++++++++----- consensus/consensus_leader.go | 2 -- consensus/message.go | 6 ++---- deploy.sh | 2 +- message/message.go | 9 ++++---- node/message.go | 21 +++++++++++++++--- node/node.go | 23 +++++++++++++------- 7 files changed, 72 insertions(+), 27 deletions(-) diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 69829338e..2cb8de967 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -1,11 +1,15 @@ package main import ( + "bufio" "flag" + "fmt" "harmony-benchmark/blockchain" "harmony-benchmark/node" "harmony-benchmark/p2p" "math/rand" + "os" + "strings" "time" ) @@ -18,16 +22,34 @@ func newRandTransaction() blockchain.Transaction { return tx } +func getPeers(Ip, Port, iplist string) []p2p.Peer { + file, _ := os.Open(iplist) + fscanner := bufio.NewScanner(file) + var peerList []p2p.Peer + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + ip, port, status := p[0], p[1], p[2] + if status == "leader" || ip == Ip && port == Port { + continue + } + peer := p2p.Peer{Port: port, Ip: ip} + peerList = append(peerList, peer) + } + return peerList +} func main() { ip := flag.String("ip", "127.0.0.1", "IP of the leader") port := flag.String("port", "9000", "port of the leader.") + ipfile := flag.String("ipfile", "local_iplist.txt", "file containing all ip addresses") + //getLeader to get ip,port and get totaltime I want to run start := time.Now() - totalTime = 30000 + totalTime := 60.0 txs := make([]blockchain.Transaction, 10) for true { - t = time.Now() - if t.Sub(start) >= totalTime { + t := time.Now() + if t.Sub(start).Seconds() >= totalTime { + fmt.Println(int(t.Sub(start)), start, totalTime) break } for i := range txs { @@ -38,6 +60,10 @@ func main() { p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) time.Sleep(1 * time.Second) // 10 transactions per second } - msg := node.ConstructTransactionListMessage(txs) - p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) + msg := node.ConstructStopMessage() + var leaderPeer p2p.Peer + leaderPeer.Ip = *ip + leaderPeer.Port = *port + peers := append(getPeers(*ip, *port, *ipfile), leaderPeer) + p2p.BroadcastMessage(peers, msg) } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 62c830dcb..fc3c24124 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -38,8 +38,6 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { consensus.processResponseMessage(payload) case START_CONSENSUS: consensus.processStartConsensusMessage(msg) - case STOP_CONSENSUS: - consensus.stopConsensusMessage(msg) default: log.Println("Unexpected message type: %s", msgType) } diff --git a/consensus/message.go b/consensus/message.go index 878117b5e..21003be81 100644 --- a/consensus/message.go +++ b/consensus/message.go @@ -78,7 +78,6 @@ const ( CHALLENGE RESPONSE START_CONSENSUS - STOP_CONSENSUS ) // Returns string name for the MessageType enum @@ -88,11 +87,10 @@ func (msgType MessageType) String() string { "COMMIT", "CHALLENGE", "RESPONSE", - "START_CONSENSUS" - "STOP_CONSENSUS" + "START_CONSENSUS", } - if msgType < ANNOUNCE || msgType > STOP_CONSENSUS { + if msgType < ANNOUNCE || msgType > START_CONSENSUS { return "Unknown" } return names[msgType] diff --git a/deploy.sh b/deploy.sh index 0db1f2ee4..0c889daf5 100755 --- a/deploy.sh +++ b/deploy.sh @@ -5,4 +5,4 @@ while read ip port mode; do go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile& done < $ipfile -go run ./aws-code/transaction_generator.go \ No newline at end of file +go run ./aws-code/transaction_generator.go -ipfile $ipfile \ No newline at end of file diff --git a/message/message.go b/message/message.go index db4696263..6e2be292a 100644 --- a/message/message.go +++ b/message/message.go @@ -25,9 +25,9 @@ n - 2 bytes - actual message payload const NODE_TYPE_BYTES = 1 const ACTION_TYPE_BYTES = 1 - // The category of messages type MessageCategory byte + const ( COMMITTEE MessageCategory = iota NODE @@ -36,20 +36,21 @@ const ( // The specific types of message under committee category type CommitteeMessageType byte + const ( CONSENSUS CommitteeMessageType = iota // TODO: add more types ) - // The specific types of message under node category type NodeMessageType byte + const ( TRANSACTION NodeMessageType = iota + CONTROL // TODO: add more types ) - // Get the message category from the p2p message content func GetMessageCategory(message []byte) (MessageCategory, error) { if len(message) < NODE_TYPE_BYTES { @@ -60,7 +61,7 @@ func GetMessageCategory(message []byte) (MessageCategory, error) { // Get the action type from the p2p message content func GetMessageType(message []byte) (byte, error) { - if len(message) < NODE_TYPE_BYTES + ACTION_TYPE_BYTES { + if len(message) < NODE_TYPE_BYTES+ACTION_TYPE_BYTES { return 0, errors.New("Failed to get action type: no data available.") } return byte(message[NODE_TYPE_BYTES+ACTION_TYPE_BYTES-1]), nil diff --git a/node/message.go b/node/message.go index fafb99e6d..0a071bb31 100644 --- a/node/message.go +++ b/node/message.go @@ -1,10 +1,10 @@ package node import ( - "harmony-benchmark/blockchain" "bytes" - "harmony-benchmark/message" "encoding/gob" + "harmony-benchmark/blockchain" + "harmony-benchmark/message" ) type TransactionMessageType int @@ -13,6 +13,13 @@ const ( SEND TransactionMessageType = iota ) +type ControlMessageType int + +const ( + STOP ControlMessageType = iota +) + +//ConstructTransactionListMessage constructs serialized transactions func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)}) byteBuffer.WriteByte(byte(message.TRANSACTION)) @@ -20,4 +27,12 @@ func ConstructTransactionListMessage(transactions []blockchain.Transaction) []by encoder := gob.NewEncoder(byteBuffer) encoder.Encode(transactions) return byteBuffer.Bytes() -} \ No newline at end of file +} + +//ConstructStopMessage is STOP message +func ConstructStopMessage() []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)}) + byteBuffer.WriteByte(byte(message.CONTROL)) + byteBuffer.WriteByte(byte(STOP)) + return byteBuffer.Bytes() +} diff --git a/node/node.go b/node/node.go index d3d96cc28..4f68608a4 100644 --- a/node/node.go +++ b/node/node.go @@ -1,20 +1,20 @@ package node import ( + "bytes" + "encoding/gob" + "harmony-benchmark/blockchain" + "harmony-benchmark/consensus" + "harmony-benchmark/message" + "harmony-benchmark/p2p" "log" "net" "os" - "harmony-benchmark/p2p" - "harmony-benchmark/consensus" - "harmony-benchmark/message" - "harmony-benchmark/blockchain" - "bytes" - "encoding/gob" ) // A node represents a program (machine) participating in the network type Node struct { - consensus *consensus.Consensus + consensus *consensus.Consensus pendingTransactions []blockchain.Transaction } @@ -112,6 +112,13 @@ func (node *Node) NodeHandler(conn net.Conn) { } node.pendingTransactions = append(node.pendingTransactions, *txList...) log.Println(len(node.pendingTransactions)) + case message.CONTROL: + controlType := msgPayload[0] + if ControlMessageType(controlType) == STOP { + log.Println("Stopping Node") + os.Exit(0) + } + } } } @@ -121,4 +128,4 @@ func NewNode(consensus *consensus.Consensus) Node { node := Node{} node.consensus = consensus return node -} \ No newline at end of file +}