Merge pull request #3 from simple-rules/stopConsensus

adding message to stop running consensus
pull/5/head
alajko 7 years ago committed by GitHub
commit b68be08197
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      aws-code/transaction_generator.go
  2. 25
      consensus/consensus_leader.go
  3. 3
      consensus/message.go
  4. 2
      deploy.sh
  5. 9
      message/message.go
  6. 21
      node/message.go
  7. 27
      node/node.go

@ -1,12 +1,16 @@
package main package main
import ( import (
"harmony-benchmark/blockchain" "bufio"
"math/rand"
"time"
"flag" "flag"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/node" "harmony-benchmark/node"
"harmony-benchmark/p2p" "harmony-benchmark/p2p"
"math/rand"
"os"
"strings"
"time"
) )
func newRandTransaction() blockchain.Transaction { func newRandTransaction() blockchain.Transaction {
@ -18,16 +22,35 @@ func newRandTransaction() blockchain.Transaction {
return tx return tx
} }
func getPeers(Ip, Port, iplist string) []p2p.Peer {
file, _ := os.Open(iplist)
fscanner := bufio.NewScanner(file)
var peerList []p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" || ip == Ip && port == Port {
continue
}
peer := p2p.Peer{Port: port, Ip: ip}
peerList = append(peerList, peer)
}
return peerList
}
func main() { func main() {
ip := flag.String("ip", "127.0.0.1", "IP of the leader") ip := flag.String("ip", "127.0.0.1", "IP of the leader")
port := flag.String("port", "9000", "port of the leader.") port := flag.String("port", "9000", "port of the leader.")
txToSend := flag.Int("tx_count", 100, "number of transaction") ipfile := flag.String("ipfile", "local_iplist.txt", "file containing all ip addresses")
//getLeader to get ip,port and get totaltime I want to run
start := time.Now()
totalTime := 60.0
txs := make([]blockchain.Transaction, 10) txs := make([]blockchain.Transaction, 10)
txCount := 0 txCount := 0
for true { for true {
if txCount >= *txToSend { t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
fmt.Println(int(t.Sub(start)), start, totalTime)
break break
} }
for i := range txs { for i := range txs {
@ -36,7 +59,12 @@ func main() {
} }
msg := node.ConstructTransactionListMessage(txs) msg := node.ConstructTransactionListMessage(txs)
p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg)
txCount += len(txs) time.Sleep(1 * time.Second) // 10 transactions per second
time.Sleep(1 * time.Second) // 10 transactions per second
} }
msg := node.ConstructStopMessage()
var leaderPeer p2p.Peer
leaderPeer.Ip = *ip
leaderPeer.Port = *port
peers := append(getPeers(*ip, *port, *ipfile), leaderPeer)
p2p.BroadcastMessage(peers, msg)
} }

