merged conflict

pull/5/head
Richard Liu 7 years ago
commit d0021c93f0
  1. 2
      .travis.yml
  2. 2
      README.md
  3. 47
      aws-code/transaction_generator.go
  4. 2
      aws-scripts/parse_json.py
  5. 2
      aws-scripts/run_instances.sh
  6. 60
      benchmark_main.go
  7. 90
      blockchain/utxopool.go
  8. 10
      common/message.go
  9. 17
      consensus/consensus.go
  10. 62
      consensus/consensus_leader.go
  11. 34
      consensus/consensus_leader_test.go
  12. 36
      consensus/consensus_test.go
  13. 16
      consensus/consensus_validator.go
  14. 30
      consensus/consensus_validator_test.go
  15. 10
      deploy.sh
  16. 10
      deploy_linux.sh
  17. 101
      local_config.txt
  18. 11
      local_config2.txt
  19. 22
      local_config_shards.txt
  20. 101
      local_iplist.txt
  21. 11
      local_iplist2.txt
  22. 26
      node/message.go
  23. 82
      node/node.go
  24. 1
      p2p/message.go

@ -9,4 +9,4 @@ install: |
cd $HOME/gopath/src/harmony-benchmark
notifications:
slack:
secure: F6PRLJ7fIVpvZAEAIikE9u53CwklytznkNSJPuzB7cxLttAejls8ou3jsFFUkdTyyyy+QKFP9Bj+IFcAI9dq3CDYp7MnTQx/ajn8TC0xBwKW3gEKYoptIQJkXzvdwp9OanpU78QynRe1oRLefkN8qrL0zWLvoqcaHSYwjh1kzDCbO9G13aiI5CiFdW7jKwBKEPlleze4so0UdAaaGxyXUiSsmvHrOwF72ElnahKLy9DzmIgT8PdQkf1BmV3yaH3VQoU4fyu8t5P/jy8NYIXomcpo+pavgUBfXwd3mPf1KkAoB2RXdGCkORzHNZ6h+63ZVDzi1t8FWAbuNmvJ/Hq07IScGcsjZCEdy+79NSSBHThUejnHnqD4QdcXdhr30PoWBX85+phwPEVehAtddzgLyzivGByKHQMfWcetvPRww7Im+MpJW3OssgHlPxbRCE0uFtSDYgD9UoKevuZzFnME3U1rT9xqTweqOzxdsGRpvU/fLbh0PQCJnDQJjOB054l6ZTu9mtt+ogbYpOhB2PO6qLJZ64cn7M5/wLMiswRWooT9DeN+5a7n5MRo1i/TWFDFWBFYQVWTYdiTaU+oZdSZsu25TMboVraHPw3HE5tKIGgqOBLLd187HbE5ZeR62bzUnlN1wK1tphfgVa4ztmL+NY9nTgHMKki7LiNIL1p7M68=
secure: RPB3ThYIGuDUidvaWfOA7Hc9x1bDfd5+Y10r7xwY+NGCN3zW86s/GNLpLutI0MWTV9e2CJupHvz5clp8Ktle/tVjLhs6jHQnNV7U8PTWKkL5By6IFVAHN12unMQn/m0RPwqMfdubajXoV51XhbFA/iow/0fqwsd61VdPIuBrlQjy9z7kyVnRLNoGvYjDqKEkJfYVb3qFNFLzD0F7Y2AgxnezIRjsTLgHzR4owLJYqVMhvTYIV9/vSf1w4UUPzhHyZRESl6bri+a1+g7GxE32OtNwq68xxVeeJcrO/MbjAHHW9V6BW1MjJfYzD5T+7JHIfZOjV2WgzJ7uCkVYztfq+02yOCSWsLNxFVojIDhVFEhhJ6Vd2Zf1otolS7j0svK/qNmShID9q9NAasaI105GsQgtaSPAUGd88J/vyX2ndG1nDOvxmgOo10tZFOnPHW7JnWMybk3PLza8o1ujA7X3JFdvDA8BPP9h6MVP4N7doCQ/n4Crts53HvEWlvcv5sBNu61WYlSTBzf1qNwBKMyN2E0rNubsxKmW8B6jLdWYdlx57nyTRPraNKGE1fnUW5nWRZGax3F1tQRwEfpQMk22qgeUK0RYWsPgHFaPciKCA3dJX7t1k/ib9pyR4nc9SZnYw54KMhkAXPIVQ0iy0EpTAH1DNYV6v8zXCwjl+BdkhlY=

@ -16,7 +16,7 @@ git clone git@github.com:simple-rules/harmony-benchmark.git
## Usage
```
./deploy.sh local_iplist.txt
./deploy.sh local_config.txt
./send_txn.sh
```

