From bcdb352ae08f4249fd9f03a0998e5b9c4694bdbf Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 Jun 2018 17:25:54 -0700 Subject: [PATCH] Add RequestTransactionsMessage for requesting missing transactions in a node --- node/message.go | 16 ++++++++++++++-- node/node.go | 32 +++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/node/message.go b/node/message.go index dc537b1b3..3910eb85c 100644 --- a/node/message.go +++ b/node/message.go @@ -7,14 +7,15 @@ import ( "harmony-benchmark/common" ) +// The types of messages used for NODE/TRANSACTION type TransactionMessageType int - const ( SEND TransactionMessageType = iota + REQUEST ) +// The types of messages used for NODE/CONTROL type ControlMessageType int - const ( STOP ControlMessageType = iota ) @@ -29,6 +30,17 @@ func ConstructTransactionListMessage(transactions []blockchain.Transaction) []by return byteBuffer.Bytes() } +//ConstructTransactionListMessage constructs serialized transactions +func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.TRANSACTION)) + byteBuffer.WriteByte(byte(REQUEST)) + for _, txId := range transactionIds { + byteBuffer.Write(txId) + } + return byteBuffer.Bytes() +} + //ConstructStopMessage is STOP message func ConstructStopMessage() []byte { byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) diff --git a/node/node.go b/node/node.go index 062e17a85..c9a9e2e93 100644 --- a/node/node.go +++ b/node/node.go @@ -1,8 +1,6 @@ package node import ( - "bytes" - "encoding/gob" "harmony-benchmark/blockchain" "harmony-benchmark/consensus" "harmony-benchmark/common" @@ -11,6 +9,8 @@ import ( "net" "os" "time" + "bytes" + "encoding/gob" ) // A node represents a program (machine) participating in the network @@ -105,15 +105,7 @@ func (node *Node) NodeHandler(conn net.Conn) { actionType := common.NodeMessageType(msgType) switch actionType { case common.TRANSACTION: - txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type - - txList := new([]blockchain.Transaction) - err := txDecoder.Decode(&txList) - if err != nil { - log.Println("Failed deserializing transaction list") - } - node.pendingTransactions = append(node.pendingTransactions, *txList...) - log.Println(len(node.pendingTransactions)) + node.transactionMessageHandler(msgPayload) case common.CONTROL: controlType := msgPayload[0] if ControlMessageType(controlType) == STOP { @@ -125,6 +117,24 @@ func (node *Node) NodeHandler(conn net.Conn) { } } +func (node *Node) transactionMessageHandler(msgPayload []byte) { + txMessageType := TransactionMessageType(msgPayload[0]) + + switch txMessageType { + case SEND: + txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type + + txList := new([]blockchain.Transaction) + err := txDecoder.Decode(&txList) + if err != nil { + log.Println("Failed deserializing transaction list") + } + node.pendingTransactions = append(node.pendingTransactions, *txList...) + case REQUEST: + // TODO: fill in logic to return the request transactions + } +} + func (node *Node) WaitForConsensusReady(readySignal chan int) { for { // keep waiting for consensus ready <-readySignal