diff --git a/consensus/consensus.go b/consensus/consensus.go index 04277c494..fc188610f 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -172,7 +172,7 @@ func (consensus *Consensus) signMessage(message []byte) []byte { return signature } -func (consensus *Consensus) getValidatorPeers() []p2p.Peer { +func (consensus *Consensus) GetValidatorPeers() []p2p.Peer { validatorPeers := make([]p2p.Peer, 0) for _, validatorPeer := range consensus.validators { validatorPeers = append(validatorPeers, validatorPeer) diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index c824e7425..c219f4c8a 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -86,7 +86,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { consensus.blockHeader = byteBuffer.Bytes() msgToSend := consensus.constructAnnounceMessage() - p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend) + p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) // Set state to ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE consensus.commitByLeader(true) @@ -202,7 +202,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con consensus.responseByLeader(challengeScalar, targetState == CHALLENGE_DONE) // Broadcast challenge message - p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend) + p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) // Set state to targetState (CHALLENGE_DONE or FINAL_CHALLENGE_DONE) consensus.state = targetState @@ -347,7 +347,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C // Start the second round of Cosi msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap) - p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend) + p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) consensus.commitByLeader(false) } else { consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses)) diff --git a/node/node.go b/node/node.go index 7792688ab..e2177a161 100644 --- a/node/node.go +++ b/node/node.go @@ -1,22 +1,23 @@ package node import ( + "bufio" "bytes" "encoding/gob" "fmt" "net" "sync" - "github.com/simple-rules/harmony-benchmark/crypto/pki" - "github.com/simple-rules/harmony-benchmark/pow" - "github.com/simple-rules/harmony-benchmark/proto/identity" - "github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/client" "github.com/simple-rules/harmony-benchmark/consensus" + "github.com/simple-rules/harmony-benchmark/crypto/pki" "github.com/simple-rules/harmony-benchmark/db" "github.com/simple-rules/harmony-benchmark/log" "github.com/simple-rules/harmony-benchmark/p2p" + "github.com/simple-rules/harmony-benchmark/pow" + "github.com/simple-rules/harmony-benchmark/proto/identity" + proto_node "github.com/simple-rules/harmony-benchmark/proto/node" ) // Node represents a program (machine) participating in the network @@ -42,6 +43,19 @@ type Node struct { SyncNode bool // TODO(minhdoan): Remove it later. } +type SyncPeerConfig struct { + peer p2p.Peer + conn net.Conn + block *blockchain.Block + w *bufio.Writer + receivedMsg []byte + err error +} + +type SyncConfig struct { + peers []SyncPeerConfig +} + // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client func (node *Node) addCrossTxsToReturn(crossTxs []*blockchain.CrossShardTxAndProof) { node.crossTxToReturnMutex.Lock() @@ -101,8 +115,64 @@ func (node *Node) listenOnPort(port string) { } } func (node *Node) startBlockSyncing() { + peers := node.Consensus.GetValidatorPeers() + peer_number := len(peers) + syncConfig := SyncConfig{ + peers: make([]SyncPeerConfig, peer_number), + } + for id := range syncConfig.peers { + syncConfig.peers[id].peer = peers[id] + } + + var wg sync.WaitGroup + wg.Add(peer_number) + + for id := range syncConfig.peers { + go func(peerConfig *SyncPeerConfig) { + defer wg.Done() + peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port) + }(&syncConfig.peers[id]) + } + wg.Wait() + + activePeerNumber := 0 + for _, configPeer := range syncConfig.peers { + if configPeer.err == nil { + activePeerNumber++ + configPeer.w = bufio.NewWriter(configPeer.conn) + } + } + var prevHash [32]byte for { + var wg sync.WaitGroup + wg.Add(activePeerNumber) + for _, configPeer := range syncConfig.peers { + if configPeer.err != nil { + continue + } + go func(peerConfig *SyncPeerConfig, prevHash [32]byte) { + msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, prevHash) + peerConfig.w.Write(msg) + peerConfig.w.Flush() + }(&configPeer, prevHash) + } + wg.Wait() + wg.Add(activePeerNumber) + + for _, configPeer := range syncConfig.peers { + if configPeer.err != nil { + continue + } + go func(peerConfig *SyncPeerConfig, prevHash [32]byte) { + defer wg.Done() + peerConfig.receivedMsg, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) + if peerConfig.err == nil { + peerConfig.block, peerConfig.err = blockchain.DeserializeBlock(peerConfig.receivedMsg) + } + }(&configPeer, prevHash) + } + wg.Wait() } } diff --git a/node/node_handler.go b/node/node_handler.go index 547688745..57c6c97fe 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -172,10 +172,48 @@ func (node *Node) NodeHandler(conn net.Conn) { func (node *Node) handleBlockchainSync(payload []byte, conn net.Conn) { // TODO(minhdoan): Looking to removing this. w := bufio.NewWriter(conn) - for _, block := range node.blockchain.Blocks { - w.Write(block.Serialize()) - w.Flush() +FOR_LOOP: + for { + syncMsgType := proto_node.BlockchainSyncMessageType(payload[0]) + switch syncMsgType { + case proto_node.GET_BLOCK: + block := node.blockchain.FindBlock(payload[1:33]) + w.Write(block.Serialize()) + w.Flush() + case proto_node.GET_LAST_BLOCK_HASH: + block := node.blockchain.GetLatestBlock() + w.Write(block.Serialize()) + w.Flush() + case proto_node.DONE: + break FOR_LOOP + } + content, err := p2p.ReadMessageContent(conn) + + if err != nil { + node.log.Error("Failed in reading message content from syncing node", err) + return + } + + msgCategory, _ := proto.GetMessageCategory(content) + if err != nil || msgCategory != proto.NODE { + node.log.Error("Failed in reading message category from syncing node", err) + return + } + + msgType, err := proto.GetMessageType(content) + actionType := proto_node.NodeMessageType(msgType) + if err != nil || actionType != proto_node.BLOCKCHAIN_SYNC { + node.log.Error("Failed in reading message type from syncing node", err) + return + } + + payload, err = proto.GetMessagePayload(content) + if err != nil { + node.log.Error("Failed in reading payload from syncing node", err) + return + } } + node.log.Info("HOORAY: Done sending info to syncing node.") } func (node *Node) transactionMessageHandler(msgPayload []byte) { diff --git a/p2p/peer.go b/p2p/peer.go index bdbb66f0b..96c60d592 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -91,3 +91,10 @@ func send(ip, port string, message []byte) (returnMessage string) { sendWithSocketClient(ip, port, message) return } + +func DialWithSocketClient(ip, port string) (conn net.Conn, err error) { + //log.Printf("Sending message to ip %s and port %s\n", ip, port) + addr := strings.Join([]string{ip, port}, ":") + conn, err = net.Dial("tcp", addr) + return +} diff --git a/proto/node/node.go b/proto/node/node.go index 4bba05cec..71c9c5c18 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -21,6 +21,19 @@ const ( // TODO: add more types ) +type BlockchainSyncMessage struct { + msgType BlockchainSyncMessageType + blockHash [32]byte + block *blockchain.Block +} +type BlockchainSyncMessageType int + +const ( + DONE BlockchainSyncMessageType = iota + GET_LAST_BLOCK_HASH + GET_BLOCK +) + // The types of messages used for NODE/TRANSACTION type TransactionMessageType int @@ -98,13 +111,21 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b } // Constructs Blockchain Sync Message. -func ConstructBlockchainSyncMessage(blockHash [32]byte) []byte { +func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer.WriteByte(byte(BLOCKCHAIN_SYNC)) + byteBuffer.WriteByte(byte(msgType)) byteBuffer.Write(blockHash[:]) return byteBuffer.Bytes() } +func GenerateBlockchainSyncMessage(payload []byte) *BlockchainSyncMessage { + dec := gob.NewDecoder(bytes.NewBuffer(payload)) + var res BlockchainSyncMessage + dec.Decode(&res) + return &res +} + // Constructs serialized transactions func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})