@ -7,6 +7,7 @@ import (
"harmony-benchmark/blockchain"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
"log"
"math/rand"
"os"
"strings"
@ -22,14 +23,14 @@ func newRandTransaction() blockchain.Transaction {
return tx
}
func getPeers(Ip, Port, iplist string) []p2p.Peer {
file, _ := os.Open(iplist)
func getValidators(config string) []p2p.Peer {
file, _ := os.Open(config)
fscanner := bufio.NewScanner(file)
var peerList []p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" || ip == Ip && port == Port {
if status == "leader" {
continue
}
peer := p2p.Peer{Port: port, Ip: ip}
@ -37,15 +38,39 @@ func getPeers(Ip, Port, iplist string) []p2p.Peer {
}
return peerList
}
func getLeaders(config *[][]string) []p2p.Peer {
var peerList []p2p.Peer
for _, node := range *config {
ip, port, status := node[0], node[1], node[2]
if status == "leader" {
peerList = append(peerList, p2p.Peer{Ip: ip, Port: port})
}
}
return peerList
}
func readConfigFile(configFile string) [][]string {
file, _ := os.Open(configFile)
fscanner := bufio.NewScanner(file)
result := [][]string{}
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
result = append(result, p)
}
return result
}
func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
flag.Parse()
config := readConfigFile(*configFile)
ip := flag.String("ip", "127.0.0.1", "IP of the leader")
port := flag.String("port", "9000", "port of the leader.")
ipfile := flag.String("ipfile", "local_iplist.txt", "file containing all ip addresses")
//getLeader to get ip,port and get totaltime I want to run
start := time.Now()
totalTime := 60.0
txs := make([]blockchain.Transaction, 10)
leaders := getLeaders(&config)
for true {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
@ -57,13 +82,11 @@ func main() {
}
msg := node.ConstructTransactionListMessage(txs)
p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg)
log.Printf("[Generator] Sending txs to %d leader[s]\n", len(leaders))
p2p.BroadcastMessage(leaders, msg)
time.Sleep(1 * time.Second) // 10 transactions per second
}
msg := node.ConstructStopMessage()
var leaderPeer p2p.Peer
leaderPeer.Ip = *ip
leaderPeer.Port = *port
peers := append(getPeers(*ip, *port, *ipfile), leaderPeer)
peers := append(getValidators(*configFile), leaders...)
p2p.BroadcastMessage(peers, msg)
}

@ -9,7 +9,7 @@ def get_public_ip(all_reservations):
all_public_ip_addresses.append(instance_information['PublicIpAddress'])
return all_public_ip_addresses
def make_peers_list(all_reservations,port="9001",filename="ipList.txt"):
def make_peers_list(all_reservations,port="9001",filename="config.txt"):
p = get_public_ip(all_reservations)
f = open(filename,"w")
for i in range(len(p)):

@ -1,3 +1,3 @@
#!/bin/bash -x
cd /home/ec2-user/projects/src/harmony-benchmark
./deploy_linux.sh local_iplist2.txt
./deploy_linux.sh local_config2.txt

