From b6dd823812e3fc752e70189bef9a266f37e4d1c3 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 30 Nov 2018 15:48:54 -0800 Subject: [PATCH 01/12] write get peers to do syncing --- benchmark.go | 3 +++ node/node.go | 28 +++++++++++++++++++++------- node/node_handler.go | 6 ++++++ node/node_test.go | 38 +++++++++++++++++++++++++++----------- proto/node/node.go | 3 +-- 5 files changed, 58 insertions(+), 20 deletions(-) diff --git a/benchmark.go b/benchmark.go index 419d3d912..9ef1edfb5 100644 --- a/benchmark.go +++ b/benchmark.go @@ -203,6 +203,7 @@ func main() { currentNode.State = node.NodeWaitToJoin if consensus.IsLeader { + currentNode.State = node.NodeLeader if *accountModel { // Let consensus run go func() { @@ -225,6 +226,8 @@ func main() { } else { if *peerDisvoery { go currentNode.JoinShard(leader) + } else { + node.State = node.NodeDoingConsensus } } diff --git a/node/node.go b/node/node.go index c7ff2ac56..b39c57f69 100644 --- a/node/node.go +++ b/node/node.go @@ -38,13 +38,12 @@ type State byte // All constants except the NodeLeader below are for validators only. const ( - NodeInit State = iota // Node just started, before contacting BeaconChain - NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard - NodeJoinedShard // Node joined Shard, ready for consensus - NodeOffline // Node is offline - NodeReadyForConsensus // Node is ready to do consensus - NodeDoingConsensus // Node is already doing consensus - NodeLeader // Node is the leader of some shard. + NodeInit State = iota // Node just started, before contacting BeaconChain + NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard + NodeJoinedShard // Node joined Shard, ready for consensus + NodeOffline // Node is offline + NodeDoingConsensus // Node is already doing consensus + NodeLeader // Node is the leader of some shard. ) const ( @@ -324,6 +323,11 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node return &node } +// DoSyncing starts syncing. +func (node *Node) DoSyncing() { + +} + // AddPeers adds neighbors nodes func (node *Node) AddPeers(peers []p2p.Peer) int { count := 0 @@ -342,6 +346,16 @@ func (node *Node) AddPeers(peers []p2p.Peer) int { return count } +// GetPeers returns list of peers. +func (node *Node) GetPeers() []p2p.Peer { + res := []p2p.Peer{} + node.Neighbors.Range(func(k, v interface{}) bool { + res = append(res, v.(p2p.Peer)) + return true + }) + return res +} + // JoinShard helps a new node to join a shard. func (node *Node) JoinShard(leader p2p.Peer) { // try to join the shard, with 10 minutes time-out diff --git a/node/node_handler.go b/node/node_handler.go index 54626dab7..e1d9f7d81 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -90,6 +90,12 @@ func (node *Node) NodeHandler(conn net.Conn) { } } case proto.Consensus: + if !(node.State == NodeDoingConsensus || node.State == NodeLeader) { + if node.State == NodeJoinedShard { + + } + return + } actionType := consensus.ConMessageType(msgType) switch actionType { case consensus.Consensus: diff --git a/node/node_test.go b/node/node_test.go index bb0458b4b..d27a60cbc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -15,34 +15,34 @@ import ( proto_node "github.com/harmony-one/harmony/proto/node" ) -func TestNewNewNode(test *testing.T) { +func TestNewNewNode(t *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) node := New(consensus, nil, leader) if node.Consensus == nil { - test.Error("Consensus is not initialized for the node") + t.Error("Consensus is not initialized for the node") } if node.blockchain == nil { - test.Error("Blockchain is not initialized for the node") + t.Error("Blockchain is not initialized for the node") } if len(node.blockchain.Blocks) != 1 { - test.Error("Genesis block is not initialized for the node") + t.Error("Genesis block is not initialized for the node") } if len(node.blockchain.Blocks[0].Transactions) != 1 { - test.Error("Coinbase TX is not initialized for the node") + t.Error("Coinbase TX is not initialized for the node") } if node.UtxoPool == nil { - test.Error("Utxo pool is not initialized for the node") + t.Error("Utxo pool is not initialized for the node") } } -func TestCountNumTransactionsInBlockchain(test *testing.T) { +func TestCountNumTransactionsInBlockchain(t *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) @@ -50,11 +50,27 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) { node := New(consensus, nil, leader) node.AddTestingAddresses(1000) if node.countNumTransactionsInBlockchain() != 1001 { - test.Error("Count of transactions in the blockchain is incorrect") + t.Error("Count of transactions in the blockchain is incorrect") } } -func TestAddPeers(test *testing.T) { +func TestGetPeers(t *testing.T) { + leader := p2p.Peer{IP: "1", Port: "2"} + validator := p2p.Peer{IP: "3", Port: "5"} + consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) + + node := New(consensus, nil, leader) + peer := p2p.Peer{IP: "1.1.1.1"} + peer2 := p2p.Peer{IP: "2.1.1.1"} + node.Neighbors.Store("minh", peer) + node.Neighbors.Store("mark", peer2) + res := node.GetPeers() + if len(res) != 2 || res[0] != peer || res[1] != peer2 { + t.Error("GetPeers should return list of {peer, peer2}") + } +} + +func TestAddPeers(t *testing.T) { priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333)) pubKey1 := pki.GetPublicKeyFromScalar(priKey1) @@ -85,12 +101,12 @@ func TestAddPeers(test *testing.T) { r1 := node.AddPeers(peers1) e1 := 2 if r1 != e1 { - test.Errorf("Add %v peers, expectd %v", r1, e1) + t.Errorf("Add %v peers, expectd %v", r1, e1) } r2 := node.AddPeers(peers1) e2 := 0 if r2 != e2 { - test.Errorf("Add %v peers, expectd %v", r2, e2) + t.Errorf("Add %v peers, expectd %v", r2, e2) } } diff --git a/proto/node/node.go b/proto/node/node.go index d440231cd..32f480d9c 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -3,7 +3,6 @@ package node import ( "bytes" "encoding/gob" - "fmt" "log" "github.com/ethereum/go-ethereum/rlp" @@ -161,7 +160,7 @@ func ConstructTransactionListMessageAccount(transactions types.Transactions) []b txs, err := rlp.EncodeToBytes(transactions) if err != nil { - fmt.Errorf("ERROR RLP %s", err) + log.Fatal(err) return []byte{} // TODO(RJ): better handle of the error } byteBuffer.Write(txs) From 461db024bc21f6559b741d0b9717cc1b35aedeb6 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 30 Nov 2018 16:11:34 -0800 Subject: [PATCH 02/12] add dosyncing for node and fix test of getpeer --- benchmark.go | 3 --- node/node.go | 28 ++++++++++++++++++++++------ node/node_test.go | 2 +- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/benchmark.go b/benchmark.go index 9ef1edfb5..f76311869 100644 --- a/benchmark.go +++ b/benchmark.go @@ -85,7 +85,6 @@ func main() { profile := flag.Bool("profile", false, "Turn on profiling (CPU, Memory).") metricsReportURL := flag.String("metrics_report_url", "", "If set, reports metrics to this URL.") versionFlag := flag.Bool("version", false, "Output version info") - syncNode := flag.Bool("sync_node", false, "Whether this node is a new node joining blockchain and it needs to get synced before joining consensus.") onlyLogTps := flag.Bool("only_log_tps", false, "Only log TPS if true") // This IP belongs to jenkins.harmony.one @@ -186,8 +185,6 @@ func main() { currentNode := node.New(consensus, ldb, selfPeer) // Add self peer. currentNode.SelfPeer = selfPeer - // Add sync node configuration. - currentNode.SyncNode = *syncNode // If there is a client configured in the node list. if clientPeer != nil { currentNode.ClientPeer = clientPeer diff --git a/node/node.go b/node/node.go index b39c57f69..f368b96ee 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/crypto" @@ -29,6 +30,7 @@ import ( "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2pv2" proto_node "github.com/harmony-one/harmony/proto/node" + "github.com/harmony-one/harmony/syncing" "github.com/harmony-one/harmony/syncing/downloader" downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) @@ -46,6 +48,12 @@ const ( NodeLeader // Node is the leader of some shard. ) +// Constants related to doing syncing. +const ( + NotDoingSyncing uint32 = iota + DoingSyncing +) + const ( // TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus TimeToSleepForSyncing = time.Second * 30 @@ -77,7 +85,6 @@ type Node struct { SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. IDCPeer p2p.Peer - SyncNode bool // TODO(minhdoan): Remove it later. chain *core.BlockChain // Account Model Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer State State // State of the Node @@ -92,6 +99,8 @@ type Node struct { // Syncing component. downloaderServer *downloader.Server + stateSync *syncing.StateSync + syncingState uint32 // Test only TestBankKeys []*ecdsa.PrivateKey @@ -150,10 +159,6 @@ func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transa // StartServer starts a server and process the request by a handler. func (node *Node) StartServer(port string) { - if node.SyncNode { - // Disable this temporarily. - // node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers()) - } if p2p.Version == 1 { fmt.Println("going to start server on port:", port) //node.log.Debug("Starting server", "node", node, "port", port) @@ -320,12 +325,23 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node node.State = NodeInit } + // Setup initial state of syncing. + node.syncingState = NotDoingSyncing + return &node } // DoSyncing starts syncing. func (node *Node) DoSyncing() { - + // If this node is currently doing sync, another call for syncing will be returned immediately. + if !atomic.CompareAndSwapUint32(&node.syncingState, NotDoingSyncing, DoingSyncing) { + return + } + defer atomic.StoreUint32(&node.syncingState, NotDoingSyncing) + if node.stateSync != nil { + node.stateSync = syncing.GetStateSync() + } + node.stateSync.StartStateSync(node.GetPeers(), node.blockchain) } // AddPeers adds neighbors nodes diff --git a/node/node_test.go b/node/node_test.go index d27a60cbc..bd3d59d50 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -65,7 +65,7 @@ func TestGetPeers(t *testing.T) { node.Neighbors.Store("minh", peer) node.Neighbors.Store("mark", peer2) res := node.GetPeers() - if len(res) != 2 || res[0] != peer || res[1] != peer2 { + if len(res) != 2 || !((res[0] == peer && res[1] == peer2) || (res[1] == peer && res[0] == peer2)) { t.Error("GetPeers should return list of {peer, peer2}") } } From 91c7f51c6ae95926b13ba97ad8198bff0f1ffc2f Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 30 Nov 2018 16:13:49 -0800 Subject: [PATCH 03/12] setup syncing when node.State transitioned to NodeJoinedShard --- node/node_handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/node_handler.go b/node/node_handler.go index e1d9f7d81..d6cd56a10 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -557,9 +557,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { node.log.Error("Can't get Pong Message") return -1 } - // node.log.Info("Pong", "Msg", pong) - // TODO (lc) state syncing, and wait for all public keys - node.State = NodeJoinedShard peers := make([]p2p.Peer, 0) @@ -598,5 +595,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { publicKeys = append(publicKeys, key) } + node.State = NodeJoinedShard + go node.DoSyncing() + return node.Consensus.UpdatePublicKeys(publicKeys) } From 004b906d084ac1260f72d2f26923b6195e16479d Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 30 Nov 2018 16:24:54 -0800 Subject: [PATCH 04/12] add DoSyncing when doing censonsus if the node state is still NodeJoinedShard --- node/node_handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/node_handler.go b/node/node_handler.go index d6cd56a10..64ffe3f59 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -90,12 +90,12 @@ func (node *Node) NodeHandler(conn net.Conn) { } } case proto.Consensus: - if !(node.State == NodeDoingConsensus || node.State == NodeLeader) { - if node.State == NodeJoinedShard { - - } + if !(node.State == NodeDoingConsensus || node.State == NodeLeader || node.State == NodeJoinedShard) { return } + if node.State == NodeJoinedShard { + node.DoSyncing() + } actionType := consensus.ConMessageType(msgType) switch actionType { case consensus.Consensus: From 89bf6b4668949224d60a7a353a8f8ce64243ba53 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 30 Nov 2018 16:29:53 -0800 Subject: [PATCH 05/12] change state syncing if the node is doing consensus --- node/node_handler.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/node/node_handler.go b/node/node_handler.go index 64ffe3f59..90fc4a3f2 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -403,18 +403,26 @@ func (node *Node) BroadcastNewBlock(newBlock *blockchain.Block) { // VerifyNewBlock is called by consensus participants to verify the block they are running consensus on func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool { + // TODO: just a reminder for syncing. we need to check if the new block is fit with the current blockchain. + // The current blockchain can be in the progress of being synced. + var verified bool if newBlock.AccountBlock != nil { accountBlock := new(types.Block) err := rlp.DecodeBytes(newBlock.AccountBlock, accountBlock) if err != nil { node.log.Error("Failed decoding the block with RLP") } - return node.VerifyNewBlockAccount(accountBlock) + verified = node.VerifyNewBlockAccount(accountBlock) + } else if newBlock.IsStateBlock() { + verified = node.UtxoPool.VerifyStateBlock(newBlock) + } else { + verified = node.UtxoPool.VerifyTransactions(newBlock.Transactions) } - if newBlock.IsStateBlock() { - return node.UtxoPool.VerifyStateBlock(newBlock) + if verified { + // Change the syncing state. + node.State = NodeDoingConsensus } - return node.UtxoPool.VerifyTransactions(newBlock.Transactions) + return verified } // VerifyNewBlockAccount is called by consensus participants to verify the block (account model) they are running consensus on From 6cbb0f73bfebee1743a9125c3f534d41fd800c29 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 30 Nov 2018 17:16:23 -0800 Subject: [PATCH 06/12] fix typo --- benchmark.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark.go b/benchmark.go index f76311869..a765fae5f 100644 --- a/benchmark.go +++ b/benchmark.go @@ -224,7 +224,7 @@ func main() { if *peerDisvoery { go currentNode.JoinShard(leader) } else { - node.State = node.NodeDoingConsensus + currentNode.State = node.NodeDoingConsensus } } From e028236935216972e73dd0cc0b99b9ae7138a453 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sat, 1 Dec 2018 12:24:14 -0800 Subject: [PATCH 07/12] add syncing port logic for syncing --- node/node.go | 28 +++++++++++++++++++--------- node/node_test.go | 15 +++++++++------ 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/node/node.go b/node/node.go index f368b96ee..cb7d440b7 100644 --- a/node/node.go +++ b/node/node.go @@ -57,6 +57,8 @@ const ( const ( // TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus TimeToSleepForSyncing = time.Second * 30 + // SyncingPortDifference is the difference between the node port and the syncing port. + SyncingPortDifference = 1000 ) // NetworkNode ... @@ -341,7 +343,7 @@ func (node *Node) DoSyncing() { if node.stateSync != nil { node.stateSync = syncing.GetStateSync() } - node.stateSync.StartStateSync(node.GetPeers(), node.blockchain) + node.stateSync.StartStateSync(node.GetSyncingPeers(), node.blockchain) } // AddPeers adds neighbors nodes @@ -362,13 +364,26 @@ func (node *Node) AddPeers(peers []p2p.Peer) int { return count } -// GetPeers returns list of peers. -func (node *Node) GetPeers() []p2p.Peer { +// GetSyncingPort returns the syncing port. +func GetSyncingPort(nodePort string) string { + if port, err := strconv.Atoi(nodePort); err == nil { + return fmt.Sprintf("%d", port-SyncingPortDifference) + } + os.Exit(1) + return "" +} + +// GetSyncingPeers returns list of peers. +func (node *Node) GetSyncingPeers() []p2p.Peer { res := []p2p.Peer{} node.Neighbors.Range(func(k, v interface{}) bool { res = append(res, v.(p2p.Peer)) return true }) + + for i := range res { + res[i].Port = GetSyncingPort(res[i].Port) + } return res } @@ -400,12 +415,7 @@ func (node *Node) InitSyncingServer() { // StartSyncingServer starts syncing server. func (node *Node) StartSyncingServer() { - if port, err := strconv.Atoi(node.SelfPeer.Port); err == nil { - node.downloaderServer.Start(node.SelfPeer.IP, fmt.Sprintf("%d", port-1000)) - } else { - node.log.Error("Wrong port format provided") - os.Exit(1) - } + node.downloaderServer.Start(node.SelfPeer.IP, GetSyncingPort(node.SelfPeer.Port)) } // CalculateResponse implements DownloadInterface on Node object. diff --git a/node/node_test.go b/node/node_test.go index bd3d59d50..81489fab2 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -54,19 +54,22 @@ func TestCountNumTransactionsInBlockchain(t *testing.T) { } } -func TestGetPeers(t *testing.T) { +func TestGetSyncingPeers(t *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) node := New(consensus, nil, leader) - peer := p2p.Peer{IP: "1.1.1.1"} - peer2 := p2p.Peer{IP: "2.1.1.1"} + peer := p2p.Peer{IP: "1.1.1.1", Port: "2000"} + peer2 := p2p.Peer{IP: "2.1.1.1", Port: "2000"} node.Neighbors.Store("minh", peer) node.Neighbors.Store("mark", peer2) - res := node.GetPeers() - if len(res) != 2 || !((res[0] == peer && res[1] == peer2) || (res[1] == peer && res[0] == peer2)) { - t.Error("GetPeers should return list of {peer, peer2}") + res := node.GetSyncingPeers() + if len(res) != 2 || !((res[0].IP == peer.IP && res[1].IP == peer2.IP) || (res[1].IP == peer.IP && res[0].IP == peer2.IP)) { + t.Error("GetSyncingPeers should return list of {peer, peer2}") + } + if len(res) != 2 || res[0].Port != "1000" || res[1].Port != "1000" { + t.Error("Syncing ports should be 1000") } } From 8ba071608931d283dbee99d78d8c4d46527d75c3 Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Sat, 1 Dec 2018 23:08:54 -0800 Subject: [PATCH 08/12] comment out fmt.Println --- benchmark.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark.go b/benchmark.go index 61c656984..9def78282 100644 --- a/benchmark.go +++ b/benchmark.go @@ -141,7 +141,7 @@ func main() { clientPeer = distributionConfig.GetClientPeer() } - fmt.Println(peers, leader, selfPeer, clientPeer, *logFolder, *minPeers) //TODO: to be replaced by a logger later: ak, rl + // fmt.Println(peers, leader, selfPeer, clientPeer, *logFolder, *minPeers) //TODO: to be replaced by a logger later: ak, rl var role string if leader.IP == *ip && leader.Port == *port { From 284a765fd9582f0558b4df2c22a3dbb367f5bd68 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Mon, 3 Dec 2018 16:16:14 -0800 Subject: [PATCH 09/12] change logic of syncing --- benchmark.go | 1 - local_config5.txt | 3 +++ node/node.go | 26 ++++++++++++++------- node/node_handler.go | 12 +++++----- syncing/syncing.go | 50 ++++++++++++++++++++++++++++------------- syncing/syncing_test.go | 2 +- 6 files changed, 62 insertions(+), 32 deletions(-) create mode 100644 local_config5.txt diff --git a/benchmark.go b/benchmark.go index b68cd72ea..9283771ee 100644 --- a/benchmark.go +++ b/benchmark.go @@ -71,7 +71,6 @@ func loggingInit(logFolder, role, ip, port string, onlyLogTps bool) { h = log.MatchFilterHandler("msg", "TPS Report", h) } log.Root().SetHandler(h) - } func main() { diff --git a/local_config5.txt b/local_config5.txt new file mode 100644 index 000000000..2f6be3c39 --- /dev/null +++ b/local_config5.txt @@ -0,0 +1,3 @@ +127.0.0.1 9000 leader 0 +127.0.0.1 9001 validator 0 +127.0.0.1 9999 client 0 diff --git a/node/node.go b/node/node.go index de05085a7..1060e8ec4 100644 --- a/node/node.go +++ b/node/node.go @@ -40,12 +40,13 @@ type State byte // All constants except the NodeLeader below are for validators only. const ( - NodeInit State = iota // Node just started, before contacting BeaconChain - NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard - NodeJoinedShard // Node joined Shard, ready for consensus - NodeOffline // Node is offline - NodeDoingConsensus // Node is already doing consensus - NodeLeader // Node is the leader of some shard. + NodeInit State = iota // Node just started, before contacting BeaconChain + NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard + NodeJoinedShard // Node joined Shard, ready for consensus + NodeOffline // Node is offline + NodeReadyForConsensus // Node is ready for doing consensus + NodeDoingConsensus // Node is already doing consensus + NodeLeader // Node is the leader of some shard. ) // Constants related to doing syncing. @@ -339,10 +340,15 @@ func (node *Node) DoSyncing() { return } defer atomic.StoreUint32(&node.syncingState, NotDoingSyncing) - if node.stateSync != nil { + if node.stateSync == nil { node.stateSync = syncing.GetStateSync() } - node.stateSync.StartStateSync(node.GetSyncingPeers(), node.blockchain) + if node.stateSync.StartStateSync(node.GetSyncingPeers(), node.blockchain) { + node.log.Debug("DoSyncing: successfully sync") + node.State = NodeReadyForConsensus + } else { + node.log.Debug("DoSyncing: failed to sync") + } } // AddPeers adds neighbors nodes @@ -380,6 +386,7 @@ func GetSyncingPort(nodePort string) string { func (node *Node) GetSyncingPeers() []p2p.Peer { res := []p2p.Peer{} node.Neighbors.Range(func(k, v interface{}) bool { + node.log.Debug("GetSyncingPeers-Range: ", "k", k, "v", v) res = append(res, v.(p2p.Peer)) return true }) @@ -387,6 +394,7 @@ func (node *Node) GetSyncingPeers() []p2p.Peer { for i := range res { res[i].Port = GetSyncingPort(res[i].Port) } + node.log.Debug("GetSyncingPeers: ", "res", res) return res } @@ -418,6 +426,8 @@ func (node *Node) InitSyncingServer() { // StartSyncingServer starts syncing server. func (node *Node) StartSyncingServer() { + port := GetSyncingPort(node.SelfPeer.Port) + node.log.Info("support_sycning: StartSyncingServer on port:", "port", port) node.downloaderServer.Start(node.SelfPeer.IP, GetSyncingPort(node.SelfPeer.Port)) } diff --git a/node/node_handler.go b/node/node_handler.go index ec5db139b..d14b0b676 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -90,11 +90,9 @@ func (node *Node) NodeHandler(conn net.Conn) { } } case proto.Consensus: - if !(node.State == NodeDoingConsensus || node.State == NodeLeader || node.State == NodeJoinedShard) { - return - } - if node.State == NodeJoinedShard { - node.DoSyncing() + if !(node.State == NodeDoingConsensus || node.State == NodeLeader || node.State == NodeReadyForConsensus) { + node.log.Info("This node with ", "peer", node.SelfPeer, "can not join consensus because they are either not noding consensus or not a leader", nil) + break } actionType := consensus.ConMessageType(msgType) switch actionType { @@ -105,6 +103,8 @@ func (node *Node) NodeHandler(conn net.Conn) { } else { node.log.Info("NET: received message: Consensus/Validator") consensusObj.ProcessMessageValidator(msgPayload) + // TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus + // we should switch to other state rather than DoingConsensus. } } case proto.Node: @@ -208,7 +208,7 @@ func (node *Node) NodeHandler(conn net.Conn) { } // Post processing after receiving messsages. - if node.State == NodeJoinedShard { + if node.State == NodeJoinedShard || node.State == NodeReadyForConsensus { go node.DoSyncing() } } diff --git a/syncing/syncing.go b/syncing/syncing.go index 0c7d3770d..86251dc06 100644 --- a/syncing/syncing.go +++ b/syncing/syncing.go @@ -9,13 +9,16 @@ import ( "github.com/Workiva/go-datastructures/queue" "github.com/harmony-one/harmony/blockchain" + "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/syncing/downloader" ) // Constants for syncing. const ( - ConsensusRatio = float64(0.66) + ConsensusRatio = float64(0.66) + SleepTimeAfterNonConsensusBlockHashes = time.Second * 30 + TimesToFail = 5 ) // SyncPeerConfig is peer config to sync. @@ -26,6 +29,8 @@ type SyncPeerConfig struct { blockHashes [][]byte } +var Log = log.New() + // SyncBlockTask is the task struct to sync a specific block. type SyncBlockTask struct { index int @@ -111,7 +116,10 @@ func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain. // CreateSyncConfig creates SyncConfig for StateSync object. func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) { + Log.Debug("CreateSyncConfig: len of peers", "len", len(peers)) + Log.Debug("CreateSyncConfig: len of peers", "peers", peers) ss.peerNumber = len(peers) + Log.Debug("CreateSyncConfig: hello") ss.syncConfig = &SyncConfig{ peers: make([]*SyncPeerConfig, ss.peerNumber), } @@ -120,7 +128,9 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) { ip: peers[id].IP, port: peers[id].Port, } + Log.Debug("CreateSyncConfig: peer port to connect", "port", peers[id].Port) } + Log.Info("syncing: Finished creating SyncConfig.") } // MakeConnectionToPeers makes grpc connection to all peers. @@ -136,6 +146,7 @@ func (ss *StateSync) MakeConnectionToPeers() { } wg.Wait() ss.CleanUpNilPeers() + Log.Info("syncing: Finished making connection to peers.") } // CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber. @@ -211,7 +222,8 @@ func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool { } // GetConsensusHashes gets all hashes needed to download. -func (ss *StateSync) GetConsensusHashes() { +func (ss *StateSync) GetConsensusHashes() bool { + count := 0 for { var wg sync.WaitGroup wg.Add(ss.activePeerNumber) @@ -230,7 +242,15 @@ func (ss *StateSync) GetConsensusHashes() { if ss.GetBlockHashesConsensusAndCleanUp() { break } + if count > TimesToFail { + Log.Info("GetConsensusHashes: reached # of times to failed") + return false + } + count++ + time.Sleep(SleepTimeAfterNonConsensusBlockHashes) } + Log.Info("syncing: Finished getting consensus block hashes.") + return true } // getConsensusHashes gets all hashes needed to download. @@ -249,6 +269,7 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *blockchain.Blockchain) { break } } + Log.Info("syncing: Finished generateStateSyncTaskQueue.") } // downloadBlocks downloads blocks from state sync task queue. @@ -283,26 +304,23 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) { }(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) } wg.Wait() + Log.Info("syncing: Finished downloadBlocks.") } // StartStateSync starts state sync. -func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) { +func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) bool { // Creates sync config. ss.CreateSyncConfig(peers) // Makes connections to peers. ss.MakeConnectionToPeers() - for { - // Gets consensus hashes. - ss.GetConsensusHashes() - - // Generates state-sync task queue. - ss.generateStateSyncTaskQueue(bc) - - // Download blocks. - if ss.stateSyncTaskQueue.Len() > 0 { - ss.downloadBlocks(bc) - } else { - break - } + // Gets consensus hashes. + if !ss.GetConsensusHashes() { + return false + } + ss.generateStateSyncTaskQueue(bc) + // Download blocks. + if ss.stateSyncTaskQueue.Len() > 0 { + ss.downloadBlocks(bc) } + return true } diff --git a/syncing/syncing_test.go b/syncing/syncing_test.go index a55326092..7e7fe13ee 100644 --- a/syncing/syncing_test.go +++ b/syncing/syncing_test.go @@ -181,7 +181,7 @@ func TestSyncingIncludingBadNode(t *testing.T) { peers[i].Port = fakeNodes[i].port } - stateSync.StartStateSync(peers, bc) + assert.True(t, stateSync.StartStateSync(peers, bc), "should return true") for i := range bc.Blocks { if !reflect.DeepEqual(bc.Blocks[i], fakeNodes[0].bc.Blocks[i]) { From 39cb55561578136c01d0cfed35b2e7b25000676f Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Mon, 3 Dec 2018 16:20:12 -0800 Subject: [PATCH 10/12] add logic of syncing --- node/node.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/node.go b/node/node.go index 1060e8ec4..d5f65fbec 100644 --- a/node/node.go +++ b/node/node.go @@ -345,7 +345,9 @@ func (node *Node) DoSyncing() { } if node.stateSync.StartStateSync(node.GetSyncingPeers(), node.blockchain) { node.log.Debug("DoSyncing: successfully sync") - node.State = NodeReadyForConsensus + if node.State == NodeJoinedShard { + node.State = NodeReadyForConsensus + } } else { node.log.Debug("DoSyncing: failed to sync") } From 46212eb359c84cf2f6a724a62e28cf797b997bea Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Mon, 3 Dec 2018 16:37:57 -0800 Subject: [PATCH 11/12] length of syncing peers should be 1 now --- node/node.go | 6 +++++- node/node_test.go | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/node/node.go b/node/node.go index d5f65fbec..94580daa4 100644 --- a/node/node.go +++ b/node/node.go @@ -385,11 +385,15 @@ func GetSyncingPort(nodePort string) string { } // GetSyncingPeers returns list of peers. +// Right now, the list length is only 1 for testing. +// TODO(mihdoan): fix it later. func (node *Node) GetSyncingPeers() []p2p.Peer { res := []p2p.Peer{} node.Neighbors.Range(func(k, v interface{}) bool { node.log.Debug("GetSyncingPeers-Range: ", "k", k, "v", v) - res = append(res, v.(p2p.Peer)) + if len(res) == 0 { + res = append(res, v.(p2p.Peer)) + } return true }) diff --git a/node/node_test.go b/node/node_test.go index 2b93ece56..df52d7649 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -70,10 +70,10 @@ func TestGetSyncingPeers(t *testing.T) { node.Neighbors.Store("minh", peer) node.Neighbors.Store("mark", peer2) res := node.GetSyncingPeers() - if len(res) != 2 || !((res[0].IP == peer.IP && res[1].IP == peer2.IP) || (res[1].IP == peer.IP && res[0].IP == peer2.IP)) { + if len(res) != 1 || !(res[0].IP == peer.IP || res[0].IP == peer2.IP) { t.Error("GetSyncingPeers should return list of {peer, peer2}") } - if len(res) != 2 || res[0].Port != "1000" || res[1].Port != "1000" { + if len(res) != 1 || res[0].Port != "1000" { t.Error("Syncing ports should be 1000") } } @@ -166,8 +166,8 @@ func sendPongMessage(leader p2p.Peer) { } func exitServer() { - fmt.Println("wait 15 seconds to terminate the process ...") - time.Sleep(15 * time.Second) + fmt.Println("wait 5 seconds to terminate the process ...") + time.Sleep(5 * time.Second) os.Exit(0) } From 771b48adab54f8e1d4ae6c1c62471d157cd1df87 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Mon, 3 Dec 2018 16:59:54 -0800 Subject: [PATCH 12/12] try to fix the ethereum test --- core/tx_pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index b1ea3f29f..5889f498f 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -126,7 +126,7 @@ func validateEvents(events chan NewTxsEvent, count int) error { case ev := <-events: received = append(received, ev.Txs...) case <-time.After(time.Second): - return fmt.Errorf("event #%d not fired", received) + return fmt.Errorf("event #%d not fired", len(received)) } } if len(received) > count {