Richard Liu 6 years ago
commit 95878c2ae2
  1. 3
      client/client.go
  2. 4
      client/wallet/main.go
  3. 2
      consensus/consensus.go
  4. 6
      consensus/consensus_validator.go
  5. 15
      node/node.go
  6. 17
      node/node_handler.go

@ -20,6 +20,7 @@ type Client struct {
UpdateBlocks func([]*blockchain.Block) // Closure function used to sync new block with the leader. Once the leader finishes the consensus on a new block, it will send it to the clients. Clients use this method to update their blockchain UpdateBlocks func([]*blockchain.Block) // Closure function used to sync new block with the leader. Once the leader finishes the consensus on a new block, it will send it to the clients. Clients use this method to update their blockchain
ShardUtxoMap map[uint32]blockchain.UtxoMap ShardUtxoMap map[uint32]blockchain.UtxoMap
ShardUtxoMapMutex sync.Mutex // Mutex for the UTXO maps
log log.Logger // Log utility log log.Logger // Log utility
} }
@ -104,6 +105,8 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx
} }
func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.FetchUtxoResponseMessage) { func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.FetchUtxoResponseMessage) {
client.ShardUtxoMapMutex.Lock()
defer client.ShardUtxoMapMutex.Unlock()
_, ok := client.ShardUtxoMap[utxoResponse.ShardId] _, ok := client.ShardUtxoMap[utxoResponse.ShardId]
if ok { if ok {
return return

@ -75,6 +75,10 @@ func main() {
case "import": case "import":
accountImportCommand.Parse(os.Args[3:]) accountImportCommand.Parse(os.Args[3:])
priKey := *accountImportPtr priKey := *accountImportPtr
if priKey == "" {
fmt.Println("Error: --privateKey is required")
return
}
if !accountImportCommand.Parsed() { if !accountImportCommand.Parsed() {
fmt.Println("Failed to parse flags") fmt.Println("Failed to parse flags")
} }

@ -55,6 +55,8 @@ type Consensus struct {
blockHash [32]byte blockHash [32]byte
// BlockHeader to run consensus on // BlockHeader to run consensus on
blockHeader []byte blockHeader []byte
// Array of block hashes.
blockHashes [][32]byte
// Shard Id which this node belongs to // Shard Id which this node belongs to
ShardID uint32 ShardID uint32

@ -16,7 +16,7 @@ import (
) )
// Validator's consensus message dispatcher // Validator's consensus message dispatcher
func (consensus *Consensus) ProcessMessageValidator(message []byte) { func (consensus *Consensus) ProcessMessageValidator(message []byte, blockSyncing chan struct{}, syncNode bool) {
msgType, err := proto_consensus.GetConsensusMessageType(message) msgType, err := proto_consensus.GetConsensusMessageType(message)
if err != nil { if err != nil {
consensus.Log.Error("Failed to get consensus message type", "err", err, "consensus", consensus) consensus.Log.Error("Failed to get consensus message type", "err", err, "consensus", consensus)
@ -27,6 +27,10 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus) consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus)
} }
if syncNode {
blockSyncing <- struct{}{}
return
}
switch msgType { switch msgType {
case proto_consensus.ANNOUNCE: case proto_consensus.ANNOUNCE:
consensus.processAnnounceMessage(payload) consensus.processAnnounceMessage(payload)

@ -41,6 +41,7 @@ type Node struct {
IsWaiting bool IsWaiting bool
Self p2p.Peer Self p2p.Peer
IDCPeer p2p.Peer IDCPeer p2p.Peer
syncNode bool // TODO(minhdoan): Remove it later.
} }
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
@ -93,6 +94,7 @@ func (node *Node) listenOnPort(port string) {
select { select {
case <-node.blockSyncing: case <-node.blockSyncing:
// Wait until the syncing part gets finished. // Wait until the syncing part gets finished.
node.startBlockSyncing()
<-node.doneSyncing <-node.doneSyncing
default: default:
conn, err := listen.Accept() conn, err := listen.Accept()
@ -104,6 +106,11 @@ func (node *Node) listenOnPort(port string) {
} }
} }
} }
func (node *Node) startBlockSyncing() {
// TODO(minhdoan):
for {
}
}
func (node *Node) String() string { func (node *Node) String() string {
return node.Consensus.String() return node.Consensus.String()
@ -127,16 +134,16 @@ func (node *Node) ConnectIdentityChain() {
} }
//NewWaitNode is a way to initiate a waiting no //NewWaitNode is a way to initiate a waiting no
func NewWaitNode(peer, IDCPeer p2p.Peer) Node { func NewWaitNode(peer, IDCPeer p2p.Peer) *Node {
node := Node{} node := Node{}
node.Self = peer node.Self = peer
node.IDCPeer = IDCPeer node.IDCPeer = IDCPeer
node.log = log.New() node.log = log.New()
return node return &node
} }
//NewNodefromIDC //NewNodefromIDC
func NewNodefromIDC(node Node, consensus *consensus.Consensus, db *db.LDBDatabase) *Node { func NewNodefromIDC(node *Node, consensus *consensus.Consensus, db *db.LDBDatabase) *Node {
if consensus != nil { if consensus != nil {
// Consensus and associated channel to communicate blocks // Consensus and associated channel to communicate blocks
@ -162,7 +169,7 @@ func NewNodefromIDC(node Node, consensus *consensus.Consensus, db *db.LDBDatabas
// Logger // Logger
node.log = log.New() node.log = log.New()
return &node return node
} }
func (node *Node) processPOWMessage(message []byte) { func (node *Node) processPOWMessage(message []byte) {

@ -77,7 +77,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
if consensusObj.IsLeader { if consensusObj.IsLeader {
consensusObj.ProcessMessageLeader(msgPayload) consensusObj.ProcessMessageLeader(msgPayload)
} else { } else {
consensusObj.ProcessMessageValidator(msgPayload) consensusObj.ProcessMessageValidator(msgPayload, node.blockSyncing, node.syncNode)
} }
} }
case proto.NODE: case proto.NODE:
@ -97,7 +97,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
} }
} }
case proto_node.BLOCKCHAIN_SYNC: case proto_node.BLOCKCHAIN_SYNC:
node.transactionMessageHandler(msgPayload) node.handleBlockchainSync(conn)
case proto_node.CLIENT: case proto_node.CLIENT:
clientMsgType := proto_node.ClientMessageType(msgPayload[0]) clientMsgType := proto_node.ClientMessageType(msgPayload[0])
switch clientMsgType { switch clientMsgType {
@ -168,6 +168,12 @@ func (node *Node) NodeHandler(conn net.Conn) {
} }
} }
func (node *Node) handleBlockchainSync(conn net.Conn) {
for {
}
}
func (node *Node) transactionMessageHandler(msgPayload []byte) { func (node *Node) transactionMessageHandler(msgPayload []byte) {
txMessageType := proto_node.TransactionMessageType(msgPayload[0]) txMessageType := proto_node.TransactionMessageType(msgPayload[0])
@ -224,10 +230,11 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
timeoutCount := 0 timeoutCount := 0
for { // keep waiting for Consensus ready for { // keep waiting for Consensus ready
retry := false retry := false
// TODO(minhdoan, rj): Refactor by sending signal in channel instead of waiting for 10 seconds.
select { select {
case <-readySignal: case <-readySignal:
time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up. time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up.
case <-time.After(10 * time.Second): case <-time.After(100 * time.Second):
retry = true retry = true
node.Consensus.ResetState() node.Consensus.ResetState()
timeoutCount++ timeoutCount++
@ -365,9 +372,9 @@ func (node *Node) UpdateUtxoAndState(newBlock *blockchain.Block) {
// //fmt.Println(newBlock.Transactions) // //fmt.Println(newBlock.Transactions)
// fmt.Printf("LEADER CURRENT UTXO - %d\n", node.UtxoPool.ShardID) // fmt.Printf("LEADER CURRENT UTXO - %d\n", node.UtxoPool.ShardID)
// fmt.Println(node.UtxoPool.CountNumOfUtxos()) // fmt.Println(node.UtxoPool.CountNumOfUtxos())
// //fmt.Println(node.UtxoPool) // fmt.Println(node.UtxoPool)
// fmt.Printf("LEADER LOCKED UTXO - %d\n", node.UtxoPool.ShardID) // fmt.Printf("LEADER LOCKED UTXO - %d\n", node.UtxoPool.ShardID)
// fmt.Println(node.UtxoPool.CountNumOfLockedUtxos()) // fmt.Println(node.UtxoPool.CountNumOfLockedUtxos())
// //fmt.Println(node.UtxoPool.StringOfLockedUtxos()) // fmt.Println(node.UtxoPool.StringOfLockedUtxos())
//} //}
} }

Loading…
Cancel
Save