@ -1,24 +1,31 @@
package main
import (
"bufio"
"flag"
"harmony-benchmark/consensus"
"harmony-benchmark/p2p"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
"log"
"os"
"bufio"
"strings"
)
func getLeader(iplist string) p2p.Peer {
file, _ := os.Open(iplist)
fscanner := bufio.NewScanner(file)
func getShardId(myIp, myPort string, config *[][]string) string {
for _, node := range *config {
ip, port, shardId := node[0], node[1], node[3]
if ip == myIp && port == myPort {
return shardId
}
}
return "N/A"
}
func getLeader(myShardId string, config *[][]string) p2p.Peer {
var leaderPeer p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" {
for _, node := range *config {
ip, port, status, shardId := node[0], node[1], node[2], node[3]
if status == "leader" && myShardId == shardId {
leaderPeer.Ip = ip
leaderPeer.Port = port
}
@ -26,14 +33,11 @@ func getLeader(iplist string) p2p.Peer {
return leaderPeer
}
func getPeers(Ip, Port, iplist string) []p2p.Peer {
file, _ := os.Open(iplist)
fscanner := bufio.NewScanner(file)
func getPeers(myIp, myPort, myShardId string, config *[][]string) []p2p.Peer {
var peerList []p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" || ip == Ip && port == Port {
for _, node := range *config {
ip, port, status, shardId := node[0], node[1], node[2], node[3]
if status == "leader" || ip == myIp && port == myPort || myShardId != shardId {
continue
}
peer := p2p.Peer{Port: port, Ip: ip}
@ -42,13 +46,31 @@ func getPeers(Ip, Port, iplist string) []p2p.Peer {
return peerList
}
func readConfigFile(configFile string) [][]string {
file, _ := os.Open(configFile)
fscanner := bufio.NewScanner(file)
result := [][]string{}
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
result = append(result, p)
}
return result
}
func main() {
ip := flag.String("ip", "127.0.0.1", "IP of the node")
port := flag.String("port", "9000", "port of the node.")
ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses")
configFile := flag.String("config_file", "config.txt", "file containing all ip addresses")
flag.Parse()
consensus := consensus.NewConsensus(*ip, *port, getPeers(*ip, *port, *ipfile), getLeader(*ipfile))
config := readConfigFile(*configFile)
shardId := getShardId(*ip, *port, &config)
peers := getPeers(*ip, *port, shardId, &config)
leader := getLeader(shardId, &config)
consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader)
var nodeStatus string
if consensus.IsLeader {
nodeStatus = "leader"
@ -57,7 +79,7 @@ func main() {
}
log.Println("======================================")
log.Printf("This node is a %s node listening on ip: %s and port: %s\n", nodeStatus, *ip, *port)
log.Printf("This node is a %s node in shard %s listening on ip: %s and port: %s\n", nodeStatus, shardId, *ip, *port)
log.Println("======================================")
node := node.NewNode(&consensus)

@ -4,6 +4,10 @@ import (
"encoding/hex"
)
const (
MaxNumberOfTransactions = 100
)
// UTXOPool stores transactions and balance associated with each address.
type UTXOPool struct {
// Mapping from address to a map of transaction id to a map of the index of output
@ -57,6 +61,80 @@ func (utxopool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
return true
}
// VerifyOneTransactionAndUpdate verifies if a list of transactions valid.
func (utxopool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool {
spentTXOs := make(map[string]map[string]map[int]bool)
txID := hex.EncodeToString(tx.ID)
inTotal := 0
// Calculate the sum of TxInput
for _, in := range tx.TxInput {
inTxID := hex.EncodeToString(in.TxID)
index := in.TxOutputIndex
// Check if the transaction with the addres is spent or not.
if val, ok := utxopool.utxo[in.Address][inTxID][index]; ok {
if spentTXOs[in.Address][inTxID][index] {
return false
}
inTotal += val
} else {
return false
}
// Mark the transactions with the address and index spent.
if _, ok := spentTXOs[in.Address]; !ok {
spentTXOs[in.Address] = make(map[string]map[int]bool)
}
if _, ok := spentTXOs[in.Address][inTxID]; !ok {
spentTXOs[in.Address][inTxID] = make(map[int]bool)
}
spentTXOs[in.Address][inTxID][index] = true
}
outTotal := 0
// Calculate the sum of TxOutput
for index, out := range tx.TxOutput {
if val, ok := spentTXOs[out.Address][txID][index]; ok {
return false
}
outTotal += out.Value
}
if inTotal != outTotal {
return false
}
return true
}
func (utxopool *UTXOPool) Update(tx* Transaction) {
if utxopool != nil {
txID := hex.EncodeToString(tx.ID)
// Remove
for _, in := range tx.TxInput {
inTxID := hex.EncodeToString(in.TxID)
delete(utxopool.utxo[in.Address][inTxID], in.TxOutputIndex)
}
// Update
for index, out := range tx.TxOutput {
if _, ok := utxopool.utxo[out.Address]; !ok {
utxopool.utxo[out.Address] = make(map[string]map[int]int)
utxopool.utxo[out.Address][txID] = make(map[int]int)
}
if _, ok := utxopool.utxo[out.Address][txID]; !ok {
utxopool.utxo[out.Address][txID] = make(map[int]int)
}
utxopool.utxo[out.Address][txID][index] = out.Value
}
}
}
func (utxopool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool {
if utxopool.VerifyOneTransaction(tx) {
utxopool.Update(tx)
return true
}
retur false
}
// VerifyAndUpdate verifies a list of transactions and update utxoPool.
func (utxopool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool {
if utxopool.VerifyTransactions(transactions) {
@ -111,3 +189,15 @@ func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool {
tx := bc.blocks[0].Transactions[0]
return CreateUTXOPoolFromTransaction(tx)
}
// SelectTransactionsForNewBlock returns a list of index of valid transactions for the new block.
func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transaction) (selected, unselected []*Transaction) {
selected, unselected = []*Transaction{}, []*Transaction{}
for id, tx := range transactions {
if len(selected) < MaxNumberOfTransactions && utxoPool.VerifyOneTransactionAndUpdate(tx) {
append(selected, tx)
} else {
append(unselected, tx)
}
}
}

@ -1,4 +1,4 @@
package message
package common
import (
"errors"
@ -26,7 +26,7 @@ n - 2 bytes - actual message payload
const NODE_TYPE_BYTES = 1
const ACTION_TYPE_BYTES = 1
// The category of messages
// The CATEGORY of messages
type MessageCategory byte
const (
@ -35,7 +35,8 @@ const (
// TODO: add more types
)
// The specific types of message under committee category
// The specific types of message under COMMITTEE category
type CommitteeMessageType byte
const (
@ -43,7 +44,7 @@ const (
// TODO: add more types
)
// The specific types of message under node category
// The specific types of message under NODE category
type NodeMessageType byte
const (
@ -52,6 +53,7 @@ const (
// TODO: add more types
)
// Get the message category from the p2p message content
func GetMessageCategory(message []byte) (MessageCategory, error) {
if len(message) < NODE_TYPE_BYTES {

@ -4,6 +4,7 @@ package consensus // consensus
import (
"fmt"
"harmony-benchmark/message"
"harmony-benchmark/common"
"harmony-benchmark/p2p"
"log"
"regexp"
@ -32,9 +33,11 @@ type Consensus struct {
// Consensus Id (View Id) - 4 byte
consensusId uint32
// Blockhash - 32 byte
blockHash []byte
blockHash [32]byte
// BlockHeader to run consensus on
blockHeader []byte
// Shard Id which this node belongs to
ShardId uint32
// Signal channel for starting a new consensus process
ReadySignal chan int
@ -77,7 +80,7 @@ func (state ConsensusState) String() string {
}
// Create a new Consensus object
func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus {
func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) Consensus {
// The first Ip, port passed will be leader.
consensus := Consensus{}
peer := p2p.Peer{Port: port, Ip: ip}
@ -100,6 +103,11 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus
log.Fatal(err)
}
consensus.consensusId = 0
myShardId, err := strconv.Atoi(shardId)
if err != nil {
panic("Unparseable shard Id" + shardId)
}
consensus.ShardId = uint32(myShardId)
// For now use socket address as 16 byte Id
// TODO: populate with correct Id
@ -115,8 +123,8 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus
}()
}
consensus.msgCategory = byte(message.COMMITTEE)
consensus.actionType = byte(message.CONSENSUS)
consensus.msgCategory = byte(common.COMMITTEE)
consensus.actionType = byte(common.CONSENSUS)
return consensus
}
@ -126,7 +134,6 @@ func (consensus *Consensus) ResetState() {
consensus.commits = make(map[string]string)
consensus.responses = make(map[string]string)
}
// Returns ID of this consensus
func (consensus *Consensus) GetIdentityString() string {
var duty string

@ -10,6 +10,8 @@ import (
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/p2p"
"crypto/sha256"
"strings"
)
var mutex = &sync.Mutex{}
@ -37,7 +39,8 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
log.Print(err)
}
consensus.Logf("Received and processing message: %s\n", msgType)
log.Printf("[Leader-%d] Received and processing message: %s\n", consensus.ShardId, msgType)
switch msgType {
case ANNOUNCE:
consensus.Logf("Unexpected message type: %s", msgType)
@ -64,7 +67,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
// prepare message and broadcast to validators
// Construct new block
//newBlock := constructNewBlock()
consensus.blockHash = newBlock.Hash
copy(newBlock.Hash[:32], consensus.blockHash[:])
msgToSend, err := consensus.constructAnnounceMessage()
if err != nil {
@ -88,7 +91,7 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
if len(consensus.blockHash) != 32 {
return buffer.Bytes(), errors.New(fmt.Sprintf("Block Hash size is %d bytes", len(consensus.blockHash)))
}
buffer.Write(consensus.blockHash)
buffer.Write(consensus.blockHash[:])
// 2 byte leader id
twoBytes := make([]byte, 2)
@ -111,14 +114,9 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes()), nil
}
// TODO: fill in this function
func constructNewBlock() []byte {
return make([]byte, 200)
}
// TODO: fill in this function
func getBlockHash(block []byte) []byte {
return make([]byte, 32)
// Get the hash of a block's byte stream
func getBlockHash(block []byte) [32]byte {
return sha256.Sum256(block)
}
// TODO: fill in this function
@ -126,9 +124,10 @@ func getBlockHeader() []byte {
return make([]byte, 200)
}
// TODO: fill in this function
func signMessage(message []byte) []byte {
return make([]byte, 64)
// TODO: implement real ECC signature
mockSignature := sha256.Sum256(message)
return append(mockSignature[:], mockSignature[:]...)
}
func (consensus *Consensus) processCommitMessage(payload []byte) {
@ -199,7 +198,7 @@ func (consensus Consensus) constructChallengeMessage() []byte {
buffer.Write(fourBytes)
// 32 byte block hash
buffer.Write(consensus.blockHash)
buffer.Write(consensus.blockHash[:])
// 2 byte leader id
twoBytes := make([]byte, 2)
@ -207,10 +206,10 @@ func (consensus Consensus) constructChallengeMessage() []byte {
buffer.Write(twoBytes)
// 33 byte aggregated commit
buffer.Write(getAggregatedCommit())
buffer.Write(getAggregatedCommit(consensus.commits))
// 33 byte aggregated key
buffer.Write(getAggregatedKey())
buffer.Write(getAggregatedKey(consensus.commits))
// 32 byte challenge
buffer.Write(getChallenge())
@ -222,18 +221,30 @@ func (consensus Consensus) constructChallengeMessage() []byte {
return consensus.ConstructConsensusMessage(CHALLENGE, buffer.Bytes())
}
// TODO: fill in this function
func getAggregatedCommit() []byte {
return make([]byte, 33)
func getAggregatedCommit(commits map[string]string) []byte {
// TODO: implement actual commit aggregation
var commitArray []string
for _, val := range commits {
commitArray = append(commitArray, val)
}
var commit [32]byte
commit = sha256.Sum256([]byte(strings.Join(commitArray, "")))
return append(commit[:], byte(0))
}
// TODO: fill in this function
func getAggregatedKey() []byte {
return make([]byte, 33)
func getAggregatedKey(commits map[string]string) []byte {
// TODO: implement actual key aggregation
var commitArray []string
for key := range commits {
commitArray = append(commitArray, key)
}
var commit [32]byte
commit = sha256.Sum256([]byte(strings.Join(commitArray, "")))
return append(commit[:], byte(0))
}
// TODO: fill in this function
func getChallenge() []byte {
// TODO: implement actual challenge data
return make([]byte, 32)
}
@ -287,8 +298,11 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
// Set state to FINISHED
consensus.state = FINISHED
// TODO: do followups on the consensus
consensus.Logf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators))
log.Printf("[Shard %d] HOORAY!!! CONSENSUS REACHED AMONG %d NODES WITH CONSENSUS ID %d!!!\n", consensus.ShardId, len(consensus.validators), consensus.consensusId)
consensus.ResetState()
consensus.consensusId++
consensus.ReadySignal <- 1
}
// TODO: composes new block and broadcast the new block to validators

@ -0,0 +1,34 @@
package consensus
import (
"harmony-benchmark/p2p"
"testing"
)
func TestConstructAnnounceMessage(test *testing.T) {
header := getBlockHeader()
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
msg, err := consensus.constructAnnounceMessage()
if err != nil {
test.Error("Annouce message is not constructed successfully")
}
if len(msg) != 1+1+1+4+32+2+4+64+len(header) {
test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg))
}
}
func TestConstructChallengeMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
msg := consensus.constructChallengeMessage()
if len(msg) != 1+1+1+4+32+2+33+33+32+64 {
test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg))
}
}

@ -0,0 +1,36 @@
package consensus
import (
"harmony-benchmark/common"
"harmony-benchmark/p2p"
"testing"
)
func TestNewConsensus(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
if consensus.consensusId != 0 {
test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusId)
}
if consensus.IsLeader != true {
test.Error("Consensus should belong to a leader")
}
if consensus.ReadySignal == nil {
test.Error("Consensus ReadySignal should be initialized")
}
if consensus.actionType != byte(common.CONSENSUS) {
test.Error("Consensus actionType should be CONSENSUS")
}
if consensus.msgCategory != byte(common.COMMITTEE) {
test.Error("Consensus msgCategory should be COMMITTEE")
}
if consensus.leader != leader {
test.Error("Consensus Leader is set to wrong Peer")
}
}

@ -71,9 +71,12 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
_ = blockHeaderSize
_ = signature
consensus.blockHash = blockHash
copy(blockHash[:32], consensus.blockHash[:])
// verify block data
if consensusId != consensus.consensusId {
log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId)
return
}
// sign block
// TODO: return the signature(commit) to leader
@ -95,7 +98,7 @@ func (consensus Consensus) constructCommitMessage() []byte {
buffer.Write(fourBytes)
// 32 byte block hash
buffer.Write(consensus.blockHash)
buffer.Write(consensus.blockHash[:])
// 2 byte validator id
twoBytes := make([]byte, 2)
@ -160,6 +163,10 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
_ = signature
// verify block data and the aggregated signatures
if consensusId != consensus.consensusId {
log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId)
return
}
// sign the message
@ -170,6 +177,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
// Set state to RESPONSE_DONE
consensus.state = RESPONSE_DONE
consensus.consensusId++
}
// Construct the response message to send to leader (assumption the consensus data is already verified)
@ -182,7 +190,7 @@ func (consensus Consensus) constructResponseMessage() []byte {
buffer.Write(fourBytes)
// 32 byte block hash
buffer.Write(consensus.blockHash)
buffer.Write(consensus.blockHash[:32])
// 2 byte validator id
twoBytes := make([]byte, 2)

@ -0,0 +1,30 @@
package consensus
import (
"harmony-benchmark/p2p"
"testing"
)
func TestConstructCommitMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
msg := consensus.constructCommitMessage()
if len(msg) != 1+1+1+4+32+2+33+64 {
test.Errorf("Commit message is not constructed in the correct size: %d", len(msg))
}
}
func TestConstructResponseMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
msg := consensus.constructResponseMessage()
if len(msg) != 1+1+1+4+32+2+32+64 {
test.Errorf("Response message is not constructed in the correct size: %d", len(msg))
}
}

@ -1,9 +1,9 @@
./kill_node.sh
ipfile=$1
config=$1
while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode <<< $line
#echo $ip $port $mode $ipfile
go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile&
done < $ipfile
#echo $ip $port $mode $config
go run ./benchmark_main.go -ip $ip -port $port -config_file $config&
done < $config
go run ./aws-code/transaction_generator.go -ipfile $ipfile
go run ./aws-code/transaction_generator.go -config_file $config

@ -14,9 +14,9 @@ echo "Inside deploy linux"
echo $GOPATH
echo "Inside deploy linux line 2"
ipfile=$1
config=$1
while read ip port mode; do
#echo $ip $port $mode $ipfile
go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile&
done < $ipfile
go run ./aws-code/transaction_generator.go -ipfile $ipfile
#echo $ip $port $mode $config
go run ./benchmark_main.go -ip $ip -port $port -config_file $config&
done < $config
go run ./aws-code/transaction_generator.go -config_file $config

@ -0,0 +1,101 @@
127.0.0.1 9001 validator 0
127.0.0.1 9002 validator 0
127.0.0.1 9003 validator 0
127.0.0.1 9004 validator 0
127.0.0.1 9005 validator 0
127.0.0.1 9006 validator 0
127.0.0.1 9007 validator 0
127.0.0.1 9008 validator 0
127.0.0.1 9009 validator 0
127.0.0.1 9010 validator 0
127.0.0.1 9011 validator 0
127.0.0.1 9012 validator 0
127.0.0.1 9013 validator 0
127.0.0.1 9014 validator 0
127.0.0.1 9015 validator 0
127.0.0.1 9016 validator 0
127.0.0.1 9017 validator 0
127.0.0.1 9018 validator 0
127.0.0.1 9019 validator 0
127.0.0.1 9020 validator 0
127.0.0.1 9021 validator 0
127.0.0.1 9022 validator 0
127.0.0.1 9023 validator 0
127.0.0.1 9024 validator 0
127.0.0.1 9025 validator 0
127.0.0.1 9026 validator 0
127.0.0.1 9027 validator 0
127.0.0.1 9028 validator 0
127.0.0.1 9029 validator 0
127.0.0.1 9030 validator 0
127.0.0.1 9031 validator 0
127.0.0.1 9032 validator 0
127.0.0.1 9033 validator 0
127.0.0.1 9034 validator 0
127.0.0.1 9035 validator 0
127.0.0.1 9036 validator 0
127.0.0.1 9037 validator 0
127.0.0.1 9038 validator 0
127.0.0.1 9039 validator 0
127.0.0.1 9040 validator 0
127.0.0.1 9041 validator 0
127.0.0.1 9042 validator 0
127.0.0.1 9043 validator 0
127.0.0.1 9044 validator 0
127.0.0.1 9045 validator 0
127.0.0.1 9046 validator 0
127.0.0.1 9047 validator 0
127.0.0.1 9048 validator 0
127.0.0.1 9049 validator 0
127.0.0.1 9050 validator 0
127.0.0.1 9051 validator 0
127.0.0.1 9052 validator 0
127.0.0.1 9053 validator 0
127.0.0.1 9054 validator 0
127.0.0.1 9055 validator 0
127.0.0.1 9056 validator 0
127.0.0.1 9057 validator 0
127.0.0.1 9058 validator 0
127.0.0.1 9059 validator 0
127.0.0.1 9060 validator 0
127.0.0.1 9061 validator 0
127.0.0.1 9062 validator 0
127.0.0.1 9063 validator 0
127.0.0.1 9064 validator 0
127.0.0.1 9065 validator 0
127.0.0.1 9066 validator 0
127.0.0.1 9067 validator 0
127.0.0.1 9068 validator 0
127.0.0.1 9069 validator 0
127.0.0.1 9070 validator 0
127.0.0.1 9071 validator 0
127.0.0.1 9072 validator 0
127.0.0.1 9073 validator 0
127.0.0.1 9074 validator 0
127.0.0.1 9075 validator 0
127.0.0.1 9076 validator 0
127.0.0.1 9077 validator 0
127.0.0.1 9078 validator 0
127.0.0.1 9079 validator 0
127.0.0.1 9080 validator 0
127.0.0.1 9081 validator 0
127.0.0.1 9082 validator 0
127.0.0.1 9083 validator 0
127.0.0.1 9084 validator 0
127.0.0.1 9085 validator 0
127.0.0.1 9086 validator 0
127.0.0.1 9087 validator 0
127.0.0.1 9088 validator 0
127.0.0.1 9089 validator 0
127.0.0.1 9090 validator 0
127.0.0.1 9091 validator 0
127.0.0.1 9092 validator 0
127.0.0.1 9093 validator 0
127.0.0.1 9094 validator 0
127.0.0.1 9095 validator 0
127.0.0.1 9096 validator 0
127.0.0.1 9097 validator 0
127.0.0.1 9098 validator 0
127.0.0.1 9099 validator 0
127.0.0.1 9100 validator 0
127.0.0.1 9000 leader 0

@ -0,0 +1,11 @@
127.0.0.1 9001 validator 0
127.0.0.1 9002 validator 0
127.0.0.1 9003 validator 0
127.0.0.1 9004 validator 0
127.0.0.1 9005 validator 0
127.0.0.1 9006 validator 0
127.0.0.1 9007 validator 0
127.0.0.1 9008 validator 0
127.0.0.1 9009 validator 0
127.0.0.1 9010 validator 0
127.0.0.1 9000 leader 0

@ -0,0 +1,22 @@
127.0.0.1 9010 validator 0
127.0.0.1 9011 validator 0
127.0.0.1 9012 validator 0
127.0.0.1 9013 validator 0
127.0.0.1 9014 validator 0
127.0.0.1 9015 validator 0
127.0.0.1 9016 validator 0
127.0.0.1 9017 validator 0
127.0.0.1 9018 validator 0
127.0.0.1 9019 validator 0
127.0.0.1 9020 validator 1
127.0.0.1 9021 validator 1
127.0.0.1 9022 validator 1
127.0.0.1 9023 validator 1
127.0.0.1 9024 validator 1
127.0.0.1 9025 validator 1
127.0.0.1 9026 validator 1
127.0.0.1 9027 validator 1
127.0.0.1 9028 validator 1
127.0.0.1 9029 validator 1
127.0.0.1 9000 leader 0
127.0.0.1 9001 leader 1

@ -1,101 +0,0 @@
127.0.0.1 9001 validator
127.0.0.1 9002 validator
127.0.0.1 9003 validator
127.0.0.1 9004 validator
127.0.0.1 9005 validator
127.0.0.1 9006 validator
127.0.0.1 9007 validator
127.0.0.1 9008 validator
127.0.0.1 9009 validator
127.0.0.1 9010 validator
127.0.0.1 9011 validator
127.0.0.1 9012 validator
127.0.0.1 9013 validator
127.0.0.1 9014 validator
127.0.0.1 9015 validator
127.0.0.1 9016 validator
127.0.0.1 9017 validator
127.0.0.1 9018 validator
127.0.0.1 9019 validator
127.0.0.1 9020 validator
127.0.0.1 9021 validator
127.0.0.1 9022 validator
127.0.0.1 9023 validator
127.0.0.1 9024 validator
127.0.0.1 9025 validator
127.0.0.1 9026 validator
127.0.0.1 9027 validator
127.0.0.1 9028 validator
127.0.0.1 9029 validator
127.0.0.1 9030 validator
127.0.0.1 9031 validator
127.0.0.1 9032 validator
127.0.0.1 9033 validator
127.0.0.1 9034 validator
127.0.0.1 9035 validator
127.0.0.1 9036 validator
127.0.0.1 9037 validator
127.0.0.1 9038 validator
127.0.0.1 9039 validator
127.0.0.1 9040 validator
127.0.0.1 9041 validator
127.0.0.1 9042 validator
127.0.0.1 9043 validator
127.0.0.1 9044 validator
127.0.0.1 9045 validator
127.0.0.1 9046 validator
127.0.0.1 9047 validator
127.0.0.1 9048 validator
127.0.0.1 9049 validator
127.0.0.1 9050 validator
127.0.0.1 9051 validator
127.0.0.1 9052 validator
127.0.0.1 9053 validator
127.0.0.1 9054 validator
127.0.0.1 9055 validator
127.0.0.1 9056 validator
127.0.0.1 9057 validator
127.0.0.1 9058 validator
127.0.0.1 9059 validator
127.0.0.1 9060 validator
127.0.0.1 9061 validator
127.0.0.1 9062 validator
127.0.0.1 9063 validator
127.0.0.1 9064 validator
127.0.0.1 9065 validator
127.0.0.1 9066 validator
127.0.0.1 9067 validator
127.0.0.1 9068 validator
127.0.0.1 9069 validator
127.0.0.1 9070 validator
127.0.0.1 9071 validator
127.0.0.1 9072 validator
127.0.0.1 9073 validator
127.0.0.1 9074 validator
127.0.0.1 9075 validator
127.0.0.1 9076 validator
127.0.0.1 9077 validator
127.0.0.1 9078 validator
127.0.0.1 9079 validator
127.0.0.1 9080 validator
127.0.0.1 9081 validator
127.0.0.1 9082 validator
127.0.0.1 9083 validator
127.0.0.1 9084 validator
127.0.0.1 9085 validator
127.0.0.1 9086 validator
127.0.0.1 9087 validator
127.0.0.1 9088 validator
127.0.0.1 9089 validator
127.0.0.1 9090 validator
127.0.0.1 9091 validator
127.0.0.1 9092 validator
127.0.0.1 9093 validator
127.0.0.1 9094 validator
127.0.0.1 9095 validator
127.0.0.1 9096 validator
127.0.0.1 9097 validator
127.0.0.1 9098 validator
127.0.0.1 9099 validator
127.0.0.1 9100 validator
127.0.0.1 9000 leader

@ -1,11 +0,0 @@
127.0.0.1 9001 validator
127.0.0.1 9002 validator
127.0.0.1 9003 validator
127.0.0.1 9004 validator
127.0.0.1 9005 validator
127.0.0.1 9006 validator
127.0.0.1 9007 validator
127.0.0.1 9008 validator
127.0.0.1 9009 validator
127.0.0.1 9010 validator
127.0.0.1 9000 leader

@ -4,35 +4,47 @@ import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/message"
"harmony-benchmark/common"
)
// The types of messages used for NODE/TRANSACTION
type TransactionMessageType int
const (
SEND TransactionMessageType = iota
REQUEST
)
// The types of messages used for NODE/CONTROL
type ControlMessageType int
const (
STOP ControlMessageType = iota
)
//ConstructTransactionListMessage constructs serialized transactions
func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.TRANSACTION))
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(SEND))
encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(transactions)
return byteBuffer.Bytes()
}
//ConstructTransactionListMessage constructs serialized transactions
func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(REQUEST))
for _, txId := range transactionIds {
byteBuffer.Write(txId)
}
return byteBuffer.Bytes()
}
//ConstructStopMessage is STOP message
func ConstructStopMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.CONTROL))
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.CONTROL))
byteBuffer.WriteByte(byte(STOP))
return byteBuffer.Bytes()
}

