add more wires for syncing. add flag node_sync to assume the new node will start with sync first. also fix some code of identitychain which make some copies of struct containing Mutex

pull/69/head
Minh Doan 6 years ago
parent 210f9f7ba9
commit 93a7f92136
  1. 6
      consensus/consensus_validator.go
  2. 15
      node/node.go
  3. 11
      node/node_handler.go

@ -16,7 +16,7 @@ import (
)
// Validator's consensus message dispatcher
func (consensus *Consensus) ProcessMessageValidator(message []byte) {
func (consensus *Consensus) ProcessMessageValidator(message []byte, blockSyncing chan struct{}, syncNode bool) {
msgType, err := proto_consensus.GetConsensusMessageType(message)
if err != nil {
consensus.Log.Error("Failed to get consensus message type", "err", err, "consensus", consensus)
@ -27,6 +27,10 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus)
}
if syncNode {
blockSyncing <- struct{}{}
return
}
switch msgType {
case proto_consensus.ANNOUNCE:
consensus.processAnnounceMessage(payload)

@ -41,6 +41,7 @@ type Node struct {
IsWaiting bool
Self p2p.Peer
IDCPeer p2p.Peer
syncNode bool // TODO(minhdoan): Remove it later.
}
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
@ -93,6 +94,7 @@ func (node *Node) listenOnPort(port string) {
select {
case <-node.blockSyncing:
// Wait until the syncing part gets finished.
node.startBlockSyncing()
<-node.doneSyncing
default:
conn, err := listen.Accept()
@ -104,6 +106,11 @@ func (node *Node) listenOnPort(port string) {
}
}
}
func (node *Node) startBlockSyncing() {
// TODO(minhdoan):
for {
}
}
func (node *Node) String() string {
return node.Consensus.String()
@ -127,16 +134,16 @@ func (node *Node) ConnectIdentityChain() {
}
//NewWaitNode is a way to initiate a waiting no
func NewWaitNode(peer, IDCPeer p2p.Peer) Node {
func NewWaitNode(peer, IDCPeer p2p.Peer) *Node {
node := Node{}
node.Self = peer
node.IDCPeer = IDCPeer
node.log = log.New()
return node
return &node
}
//NewNodefromIDC
func NewNodefromIDC(node Node, consensus *consensus.Consensus, db *db.LDBDatabase) *Node {
func NewNodefromIDC(node *Node, consensus *consensus.Consensus, db *db.LDBDatabase) *Node {
if consensus != nil {
// Consensus and associated channel to communicate blocks
@ -162,7 +169,7 @@ func NewNodefromIDC(node Node, consensus *consensus.Consensus, db *db.LDBDatabas
// Logger
node.log = log.New()
return &node
return node
}
func (node *Node) processPOWMessage(message []byte) {

@ -77,7 +77,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
if consensusObj.IsLeader {
consensusObj.ProcessMessageLeader(msgPayload)
} else {
consensusObj.ProcessMessageValidator(msgPayload)
consensusObj.ProcessMessageValidator(msgPayload, node.blockSyncing, node.syncNode)
}
}
case proto.NODE:
@ -97,7 +97,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
}
case proto_node.BLOCKCHAIN_SYNC:
node.transactionMessageHandler(msgPayload)
node.handleBlockchainSync(conn)
case proto_node.CLIENT:
clientMsgType := proto_node.ClientMessageType(msgPayload[0])
switch clientMsgType {
@ -168,6 +168,12 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
}
func (node *Node) handleBlockchainSync(conn net.Conn) {
for {
}
}
func (node *Node) transactionMessageHandler(msgPayload []byte) {
txMessageType := proto_node.TransactionMessageType(msgPayload[0])
@ -224,6 +230,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
timeoutCount := 0
for { // keep waiting for Consensus ready
retry := false
// TODO(minhdoan, rj): Refactor by sending signal in channel instead of waiting for 10 seconds.
select {
case <-readySignal:
time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up.

Loading…
Cancel
Save