Add client directory for client related codes; refactor message protocol files

pull/15/head
Rongjian Lan 7 years ago
parent f5b9c7e7e6
commit 4efc1bfcfa
  1. 14
      client/client.go
  2. 29
      client/message.go
  3. 20
      common/message.go
  4. 2
      consensus/consensus.go
  5. 2
      consensus/consensus_test.go
  6. 8
      consensus/message.go
  7. 24
      node/message.go
  8. 5
      node/node.go
  9. 31
      node/node_handler.go

@ -0,0 +1,14 @@
package client
import (
"harmony-benchmark/blockchain"
)
// A client represent a entity/user which send transactions and receive responses from the harmony network
type Client struct {
pendingCrossTxs map[[32]byte]*blockchain.Transaction // map of TxId to pending cross shard txs
}
func (client *Client) TransactionMessageHandler(msgPayload []byte) {
// TODO: Implement this
}

@ -0,0 +1,29 @@
package client
import (
"bytes"
"harmony-benchmark/common"
)
// The specific types of message under CLIENT category
type ClientMessageType byte
const (
TRANSACTION ClientMessageType = iota
// TODO: add more types
)
// The types of messages used for CLIENT/TRANSACTION
type TransactionMessageType int
const (
CROSS_TX TransactionMessageType = iota // The proof of accept or reject returned by the leader to the cross shard transaction client.
)
//ConstructStopMessage is STOP message
func ConstructProofOfAcceptOrRejectMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.CLIENT)})
byteBuffer.WriteByte(byte(TRANSACTION))
byteBuffer.WriteByte(byte(CROSS_TX))
return byteBuffer.Bytes()
}

@ -32,23 +32,7 @@ type MessageCategory byte
const (
COMMITTEE MessageCategory = iota
NODE
// TODO: add more types
)
// The specific types of message under COMMITTEE category
type CommitteeMessageType byte
const (
CONSENSUS CommitteeMessageType = iota
// TODO: add more types
)
// The specific types of message under NODE category
type NodeMessageType byte
const (
TRANSACTION NodeMessageType = iota
CONTROL
CLIENT
// TODO: add more types
)
@ -60,7 +44,7 @@ func GetMessageCategory(message []byte) (MessageCategory, error) {
return MessageCategory(message[NODE_TYPE_BYTES-1]), nil
}
// Get the action type from the p2p message content
// Get the message type from the p2p message content
func GetMessageType(message []byte) (byte, error) {
if len(message) < NODE_TYPE_BYTES+ACTION_TYPE_BYTES {
return 0, errors.New("Failed to get action type: no data available.")

@ -155,7 +155,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) C
}
consensus.msgCategory = byte(common.COMMITTEE)
consensus.actionType = byte(common.CONSENSUS)
consensus.actionType = byte(CONSENSUS)
consensus.Log = log.New()
return consensus

@ -22,7 +22,7 @@ func TestNewConsensus(test *testing.T) {
test.Error("Consensus ReadySignal should be initialized")
}
if consensus.actionType != byte(common.CONSENSUS) {
if consensus.actionType != byte(CONSENSUS) {
test.Error("Consensus actionType should be CONSENSUS")
}

@ -68,6 +68,14 @@ RESPONSE:
const MESSAGE_TYPE_BYTES = 1
// The specific types of message under COMMITTEE category
type CommitteeMessageType byte
const (
CONSENSUS CommitteeMessageType = iota
// TODO: add more types
)
// Consensus communication message type.
// Leader and validator dispatch messages based on incoming message type
type MessageType int

@ -7,13 +7,21 @@ import (
"harmony-benchmark/common"
)
// The specific types of message under NODE category
type NodeMessageType byte
const (
TRANSACTION NodeMessageType = iota
CONTROL
// TODO: add more types
)
// The types of messages used for NODE/TRANSACTION
type TransactionMessageType int
const (
SEND TransactionMessageType = iota
REQUEST
CROSS_TX_PROOF // The proof of accept or reject returned by the leader to the cross shard transaction client.
)
// The types of messages used for NODE/CONTROL
@ -26,7 +34,7 @@ const (
// Constructs serialized transactions
func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(TRANSACTION))
byteBuffer.WriteByte(byte(SEND))
encoder := gob.NewEncoder(byteBuffer)
// Copy over the tx data
@ -41,7 +49,7 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b
// Constructs serialized transactions
func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(TRANSACTION))
byteBuffer.WriteByte(byte(REQUEST))
for _, txId := range transactionIds {
byteBuffer.Write(txId)
@ -52,15 +60,7 @@ func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
// Constructs STOP message for node to stop
func ConstructStopMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.CONTROL))
byteBuffer.WriteByte(byte(CONTROL))
byteBuffer.WriteByte(byte(STOP))
return byteBuffer.Bytes()
}
//ConstructStopMessage is STOP message
func ConstructProofOfAcceptOrRejectMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(CROSS_TX_PROOF))
return byteBuffer.Bytes()
}

@ -2,6 +2,7 @@ package node
import (
"harmony-benchmark/blockchain"
"harmony-benchmark/client"
"harmony-benchmark/consensus"
"harmony-benchmark/log"
"harmony-benchmark/p2p"
@ -27,7 +28,9 @@ type Node struct {
pendingTxMutex sync.Mutex
crossTxToReturnMutex sync.Mutex
ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used to return proof-of-accept
ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept
Client *client.Client // The presence of a client object means this node will also act as a client
}
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client

@ -4,7 +4,9 @@ import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/client"
"harmony-benchmark/common"
"harmony-benchmark/consensus"
"harmony-benchmark/p2p"
"net"
"os"
@ -27,7 +29,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
node.log.Error("Read p2p data failed", "err", err, "node", node)
return
}
consensus := node.Consensus
consensusObj := node.Consensus
msgCategory, err := common.GetMessageCategory(content)
if err != nil {
@ -49,27 +51,32 @@ func (node *Node) NodeHandler(conn net.Conn) {
switch msgCategory {
case common.COMMITTEE:
actionType := common.CommitteeMessageType(msgType)
actionType := consensus.CommitteeMessageType(msgType)
switch actionType {
case common.CONSENSUS:
if consensus.IsLeader {
consensus.ProcessMessageLeader(msgPayload)
case consensus.CONSENSUS:
if consensusObj.IsLeader {
consensusObj.ProcessMessageLeader(msgPayload)
} else {
consensus.ProcessMessageValidator(msgPayload)
consensusObj.ProcessMessageValidator(msgPayload)
}
}
case common.NODE:
actionType := common.NodeMessageType(msgType)
actionType := NodeMessageType(msgType)
switch actionType {
case common.TRANSACTION:
case TRANSACTION:
node.transactionMessageHandler(msgPayload)
case common.CONTROL:
case CONTROL:
controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP {
node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain())
os.Exit(0)
}
}
case common.CLIENT:
actionType := client.ClientMessageType(msgType)
switch actionType {
case client.TRANSACTION:
node.Client.TransactionMessageHandler(msgPayload)
}
}
}
@ -109,8 +116,6 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
}
}
// TODO: return the transaction list to requester
case CROSS_TX_PROOF:
// TODO: implement this
}
}
@ -164,7 +169,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
// This is called by consensus participants to verify the block they are running consensus on
func (node *Node) SendBackProofOfAcceptOrReject() {
if node.ClientPeer != nil {
p2p.SendMessage(*node.ClientPeer, ConstructProofOfAcceptOrRejectMessage())
p2p.SendMessage(*node.ClientPeer, client.ConstructProofOfAcceptOrRejectMessage())
}
}

Loading…
Cancel
Save