@ -4,20 +4,19 @@ import (
"log" "log"
"sync" "sync"
"harmony-benchmark/p2p"
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"harmony-benchmark/blockchain" "harmony-benchmark/blockchain"
"harmony-benchmark/p2p"
) )
var mutex = &sync.Mutex{} var mutex = &sync.Mutex{}
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
for { // keep waiting for new blocks for { // keep waiting for new blocks
newBlock := <- blockChannel newBlock := <-blockChannel
// TODO: think about potential race condition // TODO: think about potential race condition
if consensus.state == READY { if consensus.state == READY {
consensus.startConsensus(&newBlock) consensus.startConsensus(&newBlock)
@ -134,23 +133,23 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
//#### Read payload data //#### Read payload data
offset := 0 offset := 0
// 4 byte consensus id // 4 byte consensus id
consensusId := binary.BigEndian.Uint32(payload[offset:offset+4]) consensusId := binary.BigEndian.Uint32(payload[offset : offset+4])
offset += 4 offset += 4
// 32 byte block hash // 32 byte block hash
blockHash := payload[offset:offset+32] blockHash := payload[offset : offset+32]
offset += 32 offset += 32
// 2 byte validator id // 2 byte validator id
validatorId := string(payload[offset:offset+2]) validatorId := string(payload[offset : offset+2])
offset += 2 offset += 2
// 33 byte commit // 33 byte commit
commit := payload[offset:offset+33] commit := payload[offset : offset+33]
offset += 33 offset += 33
// 64 byte of signature on previous data // 64 byte of signature on previous data
signature := payload[offset:offset+64] signature := payload[offset : offset+64]
offset += 64 offset += 64
//#### END: Read payload data //#### END: Read payload data
@ -240,23 +239,23 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
//#### Read payload data //#### Read payload data
offset := 0 offset := 0
// 4 byte consensus id // 4 byte consensus id
consensusId := binary.BigEndian.Uint32(payload[offset:offset+4]) consensusId := binary.BigEndian.Uint32(payload[offset : offset+4])
offset += 4 offset += 4
// 32 byte block hash // 32 byte block hash
blockHash := payload[offset:offset+32] blockHash := payload[offset : offset+32]
offset += 32 offset += 32
// 2 byte validator id // 2 byte validator id
validatorId := string(payload[offset:offset+2]) validatorId := string(payload[offset : offset+2])
offset += 2 offset += 2
// 32 byte response // 32 byte response
response := payload[offset:offset+32] response := payload[offset : offset+32]
offset += 32 offset += 32
// 64 byte of signature on previous data // 64 byte of signature on previous data
signature := payload[offset:offset+64] signature := payload[offset : offset+64]
offset += 64 offset += 64
//#### END: Read payload data //#### END: Read payload data

@ -87,7 +87,8 @@ func (msgType MessageType) String() string {
"COMMIT", "COMMIT",
"CHALLENGE", "CHALLENGE",
"RESPONSE", "RESPONSE",
"START_CONSENSUS"} "START_CONSENSUS",
}
if msgType < ANNOUNCE || msgType > START_CONSENSUS { if msgType < ANNOUNCE || msgType > START_CONSENSUS {
return "Unknown" return "Unknown"

@ -5,4 +5,4 @@ while read ip port mode; do
go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile& go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile&
done < $ipfile done < $ipfile
go run ./aws-code/transaction_generator.go go run ./aws-code/transaction_generator.go -ipfile $ipfile

@ -26,9 +26,9 @@ n - 2 bytes - actual message payload
const NODE_TYPE_BYTES = 1 const NODE_TYPE_BYTES = 1
const ACTION_TYPE_BYTES = 1 const ACTION_TYPE_BYTES = 1
// The category of messages // The category of messages
type MessageCategory byte type MessageCategory byte
const ( const (
COMMITTEE MessageCategory = iota COMMITTEE MessageCategory = iota
NODE NODE
@ -37,20 +37,21 @@ const (
// The specific types of message under committee category // The specific types of message under committee category
type CommitteeMessageType byte type CommitteeMessageType byte
const ( const (
CONSENSUS CommitteeMessageType = iota CONSENSUS CommitteeMessageType = iota
// TODO: add more types // TODO: add more types
) )
// The specific types of message under node category // The specific types of message under node category
type NodeMessageType byte type NodeMessageType byte
const ( const (
TRANSACTION NodeMessageType = iota TRANSACTION NodeMessageType = iota
CONTROL
// TODO: add more types // TODO: add more types
) )
// Get the message category from the p2p message content // Get the message category from the p2p message content
func GetMessageCategory(message []byte) (MessageCategory, error) { func GetMessageCategory(message []byte) (MessageCategory, error) {
if len(message) < NODE_TYPE_BYTES { if len(message) < NODE_TYPE_BYTES {
@ -61,7 +62,7 @@ func GetMessageCategory(message []byte) (MessageCategory, error) {
// Get the action type from the p2p message content // Get the action type from the p2p message content
func GetMessageType(message []byte) (byte, error) { func GetMessageType(message []byte) (byte, error) {
if len(message) < NODE_TYPE_BYTES + ACTION_TYPE_BYTES { if len(message) < NODE_TYPE_BYTES+ACTION_TYPE_BYTES {
return 0, errors.New("Failed to get action type: no data available.") return 0, errors.New("Failed to get action type: no data available.")
} }
return byte(message[NODE_TYPE_BYTES+ACTION_TYPE_BYTES-1]), nil return byte(message[NODE_TYPE_BYTES+ACTION_TYPE_BYTES-1]), nil

@ -1,10 +1,10 @@
package node package node
import ( import (
"harmony-benchmark/blockchain"
"bytes" "bytes"
"harmony-benchmark/message"
"encoding/gob" "encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/message"
) )
type TransactionMessageType int type TransactionMessageType int
@ -13,6 +13,13 @@ const (
SEND TransactionMessageType = iota SEND TransactionMessageType = iota
) )
type ControlMessageType int
const (
STOP ControlMessageType = iota
)
//ConstructTransactionListMessage constructs serialized transactions
func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte { func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.TRANSACTION)) byteBuffer.WriteByte(byte(message.TRANSACTION))
@ -20,4 +27,12 @@ func ConstructTransactionListMessage(transactions []blockchain.Transaction) []by
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(transactions) encoder.Encode(transactions)
return byteBuffer.Bytes() return byteBuffer.Bytes()
} }
//ConstructStopMessage is STOP message
func ConstructStopMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.CONTROL))
byteBuffer.WriteByte(byte(STOP))
return byteBuffer.Bytes()
}

@ -1,22 +1,23 @@
package node package node
import ( import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/p2p"
"log" "log"
"net" "net"
"os" "os"
"harmony-benchmark/p2p"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/blockchain"
"bytes"
"encoding/gob"
"time" "time"
) )
// A node represents a program (machine) participating in the network // A node represents a program (machine) participating in the network
type Node struct { type Node struct {
consensus *consensus.Consensus consensus *consensus.Consensus
BlockChannel chan blockchain.Block consensus *consensus.Consensus
BlockChannel chan blockchain.Block
pendingTransactions []blockchain.Transaction pendingTransactions []blockchain.Transaction
} }
@ -114,6 +115,12 @@ func (node *Node) NodeHandler(conn net.Conn) {
} }
node.pendingTransactions = append(node.pendingTransactions, *txList...) node.pendingTransactions = append(node.pendingTransactions, *txList...)
log.Println(len(node.pendingTransactions)) log.Println(len(node.pendingTransactions))
case message.CONTROL:
controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP {
log.Println("Stopping Node")
os.Exit(0)
}
} }
} }
@ -121,7 +128,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
func (node *Node) WaitForConsensusReady(readySignal chan int) { func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready for { // keep waiting for consensus ready
<- readySignal <-readySignal
// create a new block // create a new block
newBlock := new(blockchain.Block) newBlock := new(blockchain.Block)
for { for {
@ -149,4 +156,4 @@ func NewNode(consensus *consensus.Consensus) Node {
node.consensus = consensus node.consensus = consensus
node.BlockChannel = make(chan blockchain.Block) node.BlockChannel = make(chan blockchain.Block)
return node return node
} }

Loading…
Cancel
Save