diff --git a/client/client.go b/client/client.go index 10a148a3a..21d282ae6 100644 --- a/client/client.go +++ b/client/client.go @@ -3,12 +3,14 @@ package client import ( "bytes" "encoding/gob" + "fmt" + "github.com/simple-rules/harmony-benchmark/proto/node" "sync" "github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/log" "github.com/simple-rules/harmony-benchmark/p2p" - proto_client "github.com/simple-rules/harmony-benchmark/proto/client" + client_proto "github.com/simple-rules/harmony-benchmark/proto/client" ) // A client represents a node (e.g. a wallet) which sends transactions and receives responses from the harmony network @@ -18,14 +20,30 @@ type Client struct { leaders *[]p2p.Peer // All the leaders for each shard UpdateBlocks func([]*blockchain.Block) // Closure function used to sync new block with the leader. Once the leader finishes the consensus on a new block, it will send it to the clients. Clients use this method to update their blockchain - log log.Logger // Log utility + UtxoMap blockchain.UtxoMap + ShardResponseTracker map[uint32]bool // A map containing the shard id of responded shard. + log log.Logger // Log utility +} + +func (client *Client) PrintUtxoBalance() { + for address, txHash2Vout2AmountMap := range client.UtxoMap { + balance := 0 + for _, vout2AmountMap := range txHash2Vout2AmountMap { + for _, amount := range vout2AmountMap { + balance += amount + } + + } + fmt.Printf("Address: {%x}\n", address) + fmt.Printf("Balance: %d\n", balance) + } } // The message handler for CLIENT/TRANSACTION messages. func (client *Client) TransactionMessageHandler(msgPayload []byte) { - messageType := proto_client.TransactionMessageType(msgPayload[0]) + messageType := client_proto.TransactionMessageType(msgPayload[0]) switch messageType { - case proto_client.PROOF_OF_LOCK: + case client_proto.PROOF_OF_LOCK: // Decode the list of blockchain.CrossShardTxProof txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the PROOF_OF_LOCK messge type proofs := new([]blockchain.CrossShardTxProof) @@ -35,6 +53,16 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) { client.log.Error("Failed deserializing cross transaction proof list") } client.handleProofOfLockMessage(proofs) + case client_proto.UTXO_RESPONSE: + fmt.Print("Received utxo resposne") + txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the PROOF_OF_LOCK messge type + fetchUtxoResponse := new(client_proto.FetchUtxoResponseMessage) + err := txDecoder.Decode(fetchUtxoResponse) + + if err != nil { + client.log.Error("Failed deserializing utxo resposne") + } + client.handleFetchUtxoResponseMessage(*fetchUtxoResponse) } } @@ -91,8 +119,35 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx } } +func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.FetchUtxoResponseMessage) { + _, ok := client.ShardResponseTracker[utxoResponse.ShardId] + if ok { + return + } + client.ShardResponseTracker[utxoResponse.ShardId] = true + // Merge utxo response into client utxo map. + for address, txHash2Vout2AmountMap := range utxoResponse.UtxoMap { + clientTxHashMap, ok := client.UtxoMap[address] + if ok { + for txHash, vout2AmountMap := range txHash2Vout2AmountMap { + clientVout2AmountMap, ok := clientTxHashMap[txHash] + if ok { + for vout, amount := range vout2AmountMap { + clientVout2AmountMap[vout] = amount + } + } else { + clientTxHashMap[txHash] = vout2AmountMap + } + + } + } else { + client.UtxoMap[address] = txHash2Vout2AmountMap + } + } +} + func (client *Client) broadcastCrossShardTxUnlockMessage(txsToSend *[]blockchain.Transaction) { - p2p.BroadcastMessage(*client.leaders, proto_client.ConstructUnlockToCommitOrAbortMessage(*txsToSend)) + p2p.BroadcastMessage(*client.leaders, node.ConstructUnlockToCommitOrAbortMessage(*txsToSend)) } // Create a new Client diff --git a/client/wallet/main.go b/client/wallet/main.go index 8ff3f9a38..3292f0a9c 100644 --- a/client/wallet/main.go +++ b/client/wallet/main.go @@ -6,16 +6,18 @@ import ( "flag" "fmt" "github.com/dedis/kyber" + "github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/client" "github.com/simple-rules/harmony-benchmark/configr" "github.com/simple-rules/harmony-benchmark/crypto" "github.com/simple-rules/harmony-benchmark/crypto/pki" "github.com/simple-rules/harmony-benchmark/node" "github.com/simple-rules/harmony-benchmark/p2p" - proto_client "github.com/simple-rules/harmony-benchmark/proto/client" + proto_node "github.com/simple-rules/harmony-benchmark/proto/node" "io" "io/ioutil" "os" + "time" ) func main() { @@ -96,9 +98,22 @@ func main() { clientPeer := configr.GetClientPeer() walletNode := node.New(nil, nil) walletNode.Client = client.NewClient(&leaders) - fmt.Println(leaders) - p2p.BroadcastMessage(leaders, proto_client.ConstructFetchUtxoMessage(*clientPeer, ReadAddresses())) + go walletNode.StartServer(clientPeer.Port) fmt.Println("Fetching account balance...") + walletNode.Client.ShardResponseTracker = make(map[uint32]bool) + walletNode.Client.UtxoMap = make(blockchain.UtxoMap) + p2p.BroadcastMessage(leaders, proto_node.ConstructFetchUtxoMessage(*clientPeer, ReadAddresses())) + + go func() { + for true { + if len(walletNode.Client.ShardResponseTracker) == len(leaders) { + fmt.Println("All response received") + walletNode.Client.PrintUtxoBalance() + break + } + } + }() + time.Sleep(3 * time.Second) // Wait 3 seconds for the response. Exit afterward. case "test": priKey := pki.GetPrivateKeyScalarFromInt(33) address := pki.GetAddressFromPrivateKey(priKey) diff --git a/node/node_handler.go b/node/node_handler.go index 2f74d58ce..7d86ca873 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -86,11 +86,12 @@ func (node *Node) NodeHandler(conn net.Conn) { case proto_node.LOOKUP_UTXO: decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LOOKUP_UTXO messge type - fetchUtxoMessage := new(client.FetchUtxoMessage) + fetchUtxoMessage := new(proto_node.FetchUtxoMessage) decoder.Decode(fetchUtxoMessage) utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) + p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) log.Info("Utxo Map", "Detail", utxoMap) } case proto_node.CONTROL: diff --git a/proto/client/client.go b/proto/client/client.go index 04bf232bc..bcb488125 100644 --- a/proto/client/client.go +++ b/proto/client/client.go @@ -3,11 +3,8 @@ package client import ( "bytes" "encoding/gob" - "github.com/simple-rules/harmony-benchmark/p2p" - "github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/proto" - "github.com/simple-rules/harmony-benchmark/proto/node" ) // The specific types of message under CLIENT category @@ -23,12 +20,12 @@ type TransactionMessageType int const ( PROOF_OF_LOCK TransactionMessageType = iota // The proof of accept or reject returned by the leader to the client tnat issued cross shard transactions. + UTXO_RESPONSE ) -// The wrapper struct FetchUtxoMessage sent from client wallet -type FetchUtxoMessage struct { - Addresses [][20]byte - Peer p2p.Peer +type FetchUtxoResponseMessage struct { + UtxoMap blockchain.UtxoMap + ShardId uint32 } // [leader] Constructs the proof of accept or reject message that will be sent to client @@ -42,24 +39,13 @@ func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof return byteBuffer.Bytes() } -// [client] Constructs the unlock to commit or abort message that will be sent to leaders -func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []blockchain.Transaction) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) - byteBuffer.WriteByte(byte(node.TRANSACTION)) - byteBuffer.WriteByte(byte(node.UNLOCK)) - encoder := gob.NewEncoder(byteBuffer) - encoder.Encode(txsAndProofs) - return byteBuffer.Bytes() -} - -// [client] Constructs the fetch utxo message that will be sent to Harmony network -func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) - byteBuffer.WriteByte(byte(node.CLIENT)) - byteBuffer.WriteByte(byte(node.LOOKUP_UTXO)) - +// Constructs the response message to fetch utxo message +func ConstructFetchUtxoResponseMessage(utxoMap *blockchain.UtxoMap, shardId uint32) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(proto.CLIENT)}) + byteBuffer.WriteByte(byte(TRANSACTION)) + byteBuffer.WriteByte(byte(UTXO_RESPONSE)) encoder := gob.NewEncoder(byteBuffer) - encoder.Encode(FetchUtxoMessage{Addresses: addresses, Peer: sender}) + encoder.Encode(FetchUtxoResponseMessage{*utxoMap, shardId}) return byteBuffer.Bytes() } diff --git a/proto/node/node.go b/proto/node/node.go index 65b3cbdaf..3b298ef77 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "github.com/simple-rules/harmony-benchmark/blockchain" + "github.com/simple-rules/harmony-benchmark/p2p" "github.com/simple-rules/harmony-benchmark/proto" ) @@ -49,6 +50,34 @@ const ( STOP ControlMessageType = iota ) +// The wrapper struct FetchUtxoMessage sent from client wallet +type FetchUtxoMessage struct { + Addresses [][20]byte + Sender p2p.Peer +} + +// [client] Constructs the unlock to commit or abort message that will be sent to leaders +func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []blockchain.Transaction) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer.WriteByte(byte(TRANSACTION)) + byteBuffer.WriteByte(byte(UNLOCK)) + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(txsAndProofs) + return byteBuffer.Bytes() +} + +// [client] Constructs the fetch utxo message that will be sent to Harmony network +func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer.WriteByte(byte(CLIENT)) + byteBuffer.WriteByte(byte(LOOKUP_UTXO)) + + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(FetchUtxoMessage{Addresses: addresses, Sender: sender}) + + return byteBuffer.Bytes() +} + // Constructs serialized transactions func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})