diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index cfd81d767..b5730c93f 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "harmony-benchmark/blockchain" + "harmony-benchmark/client" "harmony-benchmark/consensus" "harmony-benchmark/log" "harmony-benchmark/node" @@ -50,6 +51,32 @@ func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Trans for index, value := range utxoMap { countAll++ if rand.Intn(100) < 30 { // 30% sample rate to select UTXO to use for new transactions + // Sharding related test code. To be deleted soon + //if dataNode.Consensus.ShardID == 0 { + // txin := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID} + // txin2 := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID+1} + // txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))} + // tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin, txin2}, []blockchain.TXOutput{txout}} + // tx.SetID() + // + // if count >= numTxs { + // continue + // } + // outputs = append(outputs, &tx) + // count++ + //} else { + // txin := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID} + // txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))} + // tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}} + // tx.SetID() + // + // if count >= numTxs { + // continue + // } + // outputs = append(outputs, &tx) + // count++ + //} + // Spend the money of current UTXO to a random address in [1 - 1000] txin := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID} txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))} @@ -149,7 +176,7 @@ func main() { consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) clientNode := node.NewNode(&consensusObj) - + clientNode.Client = client.NewClient() go func() { clientNode.StartServer(clientPort) }() diff --git a/benchmark_main.go b/benchmark_main.go index 03cb6e836..eb8ff2578 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -50,7 +50,7 @@ func getPeers(myIp, myPort, myShardId string, config *[][]string) []p2p.Peer { func getClientPeer(config *[][]string) *p2p.Peer { for _, node := range *config { ip, port, status := node[0], node[1], node[2] - if status == "client" { + if status != "client" { continue } peer := p2p.Peer{Port: port, Ip: ip} diff --git a/client/client.go b/client/client.go index d64a4f9b4..875d44d0e 100644 --- a/client/client.go +++ b/client/client.go @@ -1,14 +1,40 @@ package client import ( + "bytes" + "encoding/gob" "harmony-benchmark/blockchain" + "harmony-benchmark/log" ) // A client represent a entity/user which send transactions and receive responses from the harmony network type Client struct { pendingCrossTxs map[[32]byte]*blockchain.Transaction // map of TxId to pending cross shard txs + + log log.Logger // Log utility } func (client *Client) TransactionMessageHandler(msgPayload []byte) { - // TODO: Implement this + messageType := TransactionMessageType(msgPayload[0]) + switch messageType { + case CROSS_TX: + txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the CROSS_TX messge type + + proofList := new([]blockchain.CrossShardTxProof) + err := txDecoder.Decode(&proofList) + if err != nil { + client.log.Error("Failed deserializing cross transaction proof list") + } + + // TODO: process the proof list + } +} + +// Create a new Node +func NewClient() *Client { + client := Client{} + + // Logger + client.log = log.New() + return &client } diff --git a/client/message.go b/client/message.go index 11f2ac719..b9e5225a5 100644 --- a/client/message.go +++ b/client/message.go @@ -2,6 +2,8 @@ package client import ( "bytes" + "encoding/gob" + "harmony-benchmark/blockchain" "harmony-benchmark/common" ) @@ -21,9 +23,11 @@ const ( ) //ConstructStopMessage is STOP message -func ConstructProofOfAcceptOrRejectMessage() []byte { +func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(common.CLIENT)}) byteBuffer.WriteByte(byte(TRANSACTION)) byteBuffer.WriteByte(byte(CROSS_TX)) + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(proofs) return byteBuffer.Bytes() } diff --git a/node/node_handler.go b/node/node_handler.go index 642406576..b79491ddb 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -76,7 +76,9 @@ func (node *Node) NodeHandler(conn net.Conn) { actionType := client.ClientMessageType(msgType) switch actionType { case client.TRANSACTION: - node.Client.TransactionMessageHandler(msgPayload) + if node.Client != nil { + node.Client.TransactionMessageHandler(msgPayload) + } } } } @@ -169,7 +171,15 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { // This is called by consensus participants to verify the block they are running consensus on func (node *Node) SendBackProofOfAcceptOrReject() { if node.ClientPeer != nil { - p2p.SendMessage(*node.ClientPeer, client.ConstructProofOfAcceptOrRejectMessage()) + node.crossTxToReturnMutex.Lock() + proofs := []blockchain.CrossShardTxProof{} + for _, txAndProof := range node.CrossTxsToReturn { + proofs = append(proofs, *txAndProof.Proof) + } + node.CrossTxsToReturn = nil + node.crossTxToReturnMutex.Unlock() + + p2p.SendMessage(*node.ClientPeer, client.ConstructProofOfAcceptOrRejectMessage(proofs)) } } @@ -190,7 +200,7 @@ func (node *Node) PostConsensusProcessing(newBlock *blockchain.Block) { node.transactionInConsensus = []*blockchain.Transaction{} if node.Consensus.IsLeader { - // Move crossTx in consensus into the list to be returned to client + // Move crossTx-in-consensus into the list to be returned to client for _, crossTxAndProof := range node.CrossTxsInConsensus { crossTxAndProof.Proof.BlockHash = newBlock.Hash // TODO: fill in the signature proofs