add some more State and set up syncing in each node

pull/104/head^2
Minh Doan 6 years ago
parent 9a3b178ff4
commit 45e0eeb728
  1. 5
      benchmark.go
  2. 60
      node/node.go
  3. 11
      node/node_handler.go
  4. 4
      syncing/downloader/server.go

@ -230,5 +230,8 @@ func main() {
} }
} }
currentNode.StartServer(*port) go currentNode.StartServer(*port)
go currentNode.StartHelpSyncing()
// Keep waiting.
<-make(chan struct{})
} }

@ -31,17 +31,26 @@ import (
downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto" downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto"
) )
type NodeState byte // State is a state of a node.
type State byte
// All constants except the NodeLeader below are for validators only.
const ( const (
NodeInit NodeState = iota // Node just started, before contacting BeaconChain NodeInit State = iota // Node just started, before contacting BeaconChain
NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
NodeJoinedShard // Node joined Shard, ready for consensus NodeJoinedShard // Node joined Shard, ready for consensus
NodeOffline // Node is offline NodeOffline // Node is offline
NodeReadyForConsensus // Node is ready to do consensus NodeReadyForConsensus // Node is ready to do consensus
NodeDoingConsensus // Node is already doing consensus NodeDoingConsensus // Node is already doing consensus
NodeLeader // Node is the leader of some shard.
) )
const (
// TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus
TimeToSleepForSyncing = time.Second * 30
)
// NetworkNode is TODO(leo)
type NetworkNode struct { type NetworkNode struct {
SelfPeer p2p.Peer SelfPeer p2p.Peer
IDCPeer p2p.Peer IDCPeer p2p.Peer
@ -71,7 +80,7 @@ type Node struct {
SyncNode bool // TODO(minhdoan): Remove it later. SyncNode bool // TODO(minhdoan): Remove it later.
chain *core.BlockChain // Account Model chain *core.BlockChain // Account Model
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State NodeState // State of the Node State State // State of the Node
// Account Model // Account Model
pendingTransactionsAccount types.Transactions // TODO: replace with txPool pendingTransactionsAccount types.Transactions // TODO: replace with txPool
@ -150,6 +159,7 @@ func (node *Node) StartServer(port string) {
node.listenOnPort(port) node.listenOnPort(port)
} }
// SetLog sets log for Node.
func (node *Node) SetLog() *Node { func (node *Node) SetLog() *Node {
node.log = log.New() node.log = log.New()
return node return node
@ -205,7 +215,7 @@ func (node *Node) countNumTransactionsInBlockchainAccount() int {
return count return count
} }
//ConnectIdentityChain connects to identity chain //ConnectBeaconChain connects to identity chain
func (node *Node) ConnectBeaconChain() { func (node *Node) ConnectBeaconChain() {
Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer} Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer}
msg := node.SerializeNode(Nnode) msg := node.SerializeNode(Nnode)
@ -298,12 +308,16 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
} }
// Logger // Logger
node.log = log.New() node.log = log.New()
node.State = NodeInit if consensus.IsLeader {
node.State = NodeLeader
} else {
node.State = NodeInit
}
return &node return &node
} }
// Add neighbors nodes // AddPeers adds neighbors nodes
func (node *Node) AddPeers(peers []p2p.Peer) int { func (node *Node) AddPeers(peers []p2p.Peer) int {
count := 0 count := 0
for _, p := range peers { for _, p := range peers {
@ -337,10 +351,28 @@ func (node *Node) JoinShard(leader p2p.Peer) {
} }
} }
// StartDownloaderServer starts downloader server. // StartHelpSyncing keeps sleeping until it's doing consensus or it's a leader.
func (node *Node) StartDownloaderServer() { func (node *Node) StartHelpSyncing() {
for {
time.Sleep(TimeToSleepForSyncing)
//
if node.State == NodeDoingConsensus || node.State == NodeLeader {
node.InitSyncingServer()
node.StartHelpSyncing()
break
}
}
}
// InitSyncingServer starts downloader server.
func (node *Node) InitSyncingServer() {
node.downloaderServer = downloader.NewServer(node) node.downloaderServer = downloader.NewServer(node)
// node.downloaderServer.Start(node.) }
// StartSyncingServer starts syncing server.
func (node *Node) StartSyncingServer() {
// Handles returned grpcServer??
node.downloaderServer.Start("localhost", downloader.DefaultDownloadPort)
} }
// CalculateResponse implements DownloadInterface on Node object. // CalculateResponse implements DownloadInterface on Node object.

@ -5,14 +5,15 @@ import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki"
"net" "net"
"os" "os"
"strconv" "strconv"
"time" "time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/blockchain"
hmy_crypto "github.com/harmony-one/harmony/crypto" hmy_crypto "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -371,7 +372,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
} }
} }
// WaitForConsensusReady ... // WaitForConsensusReadyAccount ...
func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) { func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) {
node.log.Debug("Waiting for Consensus ready", "node", node) node.log.Debug("Waiting for Consensus ready", "node", node)
@ -465,7 +466,7 @@ func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool {
return node.UtxoPool.VerifyTransactions(newBlock.Transactions) return node.UtxoPool.VerifyTransactions(newBlock.Transactions)
} }
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on // VerifyNewBlockAccount is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool { func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool {
err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
if err != nil { if err != nil {

@ -11,6 +11,10 @@ import (
pb "github.com/harmony-one/harmony/syncing/downloader/proto" pb "github.com/harmony-one/harmony/syncing/downloader/proto"
) )
const (
DefaultDownloadPort = "8888"
)
// Server is the Server struct for downloader package. // Server is the Server struct for downloader package.
type Server struct { type Server struct {
downloadInterface DownloadInterface downloadInterface DownloadInterface

Loading…
Cancel
Save