Add transaction generater program which sends transactions to leaders periodically

pull/3/head
Rongjian Lan 6 years ago
parent 0b4d06ce5e
commit f2de27fa32
  1. 119
      aws-code/node.go
  2. 36
      aws-code/transaction_generator.go
  3. 4
      consensus/message.go
  4. 4
      deploy.sh
  5. 2
      kill_node.sh
  6. 23
      node/message.go
  7. 14
      node/node.go
  8. 2
      p2p/message.go

@ -1,119 +0,0 @@
package main
import (
"log"
"math/rand"
"time"
)
type Node struct {
ip int
leader bool
reciveBlock chan string
sendBlock chan string
}
type Nodes struct {
Nodes []*Node
}
func randomInt(min, max int) int {
return min + rand.Intn(max-min)
}
// Generate a random string of A-Z chars with len = l
func randomString(len int) string {
bytes := make([]byte, len)
for i := 0; i < len; i++ {
bytes[i] = byte(randomInt(97, 122))
}
return string(bytes)
}
func (n Node) send(cin <-chan string, id int) {
for msg := range cin {
log.Printf("Leader has sent message %s to %d\n", msg, id)
}
}
func consume(cin <-chan string, id int) {
for msg := range cin {
log.Printf("Leader has sent message %s to %d\n", msg, id)
}
}
func (n Node) receive() {
log.Printf("Node: %d received message\n", n.ip)
}
func createNode(ip int, isLeader bool) Node {
n := Node{ip: ip, leader: isLeader}
return n
}
func pickLeader(i int) Node {
if i == 0 {
return createNode(i, true)
} else {
return createNode(i, false)
}
}
func BufferedTxnQueueWithFanOut(ch <-chan string, size int) []chan string { // This needs
cs := make([]chan string, size)
for i, _ := range cs {
// The size of the channels buffer controls how far behind the recievers
// of the fanOut channels can lag the other channels.
cs[i] = make(chan string)
}
go func() {
for txs := range ch {
for _, c := range cs {
c <- txs
}
}
for _, c := range cs {
// close all our fanOut channels when the input channel is exhausted.
close(c)
}
}()
return cs
}
func TxnGenerator(numOfTxns int, lenOfRandomString int) <-chan string {
out := make(chan string)
go func() {
for i := 0; i < numOfTxns; i++ {
out <- randomString(lenOfRandomString)
log.Printf("Transaction Number %d\n", i)
//time.Sleep(2 * time.Second)
}
close(out)
}()
return out
}
func main() {
var (
//isLeader Node
numOfTxns = 1000
numOfNodes = 10
N = make([]Node, 10)
lenOfRandomString = 10
node_ips = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
)
for i := range node_ips {
m := pickLeader(i)
N[i] = m
// if m.leader {
// isLeader := m
// }
}
txnqueue := TxnGenerator(numOfTxns, lenOfRandomString)
Txns := BufferedTxnQueueWithFanOut(txnqueue, numOfNodes)
for num := range Txns {
txn := Txns[num]
go consume(txn, num)
}
time.Sleep(60 * time.Second)
}

@ -0,0 +1,36 @@
package main
import (
"harmony-benchmark/blockchain"
"math/rand"
"time"
"flag"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
)
func newRandTransaction() blockchain.Transaction {
txin := blockchain.TXInput{[]byte{}, rand.Intn(100), string(rand.Uint64())}
txout := blockchain.TXOutput{rand.Intn(100), string(rand.Uint64())}
tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx.SetID()
return tx
}
func main() {
ip := flag.String("ip", "127.0.0.1", "IP of the leader")
port := flag.String("port", "9000", "port of the leader.")
txs := make([]blockchain.Transaction, 10)
for true {
for i := range txs {
txs[i] = newRandTransaction()
}
msg := node.ConstructTransactionListMessage(txs)
p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg)
time.Sleep(1 * time.Second) // 10 transactions per second
}
}

@ -112,10 +112,10 @@ func GetConsensusMessagePayload(message []byte) ([]byte, error) {
}
// Concatenate msgType as one byte with payload, and return the whole byte array
func (consensus Consensus) ConstructConsensusMessage(msgType MessageType, payload []byte) []byte {
func (consensus Consensus) ConstructConsensusMessage(consensusMsgType MessageType, payload []byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{consensus.msgCategory})
byteBuffer.WriteByte(consensus.actionType)
byteBuffer.WriteByte(byte(msgType))
byteBuffer.WriteByte(byte(consensusMsgType))
byteBuffer.Write(payload)
return byteBuffer.Bytes()
}

@ -3,4 +3,6 @@ ipfile=$1
while read ip port mode; do
#echo $ip $port $mode $ipfile
go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile&
done < $ipfile
done < $ipfile
go run ./aws-code/transaction_generator.go

@ -1,4 +1,4 @@
for pid in `/bin/ps -fu $USER| grep "slave.go\|slave -port\|leader\|benchmark_main" | grep -v "grep" | awk '{print $2}'`;
for pid in `/bin/ps -fu $USER| grep "slave.go\|slave -port\|leader\|benchmark_main\|transaction_generator" | grep -v "grep" | awk '{print $2}'`;
do
echo 'Killed process: '$pid
kill -9 $pid

@ -0,0 +1,23 @@
package node
import (
"harmony-benchmark/blockchain"
"bytes"
"harmony-benchmark/message"
"encoding/gob"
)
type TransactionMessageType int
const (
SEND TransactionMessageType = iota
)
func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.TRANSACTION))
byteBuffer.WriteByte(byte(SEND))
encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(transactions)
return byteBuffer.Bytes()
}

@ -7,11 +7,15 @@ import (
"harmony-benchmark/p2p"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/blockchain"
"bytes"
"encoding/gob"
)
// A node represents a program (machine) participating in the network
type Node struct {
consensus *consensus.Consensus
pendingTransactions []blockchain.Transaction
}
// Start a server and process the request by a handler.
@ -99,7 +103,15 @@ func (node *Node) NodeHandler(conn net.Conn) {
actionType := message.NodeMessageType(msgType)
switch actionType {
case message.TRANSACTION:
// TODO: process transaction
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type
txList := new([]blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
log.Println("Failed deserializing transaction list")
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
log.Println(len(node.pendingTransactions))
}
}
}

@ -34,7 +34,7 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) {
timeoutDuration := 1 * time.Second
conn.SetReadDeadline(time.Now().Add(timeoutDuration))
//// Read 1 byte for messge type
//// Read 1 byte for message type
_, err := r.ReadByte()
switch err {
case io.EOF:

Loading…
Cancel
Save