merged conflict

pull/5/head
Richard Liu 7 years ago
commit 28d329e23a
  1. 43
      aws-code/transaction_generator.go
  2. 3
      aws-scripts/run_instances.sh
  3. 14
      benchmark_main.go
  4. 11
      consensus/consensus.go
  5. 47
      consensus/consensus_leader.go
  6. 3
      consensus/message.go
  7. 2
      deploy.sh
  8. 1
      deploy_linux.sh
  9. 2
      local_iplist.txt
  10. 9
      message/message.go
  11. 19
      node/message.go
  12. 50
      node/node.go

@ -1,12 +1,16 @@
package main
import (
"harmony-benchmark/blockchain"
"math/rand"
"time"
"bufio"
"flag"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
"math/rand"
"os"
"strings"
"time"
)
func newRandTransaction() blockchain.Transaction {
@ -18,19 +22,48 @@ func newRandTransaction() blockchain.Transaction {
return tx
}
func getPeers(Ip, Port, iplist string) []p2p.Peer {
file, _ := os.Open(iplist)
fscanner := bufio.NewScanner(file)
var peerList []p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" || ip == Ip && port == Port {
continue
}
peer := p2p.Peer{Port: port, Ip: ip}
peerList = append(peerList, peer)
}
return peerList
}
func main() {
ip := flag.String("ip", "127.0.0.1", "IP of the leader")
port := flag.String("port", "9000", "port of the leader.")
ipfile := flag.String("ipfile", "local_iplist.txt", "file containing all ip addresses")
//getLeader to get ip,port and get totaltime I want to run
start := time.Now()
totalTime := 60.0
txs := make([]blockchain.Transaction, 10)
for true {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
fmt.Println(int(t.Sub(start)), start, totalTime)
break
}
for i := range txs {
txs[i] = newRandTransaction()
}
msg := node.ConstructTransactionListMessage(txs)
p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg)
time.Sleep(1 * time.Second) // 10 transactions per second
time.Sleep(1 * time.Second) // 10 transactions per second
}
msg := node.ConstructStopMessage()
var leaderPeer p2p.Peer
leaderPeer.Ip = *ip
leaderPeer.Port = *port
peers := append(getPeers(*ip, *port, *ipfile), leaderPeer)
p2p.BroadcastMessage(peers, msg)
}

@ -1,4 +1,3 @@
#!/bin/bash -x
cd /home/ec2-user/projects/src/harmony-benchmark
./deploy_linux.sh local_iplist.txt
./send_txn.sh
./deploy_linux.sh local_iplist2.txt

@ -61,5 +61,19 @@ func main() {
log.Println("======================================")
node := node.NewNode(&consensus)
if consensus.IsLeader {
// Let consensus run
go func() {
log.Println("Waiting for block")
consensus.WaitForNewBlock(node.BlockChannel)
}()
// Node waiting for consensus readiness to create new block
go func() {
log.Println("Waiting for consensus ready")
node.WaitForConsensusReady(consensus.ReadySignal)
}()
}
node.StartServer(*port)
}

