add logic for syncing

pull/69/head
Minh Doan 6 years ago
parent 665a427100
commit e7cb022721
  1. 2
      consensus/consensus.go
  2. 6
      consensus/consensus_leader.go
  3. 78
      node/node.go
  4. 40
      node/node_handler.go
  5. 7
      p2p/peer.go
  6. 23
      proto/node/node.go

@ -172,7 +172,7 @@ func (consensus *Consensus) signMessage(message []byte) []byte {
return signature
}
func (consensus *Consensus) getValidatorPeers() []p2p.Peer {
func (consensus *Consensus) GetValidatorPeers() []p2p.Peer {
validatorPeers := make([]p2p.Peer, 0)
for _, validatorPeer := range consensus.validators {
validatorPeers = append(validatorPeers, validatorPeer)

@ -86,7 +86,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.blockHeader = byteBuffer.Bytes()
msgToSend := consensus.constructAnnounceMessage()
p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend)
p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend)
// Set state to ANNOUNCE_DONE
consensus.state = ANNOUNCE_DONE
consensus.commitByLeader(true)
@ -202,7 +202,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con
consensus.responseByLeader(challengeScalar, targetState == CHALLENGE_DONE)
// Broadcast challenge message
p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend)
p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend)
// Set state to targetState (CHALLENGE_DONE or FINAL_CHALLENGE_DONE)
consensus.state = targetState
@ -347,7 +347,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C
// Start the second round of Cosi
msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap)
p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend)
p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend)
consensus.commitByLeader(false)
} else {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses))

@ -1,22 +1,23 @@
package node
import (
"bufio"
"bytes"
"encoding/gob"
"fmt"
"net"
"sync"
"github.com/simple-rules/harmony-benchmark/crypto/pki"
"github.com/simple-rules/harmony-benchmark/pow"
"github.com/simple-rules/harmony-benchmark/proto/identity"
"github.com/simple-rules/harmony-benchmark/blockchain"
"github.com/simple-rules/harmony-benchmark/client"
"github.com/simple-rules/harmony-benchmark/consensus"
"github.com/simple-rules/harmony-benchmark/crypto/pki"
"github.com/simple-rules/harmony-benchmark/db"
"github.com/simple-rules/harmony-benchmark/log"
"github.com/simple-rules/harmony-benchmark/p2p"
"github.com/simple-rules/harmony-benchmark/pow"
"github.com/simple-rules/harmony-benchmark/proto/identity"
proto_node "github.com/simple-rules/harmony-benchmark/proto/node"
)
// Node represents a program (machine) participating in the network
@ -42,6 +43,19 @@ type Node struct {
SyncNode bool // TODO(minhdoan): Remove it later.
}
type SyncPeerConfig struct {
peer p2p.Peer
conn net.Conn
block *blockchain.Block
w *bufio.Writer
receivedMsg []byte
err error
}
type SyncConfig struct {
peers []SyncPeerConfig
}
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
func (node *Node) addCrossTxsToReturn(crossTxs []*blockchain.CrossShardTxAndProof) {
node.crossTxToReturnMutex.Lock()
@ -101,8 +115,64 @@ func (node *Node) listenOnPort(port string) {
}
}
func (node *Node) startBlockSyncing() {
peers := node.Consensus.GetValidatorPeers()
peer_number := len(peers)
syncConfig := SyncConfig{
peers: make([]SyncPeerConfig, peer_number),
}
for id := range syncConfig.peers {
syncConfig.peers[id].peer = peers[id]
}
var wg sync.WaitGroup
wg.Add(peer_number)
for id := range syncConfig.peers {
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port)
}(&syncConfig.peers[id])
}
wg.Wait()
activePeerNumber := 0
for _, configPeer := range syncConfig.peers {
if configPeer.err == nil {
activePeerNumber++
configPeer.w = bufio.NewWriter(configPeer.conn)
}
}
var prevHash [32]byte
for {
var wg sync.WaitGroup
wg.Add(activePeerNumber)
for _, configPeer := range syncConfig.peers {
if configPeer.err != nil {
continue
}
go func(peerConfig *SyncPeerConfig, prevHash [32]byte) {
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, prevHash)
peerConfig.w.Write(msg)
peerConfig.w.Flush()
}(&configPeer, prevHash)
}
wg.Wait()
wg.Add(activePeerNumber)
for _, configPeer := range syncConfig.peers {
if configPeer.err != nil {
continue
}
go func(peerConfig *SyncPeerConfig, prevHash [32]byte) {
defer wg.Done()
peerConfig.receivedMsg, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err == nil {
peerConfig.block, peerConfig.err = blockchain.DeserializeBlock(peerConfig.receivedMsg)
}
}(&configPeer, prevHash)
}
wg.Wait()
}
}

