@ -15,6 +15,7 @@ import (
proto_node "github.com/harmony-one/harmony/api/proto/node"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/host"
)
)
@ -40,7 +41,7 @@ func (node *Node) StreamHandler(s p2p.Stream) {
content , err := p2p . ReadMessageContent ( s )
content , err := p2p . ReadMessageContent ( s )
if err != nil {
if err != nil {
node . log . Error ( "Read p2p data failed" , "err" , err , "node" , node )
utils . GetLogInstance ( ) . Error ( "Read p2p data failed" , "err" , err , "node" , node )
return
return
}
}
node . MaybeBroadcastAsValidator ( content )
node . MaybeBroadcastAsValidator ( content )
@ -49,19 +50,19 @@ func (node *Node) StreamHandler(s p2p.Stream) {
msgCategory , err := proto . GetMessageCategory ( content )
msgCategory , err := proto . GetMessageCategory ( content )
if err != nil {
if err != nil {
node . log . Error ( "Read node type failed" , "err" , err , "node" , node )
utils . GetLogInstance ( ) . Error ( "Read node type failed" , "err" , err , "node" , node )
return
return
}
}
msgType , err := proto . GetMessageType ( content )
msgType , err := proto . GetMessageType ( content )
if err != nil {
if err != nil {
node . log . Error ( "Read action type failed" , "err" , err , "node" , node )
utils . GetLogInstance ( ) . Error ( "Read action type failed" , "err" , err , "node" , node )
return
return
}
}
msgPayload , err := proto . GetMessagePayload ( content )
msgPayload , err := proto . GetMessagePayload ( content )
if err != nil {
if err != nil {
node . log . Error ( "Read message payload failed" , "err" , err , "node" , node )
utils . GetLogInstance ( ) . Error ( "Read message payload failed" , "err" , err , "node" , node )
return
return
}
}
@ -74,18 +75,18 @@ func (node *Node) StreamHandler(s p2p.Stream) {
switch messageType {
switch messageType {
case proto_identity . Register :
case proto_identity . Register :
fmt . Println ( "received a identity message" )
fmt . Println ( "received a identity message" )
node . log . Info ( "NET: received message: IDENTITY/REGISTER" )
utils . GetLogInstance ( ) . Info ( "NET: received message: IDENTITY/REGISTER" )
default :
default :
node . log . Error ( "Announce message should be sent to IdentityChain" )
utils . GetLogInstance ( ) . Error ( "Announce message should be sent to IdentityChain" )
}
}
}
}
case proto . Consensus :
case proto . Consensus :
msgPayload , _ := proto . GetConsensusMessagePayload ( content )
msgPayload , _ := proto . GetConsensusMessagePayload ( content )
if consensusObj . IsLeader {
if consensusObj . IsLeader {
node . log . Info ( "NET: Leader received message:" , "messageCategory" , msgCategory , "messageType" , msgType )
utils . GetLogInstance ( ) . Info ( "NET: Leader received message:" , "messageCategory" , msgCategory , "messageType" , msgType )
consensusObj . ProcessMessageLeader ( msgPayload )
consensusObj . ProcessMessageLeader ( msgPayload )
} else {
} else {
node . log . Info ( "NET: Validator received message:" , "messageCategory" , msgCategory , "messageType" , msgType )
utils . GetLogInstance ( ) . Info ( "NET: Validator received message:" , "messageCategory" , msgCategory , "messageType" , msgType )
consensusObj . ProcessMessageValidator ( msgPayload )
consensusObj . ProcessMessageValidator ( msgPayload )
// TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus
// TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus
// we should switch to other state rather than DoingConsensus.
// we should switch to other state rather than DoingConsensus.
@ -94,17 +95,17 @@ func (node *Node) StreamHandler(s p2p.Stream) {
actionType := proto_node . MessageType ( msgType )
actionType := proto_node . MessageType ( msgType )
switch actionType {
switch actionType {
case proto_node . Transaction :
case proto_node . Transaction :
node . log . Info ( "NET: received message: Node/Transaction" )
utils . GetLogInstance ( ) . Info ( "NET: received message: Node/Transaction" )
node . transactionMessageHandler ( msgPayload )
node . transactionMessageHandler ( msgPayload )
case proto_node . Block :
case proto_node . Block :
node . log . Info ( "NET: received message: Node/Block" )
utils . GetLogInstance ( ) . Info ( "NET: received message: Node/Block" )
blockMsgType := proto_node . BlockMessageType ( msgPayload [ 0 ] )
blockMsgType := proto_node . BlockMessageType ( msgPayload [ 0 ] )
switch blockMsgType {
switch blockMsgType {
case proto_node . Sync :
case proto_node . Sync :
var blocks [ ] * types . Block
var blocks [ ] * types . Block
err := rlp . DecodeBytes ( msgPayload [ 1 : ] , & blocks ) // skip the Sync messge type
err := rlp . DecodeBytes ( msgPayload [ 1 : ] , & blocks ) // skip the Sync messge type
if err != nil {
if err != nil {
node . log . Error ( "block sync" , "error" , err )
utils . GetLogInstance ( ) . Error ( "block sync" , "error" , err )
} else {
} else {
if node . Client != nil && node . Client . UpdateBlocks != nil && blocks != nil {
if node . Client != nil && node . Client . UpdateBlocks != nil && blocks != nil {
node . Client . UpdateBlocks ( blocks )
node . Client . UpdateBlocks ( blocks )
@ -112,10 +113,10 @@ func (node *Node) StreamHandler(s p2p.Stream) {
}
}
}
}
case proto_node . Control :
case proto_node . Control :
node . log . Info ( "NET: received message: Node/Control" )
utils . GetLogInstance ( ) . Info ( "NET: received message: Node/Control" )
controlType := msgPayload [ 0 ]
controlType := msgPayload [ 0 ]
if proto_node . ControlMessageType ( controlType ) == proto_node . STOP {
if proto_node . ControlMessageType ( controlType ) == proto_node . STOP {
node . log . Debug ( "Stopping Node" , "node" , node , "numBlocks" , node . blockchain . CurrentBlock ( ) . NumberU64 ( ) , "numTxsProcessed" , node . countNumTransactionsInBlockchain ( ) )
utils . GetLogInstance ( ) . Debug ( "Stopping Node" , "node" , node , "numBlocks" , node . blockchain . CurrentBlock ( ) . NumberU64 ( ) , "numTxsProcessed" , node . countNumTransactionsInBlockchain ( ) )
var avgBlockSizeInBytes common . StorageSize
var avgBlockSizeInBytes common . StorageSize
txCount := 0
txCount := 0
@ -135,7 +136,7 @@ func (node *Node) StreamHandler(s p2p.Stream) {
avgTxSize = avgTxSize / txCount
avgTxSize = avgTxSize / txCount
}
}
node . log . Debug ( "Blockchain Report" , "totalNumBlocks" , blockCount , "avgBlockSizeInCurrentEpoch" , avgBlockSizeInBytes , "totalNumTxs" , txCount , "avgTxSzieInCurrentEpoch" , avgTxSize )
utils . GetLogInstance ( ) . Debug ( "Blockchain Report" , "totalNumBlocks" , blockCount , "avgBlockSizeInCurrentEpoch" , avgBlockSizeInBytes , "totalNumTxs" , txCount , "avgTxSzieInCurrentEpoch" , avgTxSize )
os . Exit ( 0 )
os . Exit ( 0 )
}
}
@ -145,7 +146,7 @@ func (node *Node) StreamHandler(s p2p.Stream) {
node . pongMessageHandler ( msgPayload )
node . pongMessageHandler ( msgPayload )
}
}
default :
default :
node . log . Error ( "Unknown" , "MsgCategory" , msgCategory )
utils . GetLogInstance ( ) . Error ( "Unknown" , "MsgCategory" , msgCategory )
}
}
}
}
@ -157,7 +158,7 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
txs := types . Transactions { }
txs := types . Transactions { }
err := rlp . Decode ( bytes . NewReader ( msgPayload [ 1 : ] ) , & txs ) // skip the Send messge type
err := rlp . Decode ( bytes . NewReader ( msgPayload [ 1 : ] ) , & txs ) // skip the Send messge type
if err != nil {
if err != nil {
node . log . Error ( "Failed to deserialize transaction list" , "error" , err )
utils . GetLogInstance ( ) . Error ( "Failed to deserialize transaction list" , "error" , err )
}
}
node . addPendingTransactions ( txs )
node . addPendingTransactions ( txs )
@ -187,7 +188,7 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
// WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus.
// WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus.
func ( node * Node ) WaitForConsensusReady ( readySignal chan struct { } ) {
func ( node * Node ) WaitForConsensusReady ( readySignal chan struct { } ) {
node . log . Debug ( "Waiting for Consensus ready" , "node" , node )
utils . GetLogInstance ( ) . Debug ( "Waiting for Consensus ready" , "node" , node )
time . Sleep ( 15 * time . Second ) // Wait for other nodes to be ready (test-only)
time . Sleep ( 15 * time . Second ) // Wait for other nodes to be ready (test-only)
firstTime := true
firstTime := true
@ -201,7 +202,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
case <- time . After ( 200 * time . Second ) :
case <- time . After ( 200 * time . Second ) :
node . Consensus . ResetState ( )
node . Consensus . ResetState ( )
timeoutCount ++
timeoutCount ++
node . log . Debug ( "Consensus timeout, retry!" , "count" , timeoutCount , "node" , node )
utils . GetLogInstance ( ) . Debug ( "Consensus timeout, retry!" , "count" , timeoutCount , "node" , node )
}
}
for {
for {
@ -211,7 +212,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
threshold = 2
threshold = 2
firstTime = false
firstTime = false
}
}
node . log . Debug ( "STARTING BLOCK" , "threshold" , threshold , "pendingTransactions" , len ( node . pendingTransactions ) )
utils . GetLogInstance ( ) . Debug ( "STARTING BLOCK" , "threshold" , threshold , "pendingTransactions" , len ( node . pendingTransactions ) )
if len ( node . pendingTransactions ) >= threshold {
if len ( node . pendingTransactions ) >= threshold {
// Normal tx block consensus
// Normal tx block consensus
selectedTxs := node . getTransactionsForNewBlock ( MaxNumberOfTransactionsPerBlock )
selectedTxs := node . getTransactionsForNewBlock ( MaxNumberOfTransactionsPerBlock )
@ -219,7 +220,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
node . Worker . CommitTransactions ( selectedTxs )
node . Worker . CommitTransactions ( selectedTxs )
block , err := node . Worker . Commit ( )
block , err := node . Worker . Commit ( )
if err != nil {
if err != nil {
node . log . Debug ( "Failed commiting new block" , "Error" , err )
utils . GetLogInstance ( ) . Debug ( "Failed commiting new block" , "Error" , err )
} else {
} else {
newBlock = block
newBlock = block
break
break
@ -242,7 +243,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
// TODO (lc): broadcast the new blocks to new nodes doing state sync
// TODO (lc): broadcast the new blocks to new nodes doing state sync
func ( node * Node ) BroadcastNewBlock ( newBlock * types . Block ) {
func ( node * Node ) BroadcastNewBlock ( newBlock * types . Block ) {
if node . ClientPeer != nil {
if node . ClientPeer != nil {
node . log . Debug ( "Sending new block to client" , "client" , node . ClientPeer )
utils . GetLogInstance ( ) . Debug ( "Sending new block to client" , "client" , node . ClientPeer )
node . SendMessage ( * node . ClientPeer , proto_node . ConstructBlocksSyncMessage ( [ ] * types . Block { newBlock } ) )
node . SendMessage ( * node . ClientPeer , proto_node . ConstructBlocksSyncMessage ( [ ] * types . Block { newBlock } ) )
}
}
}
}
@ -251,13 +252,13 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
func ( node * Node ) VerifyNewBlock ( newBlock * types . Block ) bool {
func ( node * Node ) VerifyNewBlock ( newBlock * types . Block ) bool {
err := node . blockchain . ValidateNewBlock ( newBlock , pki . GetAddressFromPublicKey ( node . SelfPeer . PubKey ) )
err := node . blockchain . ValidateNewBlock ( newBlock , pki . GetAddressFromPublicKey ( node . SelfPeer . PubKey ) )
if err != nil {
if err != nil {
node . log . Debug ( "Failed verifying new block" , "Error" , err , "tx" , newBlock . Transactions ( ) [ 0 ] )
utils . GetLogInstance ( ) . Debug ( "Failed verifying new block" , "Error" , err , "tx" , newBlock . Transactions ( ) [ 0 ] )
// send consensus block to state syncing
// send consensus block to state syncing
select {
select {
case node . Consensus . ConsensusBlock <- newBlock :
case node . Consensus . ConsensusBlock <- newBlock :
default :
default :
node . log . Warn ( "consensus block unable to sent to state sync" , "height" , newBlock . NumberU64 ( ) , "blockHash" , newBlock . Hash ( ) . Hex ( ) )
utils . GetLogInstance ( ) . Warn ( "consensus block unable to sent to state sync" , "height" , newBlock . NumberU64 ( ) , "blockHash" , newBlock . Hash ( ) . Hex ( ) )
}
}
return false
return false
@ -281,19 +282,19 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
func ( node * Node ) AddNewBlock ( newBlock * types . Block ) {
func ( node * Node ) AddNewBlock ( newBlock * types . Block ) {
blockNum , err := node . blockchain . InsertChain ( [ ] * types . Block { newBlock } )
blockNum , err := node . blockchain . InsertChain ( [ ] * types . Block { newBlock } )
if err != nil {
if err != nil {
node . log . Debug ( "Error adding new block to blockchain" , "blockNum" , blockNum , "Error" , err )
utils . GetLogInstance ( ) . Debug ( "Error adding new block to blockchain" , "blockNum" , blockNum , "Error" , err )
} else {
} else {
node . log . Info ( "adding new block to blockchain" , "blockNum" , blockNum )
utils . GetLogInstance ( ) . Info ( "adding new block to blockchain" , "blockNum" , blockNum )
}
}
}
}
func ( node * Node ) pingMessageHandler ( msgPayload [ ] byte ) int {
func ( node * Node ) pingMessageHandler ( msgPayload [ ] byte ) int {
ping , err := proto_node . GetPingMessage ( msgPayload )
ping , err := proto_node . GetPingMessage ( msgPayload )
if err != nil {
if err != nil {
node . log . Error ( "Can't get Ping Message" )
utils . GetLogInstance ( ) . Error ( "Can't get Ping Message" )
return - 1
return - 1
}
}
// node.log .Info("Ping", "Msg", ping)
// utils.GetLogInstance() .Info("Ping", "Msg", ping)
peer := new ( p2p . Peer )
peer := new ( p2p . Peer )
peer . IP = ping . Node . IP
peer . IP = ping . Node . IP
@ -304,12 +305,12 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
peer . PubKey = & bls . PublicKey { }
peer . PubKey = & bls . PublicKey { }
err = peer . PubKey . Deserialize ( ping . Node . PubKey [ : ] )
err = peer . PubKey . Deserialize ( ping . Node . PubKey [ : ] )
if err != nil {
if err != nil {
node . log . Error ( "UnmarshalBinary Failed" , "error" , err )
utils . GetLogInstance ( ) . Error ( "UnmarshalBinary Failed" , "error" , err )
return - 1
return - 1
}
}
if ping . Node . Role == proto_node . ClientRole {
if ping . Node . Role == proto_node . ClientRole {
node . log . Info ( "Add Client Peer to Node" , "Node" , node . Consensus . GetNodeID ( ) , "Client" , peer )
utils . GetLogInstance ( ) . Info ( "Add Client Peer to Node" , "Node" , node . Consensus . GetNodeID ( ) , "Client" , peer )
node . ClientPeer = peer
node . ClientPeer = peer
return 0
return 0
}
}
@ -341,11 +342,11 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
func ( node * Node ) pongMessageHandler ( msgPayload [ ] byte ) int {
func ( node * Node ) pongMessageHandler ( msgPayload [ ] byte ) int {
pong , err := proto_node . GetPongMessage ( msgPayload )
pong , err := proto_node . GetPongMessage ( msgPayload )
if err != nil {
if err != nil {
node . log . Error ( "Can't get Pong Message" )
utils . GetLogInstance ( ) . Error ( "Can't get Pong Message" )
return - 1
return - 1
}
}
// node.log .Debug("pongMessageHandler", "pong", pong, "nodeID", node.Consensus.GetNodeID())
// utils.GetLogInstance() .Debug("pongMessageHandler", "pong", pong, "nodeID", node.Consensus.GetNodeID())
peers := make ( [ ] * p2p . Peer , 0 )
peers := make ( [ ] * p2p . Peer , 0 )
@ -359,7 +360,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
peer . PubKey = & bls . PublicKey { }
peer . PubKey = & bls . PublicKey { }
err = peer . PubKey . Deserialize ( p . PubKey [ : ] )
err = peer . PubKey . Deserialize ( p . PubKey [ : ] )
if err != nil {
if err != nil {
node . log . Error ( "UnmarshalBinary Failed" , "error" , err )
utils . GetLogInstance ( ) . Error ( "UnmarshalBinary Failed" , "error" , err )
continue
continue
}
}
peers = append ( peers , peer )
peers = append ( peers , peer )
@ -379,7 +380,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
key := bls . PublicKey { }
key := bls . PublicKey { }
err = key . Deserialize ( k [ : ] )
err = key . Deserialize ( k [ : ] )
if err != nil {
if err != nil {
node . log . Error ( "UnmarshalBinary Failed PubKeys" , "error" , err )
utils . GetLogInstance ( ) . Error ( "UnmarshalBinary Failed PubKeys" , "error" , err )
continue
continue
}
}
publicKeys = append ( publicKeys , & key )
publicKeys = append ( publicKeys , & key )