Add RequestTransactionsMessage for requesting missing transactions in a node

pull/6/head
Rongjian Lan 7 years ago
parent 461b3fa3d1
commit bcdb352ae0
  1. 16
      node/message.go
  2. 32
      node/node.go

@ -7,14 +7,15 @@ import (
"harmony-benchmark/common" "harmony-benchmark/common"
) )
// The types of messages used for NODE/TRANSACTION
type TransactionMessageType int type TransactionMessageType int
const ( const (
SEND TransactionMessageType = iota SEND TransactionMessageType = iota
REQUEST
) )
// The types of messages used for NODE/CONTROL
type ControlMessageType int type ControlMessageType int
const ( const (
STOP ControlMessageType = iota STOP ControlMessageType = iota
) )
@ -29,6 +30,17 @@ func ConstructTransactionListMessage(transactions []blockchain.Transaction) []by
return byteBuffer.Bytes() return byteBuffer.Bytes()
} }
//ConstructTransactionListMessage constructs serialized transactions
func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(REQUEST))
for _, txId := range transactionIds {
byteBuffer.Write(txId)
}
return byteBuffer.Bytes()
}
//ConstructStopMessage is STOP message //ConstructStopMessage is STOP message
func ConstructStopMessage() []byte { func ConstructStopMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})

@ -1,8 +1,6 @@
package node package node
import ( import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain" "harmony-benchmark/blockchain"
"harmony-benchmark/consensus" "harmony-benchmark/consensus"
"harmony-benchmark/common" "harmony-benchmark/common"
@ -11,6 +9,8 @@ import (
"net" "net"
"os" "os"
"time" "time"
"bytes"
"encoding/gob"
) )
// A node represents a program (machine) participating in the network // A node represents a program (machine) participating in the network
@ -105,15 +105,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
actionType := common.NodeMessageType(msgType) actionType := common.NodeMessageType(msgType)
switch actionType { switch actionType {
case common.TRANSACTION: case common.TRANSACTION:
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type node.transactionMessageHandler(msgPayload)
txList := new([]blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
log.Println("Failed deserializing transaction list")
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
log.Println(len(node.pendingTransactions))
case common.CONTROL: case common.CONTROL:
controlType := msgPayload[0] controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP { if ControlMessageType(controlType) == STOP {
@ -125,6 +117,24 @@ func (node *Node) NodeHandler(conn net.Conn) {
} }
} }
func (node *Node) transactionMessageHandler(msgPayload []byte) {
txMessageType := TransactionMessageType(msgPayload[0])
switch txMessageType {
case SEND:
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type
txList := new([]blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
log.Println("Failed deserializing transaction list")
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
case REQUEST:
// TODO: fill in logic to return the request transactions
}
}
func (node *Node) WaitForConsensusReady(readySignal chan int) { func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready for { // keep waiting for consensus ready
<-readySignal <-readySignal

Loading…
Cancel
Save