using syncing package instead

pull/69/head
Minh Doan 6 years ago
parent 2d46c18170
commit 1d89e89231
  1. 3
      node/node.go
  2. 68
      syncing/syncing.go

@ -20,6 +20,7 @@ import (
"github.com/simple-rules/harmony-benchmark/pow" "github.com/simple-rules/harmony-benchmark/pow"
"github.com/simple-rules/harmony-benchmark/proto/identity" "github.com/simple-rules/harmony-benchmark/proto/identity"
proto_node "github.com/simple-rules/harmony-benchmark/proto/node" proto_node "github.com/simple-rules/harmony-benchmark/proto/node"
"github.com/simple-rules/harmony-benchmark/syncing"
) )
// Node represents a program (machine) participating in the network // Node represents a program (machine) participating in the network
@ -95,7 +96,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Trans
// Start a server and process the request by a handler. // Start a server and process the request by a handler.
func (node *Node) StartServer(port string) { func (node *Node) StartServer(port string) {
if node.SyncNode { if node.SyncNode {
node.startBlockSyncing() node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers())
} }
fmt.Println("Hello in server now") fmt.Println("Hello in server now")
node.log.Debug("Starting server", "node", node, "port", port) node.log.Debug("Starting server", "node", node, "port", port)

@ -1,4 +1,4 @@
package sync package syncing
import ( import (
"bufio" "bufio"
@ -8,7 +8,6 @@ import (
"github.com/Workiva/go-datastructures/queue" "github.com/Workiva/go-datastructures/queue"
"github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/blockchain"
"github.com/simple-rules/harmony-benchmark/node"
"github.com/simple-rules/harmony-benchmark/p2p" "github.com/simple-rules/harmony-benchmark/p2p"
proto_node "github.com/simple-rules/harmony-benchmark/proto/node" proto_node "github.com/simple-rules/harmony-benchmark/proto/node"
) )
@ -33,8 +32,7 @@ type SyncConfig struct {
peers []SyncPeerConfig peers []SyncPeerConfig
} }
func StartBlockSyncing(node *node.Node) *blockchain.Blockchain { func StartBlockSyncing(peers []p2p.Peer) *blockchain.Blockchain {
peers := node.Consensus.GetValidatorPeers()
peer_number := len(peers) peer_number := len(peers)
syncConfig := SyncConfig{ syncConfig := SyncConfig{
peers: make([]SyncPeerConfig, peer_number), peers: make([]SyncPeerConfig, peer_number),
@ -117,41 +115,37 @@ TASK_LOOP:
bc := &blockchain.Blockchain{ bc := &blockchain.Blockchain{
Blocks: make([]*blockchain.Block, blockSize), Blocks: make([]*blockchain.Block, blockSize),
} }
// loop to do syncing. wg.Add(activePeerNumber)
for { for _, configPeer := range syncConfig.peers {
var wg sync.WaitGroup if configPeer.err != nil {
wg.Add(activePeerNumber) continue
for _, configPeer := range syncConfig.peers {
if configPeer.err != nil {
continue
}
go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) {
defer wg.Done()
for !taskSyncQueue.Empty() {
task, err := taskSyncQueue.Poll(1, time.Millisecond)
if err == queue.ErrTimeout {
break
}
syncTask := task[0].(SyncBlockTask)
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash)
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
block, err := blockchain.DeserializeBlock(content)
if err == nil {
bc.Blocks[syncTask.index] = block
}
}
}(&configPeer, taskSyncQueue, bc)
} }
wg.Wait() go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) {
defer wg.Done()
for !taskSyncQueue.Empty() {
task, err := taskSyncQueue.Poll(1, time.Millisecond)
if err == queue.ErrTimeout {
break
}
syncTask := task[0].(SyncBlockTask)
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash)
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
block, err := blockchain.DeserializeBlock(content)
if err == nil {
bc.Blocks[syncTask.index] = block
}
}
}(&configPeer, taskSyncQueue, bc)
} }
wg.Wait()
return bc
} }
func getConsensus(syncConfig *SyncConfig) bool { func getConsensus(syncConfig *SyncConfig) bool {
Loading…
Cancel
Save