pull/16/head
Richard Liu 6 years ago
commit 925de232ed
  1. 4
      .gitignore
  2. 22
      aws-code/loghost/main.go
  3. 2
      benchmark.go
  4. 2
      blockchain/utxopool.go
  5. 37
      client/client.go
  6. 2
      client/message.go
  7. 355
      client/txgen/main.go
  8. 2
      consensus/consensus_leader.go
  9. 8
      deploy.sh
  10. 2
      deploy_linux.sh
  11. 2
      deploy_one_instance.sh
  12. 8
      node/node.go
  13. 2
      node/node_handler.go
  14. 2
      node/node_test.go

4
.gitignore vendored

@ -11,9 +11,7 @@
*.i*86
*.x86_64
*.hex
benchmark
txgen
!txgen/ # unignore txgen folder
bin/
# Mac
.DS_Store

@ -8,20 +8,22 @@ import (
)
const (
CONN_HOST = "localhost"
CONN_PORT = "3000"
CONN_TYPE = "tcp"
CONN_URL = CONN_HOST + ":" + CONN_PORT
)
func main() {
// Listen for incoming connections.
l, err := net.Listen(CONN_TYPE, ":"+CONN_PORT)
l, err := net.Listen(CONN_TYPE, CONN_URL)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
// Close the listener when the application closes.
defer l.Close()
fmt.Println("Listening on " + ":" + CONN_PORT)
fmt.Println("Listening on " + CONN_URL)
for {
// Listen for an incoming connection.
conn, err := l.Accept()
@ -36,18 +38,12 @@ func main() {
// Handles incoming requests.
func handleRequest(conn net.Conn) {
// // Make a buffer to hold incoming data.
// buf := make([]byte, 1024)
// // Read the incoming connection into the buffer.
// reqLen, err := conn.Read(buf)
status, err := bufio.NewReader(conn).ReadString('\n')
for {
data, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
fmt.Println("Error reading:", err.Error())
break
}
fmt.Println(data)
}
// fmt.Printf("Received %v: %v", reqLen, buf)
fmt.Println(status)
// // Send a response back to person contacting xus.
// conn.Write([]byte("Message received."))
// Close the connection when you're done with it.
conn.Close()
}

@ -108,7 +108,7 @@ func main() {
consensus.OnConsensusDone = node.PostConsensusProcessing
// Temporary testing code, to be removed.
node.AddMoreFakeTransactions(10000)
node.AddTestingAddresses(10000)
if consensus.IsLeader {
// Let consensus run

@ -248,8 +248,6 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
}
}
} // If it's a cross shard locking Tx, then don't update so the input UTXOs are locked (removed), and the money is not spendable until unlock-to-commit or unlock-to-abort
// TODO: unlock-to-commit and unlock-to-abort
}
}

