@ -1,7 +1,6 @@
package node
import (
"bufio"
"bytes"
"encoding/gob"
"fmt"
@ -10,6 +9,8 @@ import (
"strconv"
"time"
"github.com/harmony-one/harmony/p2pv2"
"github.com/dedis/kyber"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/blockchain"
@ -22,6 +23,7 @@ import (
"github.com/harmony-one/harmony/proto/consensus"
proto_identity "github.com/harmony-one/harmony/proto/identity"
proto_node "github.com/harmony-one/harmony/proto/node"
netp2p "github.com/libp2p/go-libp2p-net"
)
const (
@ -76,7 +78,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
switch msgCategory {
case proto . Identity :
actionType := proto_identity . Identity MessageType ( msgType )
actionType := proto_identity . ID MessageType ( msgType )
switch actionType {
case proto_identity . Identity :
messageType := proto_identity . MessageType ( msgPayload [ 0 ] )
@ -91,7 +93,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
}
case proto . Consensus :
actionType := consensus . Consensus MessageType ( msgType )
actionType := consensus . ConMessageType ( msgType )
switch actionType {
case consensus . Consensus :
if consensusObj . IsLeader {
@ -120,9 +122,6 @@ func (node *Node) NodeHandler(conn net.Conn) {
node . Client . UpdateBlocks ( * blocks )
}
}
case proto_node . BlockchainSync :
node . log . Info ( "NET: received message: Node/BlockchainSync" )
node . handleBlockchainSync ( msgPayload , conn )
case proto_node . Client :
node . log . Info ( "NET: received message: Node/Client" )
clientMsgType := proto_node . ClientMessageType ( msgPayload [ 0 ] )
@ -193,7 +192,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
node . pongMessageHandler ( msgPayload )
}
case proto . Client :
actionType := client . Client MessageType( msgType )
actionType := client . MessageType ( msgType )
node . log . Info ( "NET: received message: Client/Transaction" )
switch actionType {
case client . Transaction :
@ -206,55 +205,167 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
}
// Refactor by moving this code into a sync package.
func ( node * Node ) handleBlockchainSync ( payload [ ] byte , conn net . Conn ) {
// TODO(minhdoan): Looking to removing this.
w := bufio . NewWriter ( conn )
FOR_LOOP :
for {
syncMsgType := proto_node . BlockchainSyncMessageType ( payload [ 0 ] )
switch syncMsgType {
case proto_node . GetBlock :
block := node . blockchain . FindBlock ( payload [ 1 : 33 ] )
w . Write ( block . Serialize ( ) )
w . Flush ( )
case proto_node . GetLastBlockHashes :
blockchainSyncMessage := proto_node . BlockchainSyncMessage {
BlockHeight : len ( node . blockchain . Blocks ) ,
BlockHashes : node . blockchain . GetBlockHashes ( ) ,
}
w . Write ( proto_node . SerializeBlockchainSyncMessage ( & blockchainSyncMessage ) )
w . Flush ( )
case proto_node . Done :
break FOR_LOOP
}
content , err := p2p . ReadMessageContent ( conn )
// NodeHandler handles a new incoming connection.
func ( node * Node ) NodeHandlerV1 ( s netp2p . Stream ) {
defer s . Close ( )
// Read p2p message payload
content , err := p2pv2 . ReadData ( s )
if err != nil {
node . log . Error ( "Failed in reading message content from syncing node" , err )
node . log . Error ( "Read p2p data failed" , "err" , err , "node" , node )
return
}
// TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p.
node . MaybeBroadcastAsValidator ( content )
consensusObj := node . Consensus
msgCategory , _ := proto . GetMessageCategory ( content )
if err != nil || msgCategory != proto . Node {
node . log . Error ( "Failed in reading message category from syncing node" , err )
msgCategory , err := proto . GetMessageCategory ( content )
if err != nil {
node . log . Error ( "Read node type failed" , "err" , err , "node" , node )
return
}
msgType , err := proto . GetMessageType ( content )
actionType := proto_node . MessageType ( msgType )
if err != nil || actionType != proto_node . BlockchainSync {
node . log . Error ( "Failed in reading message type from syncing node" , err )
if err != nil {
node . log . Error ( "Read action type failed" , "err" , err , "node" , node )
return
}
payload , err = proto . GetMessagePayload ( content )
msgPayload , err : = proto . GetMessagePayload ( content )
if err != nil {
node . log . Error ( "Failed in reading payload from syncing node" , err )
node . log . Error ( "Read message payload failed" , "err" , err , " node" , nod e)
return
}
switch msgCategory {
case proto . Identity :
actionType := proto_identity . IDMessageType ( msgType )
switch actionType {
case proto_identity . Identity :
messageType := proto_identity . MessageType ( msgPayload [ 0 ] )
switch messageType {
case proto_identity . Register :
fmt . Println ( "received a identity message" )
// TODO(ak): fix it.
// node.processPOWMessage(msgPayload)
node . log . Info ( "NET: received message: IDENTITY/REGISTER" )
default :
node . log . Error ( "Announce message should be sent to IdentityChain" )
}
}
case proto . Consensus :
actionType := consensus . ConMessageType ( msgType )
switch actionType {
case consensus . Consensus :
if consensusObj . IsLeader {
node . log . Info ( "NET: received message: Consensus/Leader" )
consensusObj . ProcessMessageLeader ( msgPayload )
} else {
node . log . Info ( "NET: received message: Consensus/Validator" )
consensusObj . ProcessMessageValidator ( msgPayload )
}
}
case proto . Node :
actionType := proto_node . MessageType ( msgType )
switch actionType {
case proto_node . Transaction :
node . log . Info ( "NET: received message: Node/Transaction" )
node . transactionMessageHandler ( msgPayload )
case proto_node . Block :
node . log . Info ( "NET: received message: Node/Block" )
blockMsgType := proto_node . BlockMessageType ( msgPayload [ 0 ] )
switch blockMsgType {
case proto_node . Sync :
decoder := gob . NewDecoder ( bytes . NewReader ( msgPayload [ 1 : ] ) ) // skip the Sync messge type
blocks := new ( [ ] * blockchain . Block )
decoder . Decode ( blocks )
if node . Client != nil && node . Client . UpdateBlocks != nil && blocks != nil {
node . Client . UpdateBlocks ( * blocks )
}
}
case proto_node . Client :
node . log . Info ( "NET: received message: Node/Client" )
clientMsgType := proto_node . ClientMessageType ( msgPayload [ 0 ] )
switch clientMsgType {
case proto_node . LookupUtxo :
decoder := gob . NewDecoder ( bytes . NewReader ( msgPayload [ 1 : ] ) ) // skip the LookupUtxo messge type
fetchUtxoMessage := new ( proto_node . FetchUtxoMessage )
decoder . Decode ( fetchUtxoMessage )
utxoMap := node . UtxoPool . GetUtxoMapByAddresses ( fetchUtxoMessage . Addresses )
p2p . SendMessage ( fetchUtxoMessage . Sender , client . ConstructFetchUtxoResponseMessage ( & utxoMap , node . UtxoPool . ShardID ) )
}
case proto_node . Control :
node . log . Info ( "NET: received message: Node/Control" )
controlType := msgPayload [ 0 ]
if proto_node . ControlMessageType ( controlType ) == proto_node . STOP {
if node . Chain == nil {
node . log . Debug ( "Stopping Node" , "node" , node , "numBlocks" , len ( node . blockchain . Blocks ) , "numTxsProcessed" , node . countNumTransactionsInBlockchain ( ) )
sizeInBytes := node . UtxoPool . GetSizeInByteOfUtxoMap ( )
node . log . Debug ( "UtxoPool Report" , "numEntries" , len ( node . UtxoPool . UtxoMap ) , "sizeInBytes" , sizeInBytes )
avgBlockSizeInBytes := 0
txCount := 0
blockCount := 0
totalTxCount := 0
totalBlockCount := 0
avgTxSize := 0
for _ , block := range node . blockchain . Blocks {
if block . IsStateBlock ( ) {
totalTxCount += int ( block . State . NumTransactions )
totalBlockCount += int ( block . State . NumBlocks )
} else {
byteBuffer := bytes . NewBuffer ( [ ] byte { } )
encoder := gob . NewEncoder ( byteBuffer )
encoder . Encode ( block )
avgBlockSizeInBytes += len ( byteBuffer . Bytes ( ) )
txCount += len ( block . Transactions )
blockCount ++
totalTxCount += len ( block . TransactionIds )
totalBlockCount ++
byteBuffer = bytes . NewBuffer ( [ ] byte { } )
encoder = gob . NewEncoder ( byteBuffer )
encoder . Encode ( block . Transactions )
avgTxSize += len ( byteBuffer . Bytes ( ) )
}
}
if blockCount != 0 {
avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount
avgTxSize = avgTxSize / txCount
}
node . log . Debug ( "Blockchain Report" , "totalNumBlocks" , totalBlockCount , "avgBlockSizeInCurrentEpoch" , avgBlockSizeInBytes , "totalNumTxs" , totalTxCount , "avgTxSzieInCurrentEpoch" , avgTxSize )
} else {
node . log . Debug ( "Stopping Node (Account Model)" , "node" , node , "CurBlockNum" , node . Chain . CurrentHeader ( ) . Number , "numTxsProcessed" , node . countNumTransactionsInBlockchainAccount ( ) )
}
os . Exit ( 0 )
}
case proto_node . PING :
node . pingMessageHandler ( msgPayload )
case proto_node . PONG :
node . pongMessageHandler ( msgPayload )
}
case proto . Client :
actionType := client . MessageType ( msgType )
node . log . Info ( "NET: received message: Client/Transaction" )
switch actionType {
case client . Transaction :
if node . Client != nil {
node . Client . TransactionMessageHandler ( msgPayload )
}
}
default :
node . log . Error ( "Unknown" , "MsgCateory:" , msgCategory )
}
node . log . Info ( "HOORAY: Done sending info to syncing node." )
}
func ( node * Node ) transactionMessageHandler ( msgPayload [ ] byte ) {