diff --git a/node/node.go b/node/node.go index c69fe07bc..b2cbddbd6 100644 --- a/node/node.go +++ b/node/node.go @@ -20,6 +20,7 @@ import ( "github.com/simple-rules/harmony-benchmark/pow" "github.com/simple-rules/harmony-benchmark/proto/identity" proto_node "github.com/simple-rules/harmony-benchmark/proto/node" + "github.com/simple-rules/harmony-benchmark/syncing" ) // Node represents a program (machine) participating in the network @@ -95,7 +96,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Trans // Start a server and process the request by a handler. func (node *Node) StartServer(port string) { if node.SyncNode { - node.startBlockSyncing() + node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers()) } fmt.Println("Hello in server now") node.log.Debug("Starting server", "node", node, "port", port) diff --git a/sync/sync.go b/syncing/syncing.go similarity index 71% rename from sync/sync.go rename to syncing/syncing.go index 2aefc2edf..51dd9d1e1 100644 --- a/sync/sync.go +++ b/syncing/syncing.go @@ -1,4 +1,4 @@ -package sync +package syncing import ( "bufio" @@ -8,7 +8,6 @@ import ( "github.com/Workiva/go-datastructures/queue" "github.com/simple-rules/harmony-benchmark/blockchain" - "github.com/simple-rules/harmony-benchmark/node" "github.com/simple-rules/harmony-benchmark/p2p" proto_node "github.com/simple-rules/harmony-benchmark/proto/node" ) @@ -33,8 +32,7 @@ type SyncConfig struct { peers []SyncPeerConfig } -func StartBlockSyncing(node *node.Node) *blockchain.Blockchain { - peers := node.Consensus.GetValidatorPeers() +func StartBlockSyncing(peers []p2p.Peer) *blockchain.Blockchain { peer_number := len(peers) syncConfig := SyncConfig{ peers: make([]SyncPeerConfig, peer_number), @@ -117,41 +115,37 @@ TASK_LOOP: bc := &blockchain.Blockchain{ Blocks: make([]*blockchain.Block, blockSize), } - // loop to do syncing. - for { - var wg sync.WaitGroup - wg.Add(activePeerNumber) - - for _, configPeer := range syncConfig.peers { - if configPeer.err != nil { - continue - } - go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) { - defer wg.Done() - for !taskSyncQueue.Empty() { - task, err := taskSyncQueue.Poll(1, time.Millisecond) - if err == queue.ErrTimeout { - break - } - syncTask := task[0].(SyncBlockTask) - msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash) - peerConfig.w.Write(msg) - peerConfig.w.Flush() - var content []byte - content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) - if peerConfig.err != nil { - peerConfig.trusted = false - return - } - block, err := blockchain.DeserializeBlock(content) - if err == nil { - bc.Blocks[syncTask.index] = block - } - } - }(&configPeer, taskSyncQueue, bc) + wg.Add(activePeerNumber) + for _, configPeer := range syncConfig.peers { + if configPeer.err != nil { + continue } - wg.Wait() + go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) { + defer wg.Done() + for !taskSyncQueue.Empty() { + task, err := taskSyncQueue.Poll(1, time.Millisecond) + if err == queue.ErrTimeout { + break + } + syncTask := task[0].(SyncBlockTask) + msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash) + peerConfig.w.Write(msg) + peerConfig.w.Flush() + var content []byte + content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) + if peerConfig.err != nil { + peerConfig.trusted = false + return + } + block, err := blockchain.DeserializeBlock(content) + if err == nil { + bc.Blocks[syncTask.index] = block + } + } + }(&configPeer, taskSyncQueue, bc) } + wg.Wait() + return bc } func getConsensus(syncConfig *SyncConfig) bool {