@ -35,6 +35,9 @@ type Consensus struct {
// BlockHeader to run consensus on
blockHeader []byte
// Signal channel for starting a new consensus process
ReadySignal chan int
//// Network related fields
msgCategory byte
actionType byte
@ -103,6 +106,14 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus
value, err := strconv.Atoi(socketId)
consensus.nodeId = uint16(value)
if consensus.IsLeader {
consensus.ReadySignal = make(chan int)
// send a signal to indicate it's ready to run consensus
go func() {
consensus.ReadySignal <- 1
}()
}
consensus.msgCategory = byte(message.COMMITTEE)
consensus.actionType = byte(message.CONSENSUS)
return consensus

@ -4,15 +4,26 @@ import (
"log"
"sync"
"harmony-benchmark/p2p"
"bytes"
"encoding/binary"
"errors"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/p2p"
)
var mutex = &sync.Mutex{}
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
for { // keep waiting for new blocks
newBlock := <-blockChannel
// TODO: think about potential race condition
if consensus.state == READY {
consensus.startConsensus(&newBlock)
}
}
}
// Leader's consensus message dispatcher
func (consensus *Consensus) ProcessMessageLeader(message []byte) {
msgType, err := GetConsensusMessageType(message)
@ -25,7 +36,6 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
log.Print(err)
}
msg := string(payload)
log.Printf("[Leader] Received and processing message: %s\n", msgType)
switch msgType {
case ANNOUNCE:
@ -37,18 +47,22 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
case RESPONSE:
consensus.processResponseMessage(payload)
case START_CONSENSUS:
consensus.processStartConsensusMessage(msg)
consensus.processStartConsensusMessage(payload)
default:
log.Println("Unexpected message type: %s", msgType)
}
}
// Handler for message which triggers consensus process
func (consensus *Consensus) processStartConsensusMessage(msg string) {
func (consensus *Consensus) processStartConsensusMessage(payload []byte) {
consensus.startConsensus(blockchain.NewGenesisBlock(blockchain.NewCoinbaseTX("x", "y")))
}
func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
// prepare message and broadcast to validators
// Construct new block
newBlock := constructNewBlock()
consensus.blockHash = getBlockHash(newBlock)
//newBlock := constructNewBlock()
consensus.blockHash = newBlock.Hash
msgToSend, err := consensus.constructAnnounceMessage()
if err != nil {
@ -119,23 +133,23 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
//#### Read payload data
offset := 0
// 4 byte consensus id
consensusId := binary.BigEndian.Uint32(payload[offset:offset+4])
consensusId := binary.BigEndian.Uint32(payload[offset : offset+4])
offset += 4
// 32 byte block hash
blockHash := payload[offset:offset+32]
blockHash := payload[offset : offset+32]
offset += 32
// 2 byte validator id
validatorId := string(payload[offset:offset+2])
validatorId := string(payload[offset : offset+2])
offset += 2
// 33 byte commit
commit := payload[offset:offset+33]
commit := payload[offset : offset+33]
offset += 33
// 64 byte of signature on previous data
signature := payload[offset:offset+64]
signature := payload[offset : offset+64]
offset += 64
//#### END: Read payload data
@ -225,23 +239,23 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
//#### Read payload data
offset := 0
// 4 byte consensus id
consensusId := binary.BigEndian.Uint32(payload[offset:offset+4])
consensusId := binary.BigEndian.Uint32(payload[offset : offset+4])
offset += 4
// 32 byte block hash
blockHash := payload[offset:offset+32]
blockHash := payload[offset : offset+32]
offset += 32
// 2 byte validator id
validatorId := string(payload[offset:offset+2])
validatorId := string(payload[offset : offset+2])
offset += 2
// 32 byte response
response := payload[offset:offset+32]
response := payload[offset : offset+32]
offset += 32
// 64 byte of signature on previous data
signature := payload[offset:offset+64]
signature := payload[offset : offset+64]
offset += 64
//#### END: Read payload data
@ -273,6 +287,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
// TODO: do followups on the consensus
log.Printf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators))
consensus.ResetState()
consensus.ReadySignal <- 1
}
// TODO: composes new block and broadcast the new block to validators
}