@ -172,10 +172,48 @@ func (node *Node) NodeHandler(conn net.Conn) {
func (node *Node) handleBlockchainSync(payload []byte, conn net.Conn) {
// TODO(minhdoan): Looking to removing this.
w := bufio.NewWriter(conn)
for _, block := range node.blockchain.Blocks {
FOR_LOOP:
for {
syncMsgType := proto_node.BlockchainSyncMessageType(payload[0])
switch syncMsgType {
case proto_node.GET_BLOCK:
block := node.blockchain.FindBlock(payload[1:33])
w.Write(block.Serialize())
w.Flush()
case proto_node.GET_LAST_BLOCK_HASH:
block := node.blockchain.GetLatestBlock()
w.Write(block.Serialize())
w.Flush()
case proto_node.DONE:
break FOR_LOOP
}
content, err := p2p.ReadMessageContent(conn)
if err != nil {
node.log.Error("Failed in reading message content from syncing node", err)
return
}
msgCategory, _ := proto.GetMessageCategory(content)
if err != nil || msgCategory != proto.NODE {
node.log.Error("Failed in reading message category from syncing node", err)
return
}
msgType, err := proto.GetMessageType(content)
actionType := proto_node.NodeMessageType(msgType)
if err != nil || actionType != proto_node.BLOCKCHAIN_SYNC {
node.log.Error("Failed in reading message type from syncing node", err)
return
}
payload, err = proto.GetMessagePayload(content)
if err != nil {
node.log.Error("Failed in reading payload from syncing node", err)
return
}
}
node.log.Info("HOORAY: Done sending info to syncing node.")
}
func (node *Node) transactionMessageHandler(msgPayload []byte) {

@ -91,3 +91,10 @@ func send(ip, port string, message []byte) (returnMessage string) {
sendWithSocketClient(ip, port, message)
return
}
func DialWithSocketClient(ip, port string) (conn net.Conn, err error) {
//log.Printf("Sending message to ip %s and port %s\n", ip, port)
addr := strings.Join([]string{ip, port}, ":")
conn, err = net.Dial("tcp", addr)
return
}

@ -21,6 +21,19 @@ const (
// TODO: add more types
)
type BlockchainSyncMessage struct {
msgType BlockchainSyncMessageType
blockHash [32]byte
block *blockchain.Block
}
type BlockchainSyncMessageType int
const (
DONE BlockchainSyncMessageType = iota
GET_LAST_BLOCK_HASH
GET_BLOCK
)
// The types of messages used for NODE/TRANSACTION
type TransactionMessageType int
@ -98,13 +111,21 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b
}
// Constructs Blockchain Sync Message.
func ConstructBlockchainSyncMessage(blockHash [32]byte) []byte {
func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(BLOCKCHAIN_SYNC))
byteBuffer.WriteByte(byte(msgType))
byteBuffer.Write(blockHash[:])
return byteBuffer.Bytes()
}
func GenerateBlockchainSyncMessage(payload []byte) *BlockchainSyncMessage {
dec := gob.NewDecoder(bytes.NewBuffer(payload))
var res BlockchainSyncMessage
dec.Decode(&res)
return &res
}
// Constructs serialized transactions
func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})

Loading…
Cancel
Save