pull/69/head
Minh Doan 6 years ago
parent 1d89e89231
commit 945de9f852
  1. 146
      node/node.go

@ -1,15 +1,12 @@
package node package node
import ( import (
"bufio"
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"net" "net"
"sync" "sync"
"time"
"github.com/Workiva/go-datastructures/queue"
"github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/blockchain"
"github.com/simple-rules/harmony-benchmark/client" "github.com/simple-rules/harmony-benchmark/client"
"github.com/simple-rules/harmony-benchmark/consensus" "github.com/simple-rules/harmony-benchmark/consensus"
@ -19,7 +16,6 @@ import (
"github.com/simple-rules/harmony-benchmark/p2p" "github.com/simple-rules/harmony-benchmark/p2p"
"github.com/simple-rules/harmony-benchmark/pow" "github.com/simple-rules/harmony-benchmark/pow"
"github.com/simple-rules/harmony-benchmark/proto/identity" "github.com/simple-rules/harmony-benchmark/proto/identity"
proto_node "github.com/simple-rules/harmony-benchmark/proto/node"
"github.com/simple-rules/harmony-benchmark/syncing" "github.com/simple-rules/harmony-benchmark/syncing"
) )
@ -46,26 +42,6 @@ type Node struct {
SyncNode bool // TODO(minhdoan): Remove it later. SyncNode bool // TODO(minhdoan): Remove it later.
} }
type SyncPeerConfig struct {
peer p2p.Peer
conn net.Conn
block *blockchain.Block
w *bufio.Writer
receivedMsg []byte
err error
trusted bool
indexes []uint16
blockHashes [][32]byte
}
type SyncBlockTask struct {
index int
blockHash [32]byte
}
type SyncConfig struct {
peers []SyncPeerConfig
}
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
func (node *Node) addCrossTxsToReturn(crossTxs []*blockchain.CrossShardTxAndProof) { func (node *Node) addCrossTxsToReturn(crossTxs []*blockchain.CrossShardTxAndProof) {
node.crossTxToReturnMutex.Lock() node.crossTxToReturnMutex.Lock()
@ -125,128 +101,6 @@ func (node *Node) listenOnPort(port string) {
} }
} }
func (node *Node) getConsensus(syncConfig *SyncConfig) bool {
return true
}
func (node *Node) startBlockSyncing() *blockchain.Blockchain {
peers := node.Consensus.GetValidatorPeers()
peer_number := len(peers)
syncConfig := SyncConfig{
peers: make([]SyncPeerConfig, peer_number),
}
for id := range syncConfig.peers {
syncConfig.peers[id].peer = peers[id]
syncConfig.peers[id].trusted = false
}
var wg sync.WaitGroup
wg.Add(peer_number)
for id := range syncConfig.peers {
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port)
}(&syncConfig.peers[id])
}
wg.Wait()
activePeerNumber := 0
for _, configPeer := range syncConfig.peers {
if configPeer.err == nil {
activePeerNumber++
configPeer.w = bufio.NewWriter(configPeer.conn)
configPeer.trusted = true
}
}
// Looping to get an array of block hashes from honest nodes.
LOOP_HONEST_NODE:
for {
var wg sync.WaitGroup
wg.Add(activePeerNumber)
for _, configPeer := range syncConfig.peers {
if configPeer.err != nil {
continue
}
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_LAST_BLOCK_HASHES, [32]byte{})
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
var blockchainSyncMessage *proto_node.BlockchainSyncMessage
blockchainSyncMessage, peerConfig.err = proto_node.DeserializeBlockchainSyncMessage(content)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
peerConfig.blockHashes = blockchainSyncMessage.BlockHashes
}(&configPeer)
}
wg.Wait()
if node.getConsensus(&syncConfig) {
break LOOP_HONEST_NODE
}
}
taskSyncQueue := queue.New(0)
blockSize := 0
TASK_LOOP:
for _, configPeer := range syncConfig.peers {
if configPeer.trusted {
for id, blockHash := range configPeer.blockHashes {
taskSyncQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
}
blockSize = len(configPeer.blockHashes)
break TASK_LOOP
}
}
// Initialize blockchain
bc := &blockchain.Blockchain{
Blocks: make([]*blockchain.Block, blockSize),
}
wg.Add(activePeerNumber)
for _, configPeer := range syncConfig.peers {
if configPeer.err != nil {
continue
}
go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) {
defer wg.Done()
for !taskSyncQueue.Empty() {
task, err := taskSyncQueue.Poll(1, time.Millisecond)
if err == queue.ErrTimeout {
break
}
syncTask := task[0].(SyncBlockTask)
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash)
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
block, err := blockchain.DeserializeBlock(content)
if err == nil {
bc.Blocks[syncTask.index] = block
}
}
}(&configPeer, taskSyncQueue, bc)
}
wg.Wait()
return bc
}
func (node *Node) String() string { func (node *Node) String() string {
return node.Consensus.String() return node.Consensus.String()
} }

Loading…
Cancel
Save