@ -87,7 +87,8 @@ func (msgType MessageType) String() string {
"COMMIT",
"CHALLENGE",
"RESPONSE",
"START_CONSENSUS"}
"START_CONSENSUS",
}
if msgType < ANNOUNCE || msgType > START_CONSENSUS {
return "Unknown"

@ -6,4 +6,4 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile&
done < $ipfile
go run ./aws-code/transaction_generator.go
go run ./aws-code/transaction_generator.go -ipfile $ipfile

@ -19,3 +19,4 @@ while read ip port mode; do
#echo $ip $port $mode $ipfile
go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile&
done < $ipfile
go run ./aws-code/transaction_generator.go -ipfile $ipfile

@ -1,4 +1,3 @@
127.0.0.1 9000 leader
127.0.0.1 9001 validator
127.0.0.1 9002 validator
127.0.0.1 9003 validator
@ -99,3 +98,4 @@
127.0.0.1 9098 validator
127.0.0.1 9099 validator
127.0.0.1 9100 validator
127.0.0.1 9000 leader

@ -26,9 +26,9 @@ n - 2 bytes - actual message payload
const NODE_TYPE_BYTES = 1
const ACTION_TYPE_BYTES = 1
// The category of messages
type MessageCategory byte
const (
COMMITTEE MessageCategory = iota
NODE
@ -37,20 +37,21 @@ const (
// The specific types of message under committee category
type CommitteeMessageType byte
const (
CONSENSUS CommitteeMessageType = iota
// TODO: add more types
)
// The specific types of message under node category
type NodeMessageType byte
const (
TRANSACTION NodeMessageType = iota
CONTROL
// TODO: add more types
)
// Get the message category from the p2p message content
func GetMessageCategory(message []byte) (MessageCategory, error) {
if len(message) < NODE_TYPE_BYTES {
@ -61,7 +62,7 @@ func GetMessageCategory(message []byte) (MessageCategory, error) {
// Get the action type from the p2p message content
func GetMessageType(message []byte) (byte, error) {
if len(message) < NODE_TYPE_BYTES + ACTION_TYPE_BYTES {
if len(message) < NODE_TYPE_BYTES+ACTION_TYPE_BYTES {
return 0, errors.New("Failed to get action type: no data available.")
}
return byte(message[NODE_TYPE_BYTES+ACTION_TYPE_BYTES-1]), nil

@ -1,10 +1,10 @@
package node
import (
"harmony-benchmark/blockchain"
"bytes"
"harmony-benchmark/message"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/message"
)
type TransactionMessageType int
@ -13,6 +13,13 @@ const (
SEND TransactionMessageType = iota
)
type ControlMessageType int
const (
STOP ControlMessageType = iota
)
//ConstructTransactionListMessage constructs serialized transactions
func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.TRANSACTION))
@ -21,3 +28,11 @@ func ConstructTransactionListMessage(transactions []blockchain.Transaction) []by
encoder.Encode(transactions)
return byteBuffer.Bytes()
}
//ConstructStopMessage is STOP message
func ConstructStopMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.CONTROL))
byteBuffer.WriteByte(byte(STOP))
return byteBuffer.Bytes()
}

@ -1,25 +1,27 @@
package node
import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/p2p"
"log"
"net"
"os"
"harmony-benchmark/p2p"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/blockchain"
"bytes"
"encoding/gob"
"time"
)
// A node represents a program (machine) participating in the network
type Node struct {
consensus *consensus.Consensus
consensus *consensus.Consensus
BlockChannel chan blockchain.Block
pendingTransactions []blockchain.Transaction
}
// Start a server and process the request by a handler.
func (node Node) StartServer(port string) {
func (node *Node) StartServer(port string) {
listenOnPort(port, node.NodeHandler)
}
@ -112,7 +114,38 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
log.Println(len(node.pendingTransactions))
case message.CONTROL:
controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP {
log.Println("Stopping Node")
os.Exit(0)
}
}
}
}
func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready
<-readySignal
// create a new block
newBlock := new(blockchain.Block)
for {
if len(node.pendingTransactions) >= 10 {
log.Println("Creating new block")
// TODO (Minh): package actual transactions
// For now, just take out 10 transactions
var txList []*blockchain.Transaction
for _, tx := range node.pendingTransactions[0:10] {
txList = append(txList, &tx)
}
node.pendingTransactions = node.pendingTransactions[10:]
newBlock = blockchain.NewBlock(txList, []byte{})
break
}
time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block.
}
node.BlockChannel <- *newBlock
}
}
@ -120,5 +153,6 @@ func (node *Node) NodeHandler(conn net.Conn) {
func NewNode(consensus *consensus.Consensus) Node {
node := Node{}
node.consensus = consensus
node.BlockChannel = make(chan blockchain.Block)
return node
}
Loading…
Cancel
Save