From c097113ca5127d0f31cd012499cc9ec181b78554 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Wed, 12 Sep 2018 23:27:53 -0700 Subject: [PATCH] add sync package --- node/node.go | 7 ++- sync/sync.go | 159 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 sync/sync.go diff --git a/node/node.go b/node/node.go index 3190d6c85..18bf763e6 100644 --- a/node/node.go +++ b/node/node.go @@ -128,7 +128,7 @@ func (node *Node) getConsensus(syncConfig *SyncConfig) bool { return true } -func (node *Node) startBlockSyncing() { +func (node *Node) startBlockSyncing() *blockchain.Blockchain { peers := node.Consensus.GetValidatorPeers() peer_number := len(peers) syncConfig := SyncConfig{ @@ -209,7 +209,7 @@ TASK_LOOP: } } // Initialize blockchain - node.blockchain = &blockchain.Blockchain{ + bc := &blockchain.Blockchain{ Blocks: make([]*blockchain.Block, blockSize), } // loop to do syncing. @@ -243,10 +243,11 @@ TASK_LOOP: bc.Blocks[syncTask.index] = block } } - }(&configPeer, taskSyncQueue, node.blockchain) + }(&configPeer, taskSyncQueue, bc) } wg.Wait() } + return bc } func (node *Node) String() string { diff --git a/sync/sync.go b/sync/sync.go new file mode 100644 index 000000000..2aefc2edf --- /dev/null +++ b/sync/sync.go @@ -0,0 +1,159 @@ +package sync + +import ( + "bufio" + "net" + "sync" + "time" + + "github.com/Workiva/go-datastructures/queue" + "github.com/simple-rules/harmony-benchmark/blockchain" + "github.com/simple-rules/harmony-benchmark/node" + "github.com/simple-rules/harmony-benchmark/p2p" + proto_node "github.com/simple-rules/harmony-benchmark/proto/node" +) + +type SyncPeerConfig struct { + peer p2p.Peer + conn net.Conn + block *blockchain.Block + w *bufio.Writer + receivedMsg []byte + err error + trusted bool + indexes []uint16 + blockHashes [][32]byte +} + +type SyncBlockTask struct { + index int + blockHash [32]byte +} +type SyncConfig struct { + peers []SyncPeerConfig +} + +func StartBlockSyncing(node *node.Node) *blockchain.Blockchain { + peers := node.Consensus.GetValidatorPeers() + peer_number := len(peers) + syncConfig := SyncConfig{ + peers: make([]SyncPeerConfig, peer_number), + } + for id := range syncConfig.peers { + syncConfig.peers[id].peer = peers[id] + syncConfig.peers[id].trusted = false + } + + var wg sync.WaitGroup + wg.Add(peer_number) + + for id := range syncConfig.peers { + go func(peerConfig *SyncPeerConfig) { + defer wg.Done() + peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port) + }(&syncConfig.peers[id]) + } + wg.Wait() + + activePeerNumber := 0 + for _, configPeer := range syncConfig.peers { + if configPeer.err == nil { + activePeerNumber++ + configPeer.w = bufio.NewWriter(configPeer.conn) + configPeer.trusted = true + } + } + + // Looping to get an array of block hashes from honest nodes. +LOOP_HONEST_NODE: + for { + var wg sync.WaitGroup + wg.Add(activePeerNumber) + + for _, configPeer := range syncConfig.peers { + if configPeer.err != nil { + continue + } + go func(peerConfig *SyncPeerConfig) { + defer wg.Done() + msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_LAST_BLOCK_HASHES, [32]byte{}) + peerConfig.w.Write(msg) + peerConfig.w.Flush() + var content []byte + content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) + if peerConfig.err != nil { + peerConfig.trusted = false + return + } + var blockchainSyncMessage *proto_node.BlockchainSyncMessage + blockchainSyncMessage, peerConfig.err = proto_node.DeserializeBlockchainSyncMessage(content) + if peerConfig.err != nil { + peerConfig.trusted = false + return + } + peerConfig.blockHashes = blockchainSyncMessage.BlockHashes + }(&configPeer) + } + wg.Wait() + + if getConsensus(&syncConfig) { + break LOOP_HONEST_NODE + } + } + + taskSyncQueue := queue.New(0) + blockSize := 0 +TASK_LOOP: + for _, configPeer := range syncConfig.peers { + if configPeer.trusted { + for id, blockHash := range configPeer.blockHashes { + taskSyncQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) + } + blockSize = len(configPeer.blockHashes) + break TASK_LOOP + } + } + // Initialize blockchain + bc := &blockchain.Blockchain{ + Blocks: make([]*blockchain.Block, blockSize), + } + // loop to do syncing. + for { + var wg sync.WaitGroup + wg.Add(activePeerNumber) + + for _, configPeer := range syncConfig.peers { + if configPeer.err != nil { + continue + } + go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) { + defer wg.Done() + for !taskSyncQueue.Empty() { + task, err := taskSyncQueue.Poll(1, time.Millisecond) + if err == queue.ErrTimeout { + break + } + syncTask := task[0].(SyncBlockTask) + msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash) + peerConfig.w.Write(msg) + peerConfig.w.Flush() + var content []byte + content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) + if peerConfig.err != nil { + peerConfig.trusted = false + return + } + block, err := blockchain.DeserializeBlock(content) + if err == nil { + bc.Blocks[syncTask.index] = block + } + } + }(&configPeer, taskSyncQueue, bc) + } + wg.Wait() + } +} + +func getConsensus(syncConfig *SyncConfig) bool { + return true +}