@ -1,16 +1,16 @@
package node
import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/common"
"harmony-benchmark/p2p"
"log"
"net"
"os"
"time"
"bytes"
"encoding/gob"
)
// A node represents a program (machine) participating in the network
@ -56,49 +56,41 @@ func (node *Node) NodeHandler(conn net.Conn) {
return
}
msgCategory, err := message.GetMessageCategory(content)
msgCategory, err := common.GetMessageCategory(content)
if err != nil {
node.Logf("Read node type failed:%s", err)
return
}
msgType, err := message.GetMessageType(content)
msgType, err := common.GetMessageType(content)
if err != nil {
node.Logf("Read action type failed:%s", err)
return
}
msgPayload, err := message.GetMessagePayload(content)
msgPayload, err := common.GetMessagePayload(content)
if err != nil {
node.Logf("Read message payload failed:%s", err)
return
}
switch msgCategory {
case message.COMMITTEE:
actionType := message.CommitteeMessageType(msgType)
case common.COMMITTEE:
actionType := common.CommitteeMessageType(msgType)
switch actionType {
case message.CONSENSUS:
case common.CONSENSUS:
if consensus.IsLeader {
consensus.ProcessMessageLeader(msgPayload)
} else {
consensus.ProcessMessageValidator(msgPayload)
}
}
case message.NODE:
actionType := message.NodeMessageType(msgType)
case common.NODE:
actionType := common.NodeMessageType(msgType)
switch actionType {
case message.TRANSACTION:
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type
txList := new([]blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
node.Logln("Failed deserializing transaction list")
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
node.Logln(len(node.pendingTransactions))
case message.CONTROL:
case common.TRANSACTION:
node.transactionMessageHandler(msgPayload)
case common.CONTROL:
controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP {
node.Logln("Stopping Node")
@ -109,6 +101,52 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
}
func (node *Node) transactionMessageHandler(msgPayload []byte) {
txMessageType := TransactionMessageType(msgPayload[0])
switch txMessageType {
case SEND:
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type
txList := new([]blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
log.Println("Failed deserializing transaction list")
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
case REQUEST:
reader := bytes.NewBuffer(msgPayload[1:])
var txIds map[[32]byte]bool
txId := make([]byte, 32) // 32 byte hash Id
for {
_, err := reader.Read(txId)
if err != nil {
break
}
txIds[getFixedByteTxId(txId)] = true
}
var txToReturn []blockchain.Transaction
for _, tx := range node.pendingTransactions {
if txIds[getFixedByteTxId(tx.ID)] {
txToReturn = append(txToReturn, tx)
}
}
// TODO: return the transaction list to requester
}
}
// Copy the txId byte slice over to 32 byte array so the map can key on it
func getFixedByteTxId(txId []byte) [32]byte {
var id [32]byte
for i := range id {
id[i] = txId[i]
}
return id
}
func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready
<-readySignal

@ -48,7 +48,6 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) {
}
// TODO: check on msgType and take actions accordingly
//// Read 4 bytes for message size
fourBytes := make([]byte, 4)
n, err := r.Read(fourBytes)

Loading…
Cancel
Save