@ -9,40 +9,45 @@ import (
"sync"
)
// A client represent a entity/user which send transactions and receive responses from the harmony network
// A client represent a node (e.g. wallet) which sends transactions and receive responses from the harmony network
type Client struct {
PendingCrossTxs map[[32]byte]*blockchain.Transaction // map of TxId to pending cross shard txs
PendingCrossTxsMutex sync.Mutex
leaders *[]p2p.Peer
UpdateBlocks func([]*blockchain.Block) // func used to sync blocks with the leader
PendingCrossTxs map[[32]byte]*blockchain.Transaction // Map of TxId to pending cross shard txs. Pending means the proof-of-accept/rejects are not complete
PendingCrossTxsMutex sync.Mutex // Mutex for the pending txs list
leaders *[]p2p.Peer // All the leaders for each shard
UpdateBlocks func([]*blockchain.Block) // Closure function used to sync new block with the leader. Once the leader finishes the consensus on a new block, it will send it to the clients. Clients use this method to update their blockchain
log log.Logger // Log utility
}
// The message handler for CLIENT/TRANSACTION messages.
func (client *Client) TransactionMessageHandler(msgPayload []byte) {
messageType := TransactionMessageType(msgPayload[0])
switch messageType {
case PROOF_OF_LOCK:
// Decode the list of blockchain.CrossShardTxProof
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the PROOF_OF_LOCK messge type
proofList := new([]blockchain.CrossShardTxProof)
err := txDecoder.Decode(&proofList)
if err != nil {
client.log.Error("Failed deserializing cross transaction proof list")
}
txsToSend := []blockchain.Transaction{}
client.PendingCrossTxsMutex.Lock()
// Loop through the newly received list of proofs
client.PendingCrossTxsMutex.Lock()
for _, proof := range *proofList {
// Find the corresponding pending cross tx
txAndProofs, ok := client.PendingCrossTxs[proof.TxID]
readyToUnlock := true
readyToUnlock := true // A flag used to mark whether whether this pending cross tx have all the proofs for its utxo input
if ok {
// Add the new proof to the cross tx's proof list
txAndProofs.Proofs = append(txAndProofs.Proofs, proof)
// Check whether this pending cross tx have all the proofs for its utxo input
txInputs := make(map[blockchain.TXInput]bool)
for _, curProof := range txAndProofs.Proofs {
for _, txInput := range curProof.TxInput {
@ -58,30 +63,30 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) {
} else {
readyToUnlock = false
}
if readyToUnlock {
txsToSend = append(txsToSend, *txAndProofs)
}
}
// Delete all the transactions with full proofs from the pending cross txs
for _, txToSend := range txsToSend {
delete(client.PendingCrossTxs, txToSend.ID)
}
client.PendingCrossTxsMutex.Unlock()
// Broadcast the cross txs with full proofs for unlock-to-commit/abort
if len(txsToSend) != 0 {
client.sendCrossShardTxUnlockMessage(&txsToSend)
tempList := []*blockchain.Transaction{}
for i, _ := range txsToSend {
tempList = append(tempList, &txsToSend[i])
}
client.broadcastCrossShardTxUnlockMessage(&txsToSend)
}
}
}
func (client *Client) sendCrossShardTxUnlockMessage(txsToSend *[]blockchain.Transaction) {
func (client *Client) broadcastCrossShardTxUnlockMessage(txsToSend *[]blockchain.Transaction) {
p2p.BroadcastMessage(*client.leaders, ConstructUnlockToCommitOrAbortMessage(*txsToSend))
}
// Create a new Node
// Create a new Cient
func NewClient(leaders *[]p2p.Peer) *Client {
client := Client{}
client.PendingCrossTxs = make(map[[32]byte]*blockchain.Transaction)

@ -19,7 +19,7 @@ const (
type TransactionMessageType int
const (
PROOF_OF_LOCK TransactionMessageType = iota // The proof of accept or reject returned by the leader to the cross shard transaction client.
PROOF_OF_LOCK TransactionMessageType = iota // The proof of accept or reject returned by the leader to the client tnat issued cross shard transactions.
)
// [leader] Constructs the proof of accept or reject message that will be sent to client

@ -0,0 +1,355 @@
package main
import (
"bufio"
"encoding/hex"
"flag"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/client"
"harmony-benchmark/consensus"
"harmony-benchmark/log"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
"math/rand"
"os"
"strconv"
"strings"
"sync"
"time"
)
var utxoPoolMutex sync.Mutex
// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards.
// The transactions are generated by going through the existing utxos and
// randomly select a subset of them as the input for each new transaction. The output
// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses.
//
// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard.
// Similarly, generate another utxo output in that second shard.
//
// NOTE: the genesis block should contain N coinbase transactions which add
// token (1000) to each address in [0 - N). See node.AddTestingAddresses()
//
// Params:
// shardId - the shardId for current shard
// dataNodes - nodes containing utxopools of all shards
// maxNumTxs - the max number of txs to generate
// crossShard - whether to generate cross shard txs
// Returns:
// all single-shard txs
// all cross-shard txs
func generateSimulatedTransactions(shardId int, dataNodes []*node.Node, maxNumTxs int, crossShard bool) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/*
UTXO map structure:
address - [
txId1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txId2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
var txs []*blockchain.Transaction
var crossTxs []*blockchain.Transaction
txsCount := 0
utxoPoolMutex.Lock()
UTXOLOOP:
// Loop over all addresses
for address, txMap := range dataNodes[shardId].UtxoPool.UtxoMap {
// Loop over all txIds for the address
for txIdStr, utxoMap := range txMap {
// Parse TxId
id, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
txId := [32]byte{}
copy(txId[:], id[:])
// Loop over all utxos for the txId
for index, value := range utxoMap {
if txsCount >= maxNumTxs {
break UTXOLOOP
}
randNum := rand.Intn(100)
// 30% sample rate to select UTXO to use for new transactions
if randNum < 30 {
if crossShard && randNum < 10 { // 30% cross shard transactions: add another txinput from another shard
// shard with neighboring Id
crossShardId := (int(dataNodes[shardId].Consensus.ShardID) + 1) % len(dataNodes)
crossShardNode := dataNodes[crossShardId]
crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[address]
// Get the cross shard utxo from another shard
var crossTxin *blockchain.TXInput
crossUtxoValue := 0
// Loop over utxos for the same address from the other shard and use the first utxo as the second cross tx input
for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap {
// Parse TxId
id, err := hex.DecodeString(crossTxIdStr)
if err != nil {
continue
}
crossTxId := [32]byte{}
copy(crossTxId[:], id[:])
for crossShardIndex, crossShardValue := range crossShardUtxos {
crossUtxoValue = crossShardValue
crossTxin = &blockchain.TXInput{crossTxId, crossShardIndex, address, uint32(crossShardId)}
break
}
if crossTxin != nil {
break
}
}
// Add the utxo from current shard
txin := blockchain.TXInput{txId, index, address, dataNodes[shardId].Consensus.ShardID}
txInputs := []blockchain.TXInput{txin}
// Add the utxo from the other shard, if any
if crossTxin != nil {
txInputs = append(txInputs, *crossTxin)
}
// Spend the utxo from the current shard to a random address in [0 - N)
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000)), dataNodes[shardId].Consensus.ShardID}
txOutputs := []blockchain.TXOutput{txout}
// Spend the utxo from the other shard, if any, to a random address in [0 - N)
if crossTxin != nil {
crossTxout := blockchain.TXOutput{crossUtxoValue, strconv.Itoa(rand.Intn(10000)), uint32(crossShardId)}
txOutputs = append(txOutputs, crossTxout)
}
// Construct the new transaction
tx := blockchain.Transaction{[32]byte{}, txInputs, txOutputs, nil}
tx.SetID()
crossTxs = append(crossTxs, &tx)
txsCount++
} else {
// Add the utxo as new tx input
txin := blockchain.TXInput{txId, index, address, dataNodes[shardId].Consensus.ShardID}
// Spend the utxo to a random address in [0 - N)
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000)), dataNodes[shardId].Consensus.ShardID}
tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}, nil}
tx.SetID()
txs = append(txs, &tx)
txsCount++
}
}
}
}
}
utxoPoolMutex.Unlock()
return txs, crossTxs
}
// Gets all the validator peers
func getValidators(config string) []p2p.Peer {
file, _ := os.Open(config)
fscanner := bufio.NewScanner(file)
var peerList []p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" || status == "client" {
continue
}
peer := p2p.Peer{Port: port, Ip: ip}
peerList = append(peerList, peer)
}
return peerList
}
// Gets all the leader peers and corresponding shard Ids
func getLeadersAndShardIds(config *[][]string) ([]p2p.Peer, []uint32) {
var peerList []p2p.Peer
var shardIds []uint32
for _, node := range *config {
ip, port, status, shardId := node[0], node[1], node[2], node[3]
if status == "leader" {
peerList = append(peerList, p2p.Peer{Ip: ip, Port: port})
val, err := strconv.Atoi(shardId)
if err == nil {
shardIds = append(shardIds, uint32(val))
} else {
log.Error("[Generator] Error parsing the shard Id ", shardId)
}
}
}
return peerList, shardIds
}
// Parse the config file and return a 2d array containing the file data
func readConfigFile(configFile string) [][]string {
file, _ := os.Open(configFile)
fscanner := bufio.NewScanner(file)
result := [][]string{}
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
result = append(result, p)
}
return result
}
// Gets the port of the client node in the config
func getClientPort(config *[][]string) string {
for _, node := range *config {
_, port, status, _ := node[0], node[1], node[2], node[3]
if status == "client" {
return port
}
}
return ""
}
// A utility func that counts the total number of utxos in a pool.
func countNumOfUtxos(utxoPool *blockchain.UTXOPool) int {
countAll := 0
for _, utxoMap := range utxoPool.UtxoMap {
for txIdStr, val := range utxoMap {
_ = val
id, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
txId := [32]byte{}
copy(txId[:], id[:])
for _, utxo := range val {
_ = utxo
countAll++
}
}
}
return countAll
}
func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 10000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
flag.Parse()
// Read the configs
config := readConfigFile(*configFile)
leaders, shardIds := getLeadersAndShardIds(&config)
// Do cross shard tx if there are more than one shard
crossShard := len(shardIds) > 1
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder)
h := log.MultiHandler(
log.Must.FileHandler(logFileName, log.LogfmtFormat()),
log.StdoutHandler)
// In cases where you just want a stdout logger, use the following one instead.
// h := log.CallerFileHandler(log.StdoutHandler)
log.Root().SetHandler(h)
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardId := range shardIds {
node := node.NewNode(&consensus.Consensus{ShardID: shardId})
// Assign many fake addresses so we have enough address to place with at first
node.AddTestingAddresses(10000)
nodes = append(nodes, &node)
}
// Client/txgenerator server node setup
clientPort := getClientPort(&config)
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.NewNode(&consensusObj)
if clientPort != "" {
clientNode.Client = client.NewClient(&leaders)
// This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
log.Debug("Received new block from leader", "len", len(blocks))
for _, block := range blocks {
for _, node := range nodes {
if node.Consensus.ShardID == block.ShardId {
log.Debug("Adding block from leader", "shardId", block.ShardId)
// Add it to blockchain
utxoPoolMutex.Lock()
node.AddNewBlock(block)
utxoPoolMutex.Unlock()
} else {
continue
}
}
}
}
clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
clientNode.StartServer(clientPort)
}()
}
// Transaction generation process
time.Sleep(10 * time.Second) // wait for nodes to be ready
start := time.Now()
totalTime := 60.0
for true {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break
}
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
for i, leader := range leaders {
txs, crossTxs := generateSimulatedTransactions(i, nodes, *maxNumTxsPerBatch, crossShard)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
msg := node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
}
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
msg := node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
}
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
}
// Send a stop message to stop the nodes at the end
msg := node.ConstructStopMessage()
peers := append(getValidators(*configFile), leaders...)
p2p.BroadcastMessage(peers, msg)
}

@ -167,7 +167,6 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
return
}
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE {
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE {
consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits))
@ -178,7 +177,6 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
// Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE
}
}
}
// Construct the challenge message to send to validators

@ -6,8 +6,8 @@
# and you won't be able to turn it off. With `go build` generating one
# exe, the dialog will only pop up once at the very first time.
# Also it's recommended to use `go build` for testing the whole exe.
go build -o benchmark # Build the harmony-benchmark.exe
go build -o txgen client/txgen/main.go
go build -o bin/benchmark
go build -o bin/txgen client/txgen/main.go
# Create a tmp folder for logs
t=`date +"%Y%m%d-%H%M%S"`
@ -21,9 +21,9 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardId <<< $line
#echo $ip $port $mode
if [ "$mode" != "client" ]; then
./benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder&
./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder&
fi
done < $config
# Generate transactions
./txgen -config_file $config -log_folder $log_folder
./bin/txgen -config_file $config -log_folder $log_folder

@ -17,6 +17,6 @@ echo "Inside deploy linux line 2"
config=$1
while read ip port mode; do
#echo $ip $port $mode $config
go run ./benchmark_main.go -ip $ip -port $port -config_file $config&
go run ./benchmark.go -ip $ip -port $port -config_file $config&
done < $config
go run ./client/txgen/main.go -config_file $config

@ -34,5 +34,5 @@ fi
if [ -f $FILE ]; then
go run ./client/txgen/main.go -config_file $config -log_folder $log_folder&
else
go run ./benchmark_main.go -ip $current_ip -config_file $config -log_folder $log_folder&
go run ./benchmark.go -ip $current_ip -config_file $config -log_folder $log_folder&
fi

@ -86,10 +86,10 @@ func (node *Node) String() string {
return node.Consensus.String()
}
// [Testing code] Should be deleted for production
// Create in genesis block numTxs transactions which assign 1000 token to each address in [1 - numTxs]
func (node *Node) AddMoreFakeTransactions(numTxs int) {
txs := make([]*blockchain.Transaction, numTxs)
// [Testing code] Should be deleted after production
// Creates in genesis block numAddress transactions which assign 1000 token to each address in [0 - numAddress)
func (node *Node) AddTestingAddresses(numAddress int) {
txs := make([]*blockchain.Transaction, numAddress)
for i := range txs {
txs[i] = blockchain.NewCoinbaseTX(strconv.Itoa(i), "", node.Consensus.ShardID)
}

@ -244,7 +244,7 @@ func (node *Node) SendBackProofOfAcceptOrReject() {
}
// This is called by consensus leader to sync new blocks with other clients/nodes.
// For now, just send to the client.
// NOTE: For now, just send to the client (basically not broadcasting)
func (node *Node) BroadcastNewBlock(newBlock *blockchain.Block) {
if node.ClientPeer != nil {
node.log.Debug("SENDING NEW BLOCK TO CLIENT")

@ -39,7 +39,7 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) {
consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
node := NewNode(&consensus)
node.AddMoreFakeTransactions(1000)
node.AddTestingAddresses(1000)
if node.countNumTransactionsInBlockchain() != 1001 {
test.Error("Count of transactions in the blockchain is incorrect")
}

Loading…
Cancel
Save