pull/69/head
Minh Doan 6 years ago
parent 85fdc490b2
commit 428387f34a
  1. 6
      consensus/consensus_validator.go
  2. 30
      node/node.go
  3. 14
      node/node_handler.go
  4. 8
      proto/node/node.go

@ -16,7 +16,7 @@ import (
) )
// Validator's consensus message dispatcher // Validator's consensus message dispatcher
func (consensus *Consensus) ProcessMessageValidator(message []byte, blockSyncing chan struct{}, syncNode bool) { func (consensus *Consensus) ProcessMessageValidator(message []byte) {
msgType, err := proto_consensus.GetConsensusMessageType(message) msgType, err := proto_consensus.GetConsensusMessageType(message)
if err != nil { if err != nil {
consensus.Log.Error("Failed to get consensus message type", "err", err, "consensus", consensus) consensus.Log.Error("Failed to get consensus message type", "err", err, "consensus", consensus)
@ -27,10 +27,6 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte, blockSyncing
consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus) consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus)
} }
if syncNode {
blockSyncing <- struct{}{}
return
}
switch msgType { switch msgType {
case proto_consensus.ANNOUNCE: case proto_consensus.ANNOUNCE:
consensus.processAnnounceMessage(payload) consensus.processAnnounceMessage(payload)

@ -34,14 +34,12 @@ type Node struct {
log log.Logger // Log utility log log.Logger // Log utility
pendingTxMutex sync.Mutex pendingTxMutex sync.Mutex
crossTxToReturnMutex sync.Mutex crossTxToReturnMutex sync.Mutex
blockSyncing chan struct{}
doneSyncing chan struct{}
ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept
Client *client.Client // The presence of a client object means this node will also act as a client Client *client.Client // The presence of a client object means this node will also act as a client
IsWaiting bool IsWaiting bool
Self p2p.Peer Self p2p.Peer
IDCPeer p2p.Peer IDCPeer p2p.Peer
syncNode bool // TODO(minhdoan): Remove it later. SyncNode bool // TODO(minhdoan): Remove it later.
} }
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
@ -73,6 +71,9 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Trans
// Start a server and process the request by a handler. // Start a server and process the request by a handler.
func (node *Node) StartServer(port string) { func (node *Node) StartServer(port string) {
if node.SyncNode {
node.startBlockSyncing()
}
fmt.Println("Hello in server now") fmt.Println("Hello in server now")
node.log.Debug("Starting server", "node", node, "port", port) node.log.Debug("Starting server", "node", node, "port", port)
@ -91,24 +92,17 @@ func (node *Node) listenOnPort(port string) {
return return
} }
for { for {
select { conn, err := listen.Accept()
case <-node.blockSyncing: if err != nil {
// Wait until the syncing part gets finished. node.log.Error("Error listening on port.", "port", port)
node.startBlockSyncing() continue
<-node.doneSyncing
default:
conn, err := listen.Accept()
if err != nil {
node.log.Error("Error listening on port.", "port", port)
continue
}
go node.NodeHandler(conn)
} }
go node.NodeHandler(conn)
} }
} }
func (node *Node) startBlockSyncing() { func (node *Node) startBlockSyncing() {
// TODO(minhdoan):
for { for {
} }
} }
@ -245,10 +239,6 @@ func New(consensus *consensus.Consensus, db *db.LDBDatabase) *Node {
// Initialize level db. // Initialize level db.
node.db = db node.db = db
// Initialize channel for syncing.
node.doneSyncing = make(chan struct{})
node.blockSyncing = make(chan struct{})
} }
// Logger // Logger
node.log = log.New() node.log = log.New()

@ -1,6 +1,7 @@
package node package node
import ( import (
"bufio"
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
@ -77,7 +78,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
if consensusObj.IsLeader { if consensusObj.IsLeader {
consensusObj.ProcessMessageLeader(msgPayload) consensusObj.ProcessMessageLeader(msgPayload)
} else { } else {
consensusObj.ProcessMessageValidator(msgPayload, node.blockSyncing, node.syncNode) consensusObj.ProcessMessageValidator(msgPayload)
} }
} }
case proto.NODE: case proto.NODE:
@ -97,7 +98,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
} }
} }
case proto_node.BLOCKCHAIN_SYNC: case proto_node.BLOCKCHAIN_SYNC:
node.handleBlockchainSync(conn) node.handleBlockchainSync(msgPayload, conn)
case proto_node.CLIENT: case proto_node.CLIENT:
clientMsgType := proto_node.ClientMessageType(msgPayload[0]) clientMsgType := proto_node.ClientMessageType(msgPayload[0])
switch clientMsgType { switch clientMsgType {
@ -168,9 +169,12 @@ func (node *Node) NodeHandler(conn net.Conn) {
} }
} }
func (node *Node) handleBlockchainSync(conn net.Conn) { func (node *Node) handleBlockchainSync(payload []byte, conn net.Conn) {
for { // TODO(minhdoan): Looking to removing this.
w := bufio.NewWriter(conn)
for _, block := range node.blockchain.Blocks {
w.Write(block.Serialize())
w.Flush()
} }
} }

@ -97,6 +97,14 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b
return byteBuffer.Bytes() return byteBuffer.Bytes()
} }
// Constructs Blockchain Sync Message.
func ConstructBlockchainSyncMessage(blockHash [32]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(BLOCKCHAIN_SYNC))
byteBuffer.Write(blockHash[:])
return byteBuffer.Bytes()
}
// Constructs serialized transactions // Constructs serialized transactions
func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})

Loading…
Cancel
Save