diff --git a/node/node.go b/node/node.go index 26d0f34ac..f6ffaf703 100644 --- a/node/node.go +++ b/node/node.go @@ -8,7 +8,6 @@ import ( "fmt" "math/big" "os" - "strconv" "sync" "time" @@ -367,49 +366,6 @@ func (node *Node) IsOutOfSync(consensusBlockInfo *bft.BFTBlockInfo) bool { return true } -// DoSyncing wait for check status and starts syncing if out of sync -func (node *Node) DoSyncing() { - for { - select { - // in current implementation logic, timeout means in sync - case <-time.After(5 * time.Second): - //myHeight := node.blockchain.CurrentBlock().NumberU64() - //utils.GetLogInstance().Debug("[SYNC]", "currentHeight", myHeight) - node.stateMutex.Lock() - node.State = NodeReadyForConsensus - node.stateMutex.Unlock() - continue - case consensusBlockInfo := <-node.Consensus.ConsensusBlock: - if !node.IsOutOfSync(consensusBlockInfo) { - startHash := node.blockchain.CurrentBlock().Hash() - node.stateSync.StartStateSync(startHash[:], node.blockchain, node.Worker) - if node.State == NodeNotInSync { - utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") - } - node.stateMutex.Lock() - node.State = NodeReadyForConsensus - node.stateMutex.Unlock() - node.stateSync.CloseConnections() - node.stateSync = nil - continue - } else { - utils.GetLogInstance().Debug("[SYNC] node is out of sync") - node.stateMutex.Lock() - node.State = NodeNotInSync - node.stateMutex.Unlock() - } - - if node.stateSync == nil { - node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port) - node.stateSync.CreateSyncConfig(node.GetSyncingPeers()) - node.stateSync.MakeConnectionToPeers() - } - startHash := node.blockchain.CurrentBlock().Hash() - node.stateSync.StartStateSync(startHash[:], node.blockchain, node.Worker) - } - } -} - // AddPeers adds neighbors nodes func (node *Node) AddPeers(peers []*p2p.Peer) int { count := 0 @@ -435,37 +391,6 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int { return count } -// GetSyncingPort returns the syncing port. -func GetSyncingPort(nodePort string) string { - if port, err := strconv.Atoi(nodePort); err == nil { - return fmt.Sprintf("%d", port-syncing.SyncingPortDifference) - } - os.Exit(1) - return "" -} - -// GetSyncingPeers returns list of peers. -func (node *Node) GetSyncingPeers() []p2p.Peer { - res := []p2p.Peer{} - node.Neighbors.Range(func(k, v interface{}) bool { - res = append(res, v.(p2p.Peer)) - return true - }) - - removeID := -1 - for i := range res { - if res[i].Port == node.SelfPeer.Port { - removeID = i - } - res[i].Port = GetSyncingPort(res[i].Port) - } - if removeID != -1 { - res = append(res[:removeID], res[removeID+1:]...) - } - utils.GetLogInstance().Debug("GetSyncingPeers: ", "res", res, "self", node.SelfPeer) - return res -} - // JoinShard helps a new node to join a shard. func (node *Node) JoinShard(leader p2p.Peer) { // try to join the shard, send ping message every 1 second, with a 10 minutes time-out @@ -495,26 +420,6 @@ func (node *Node) JoinShard(leader p2p.Peer) { } } -// SupportSyncing keeps sleeping until it's doing consensus or it's a leader. -func (node *Node) SupportSyncing() { - node.InitSyncingServer() - node.StartSyncingServer() - - go node.DoSyncing() - go node.SendNewBlockToUnsync() -} - -// InitSyncingServer starts downloader server. -func (node *Node) InitSyncingServer() { - node.downloaderServer = downloader.NewServer(node) -} - -// StartSyncingServer starts syncing server. -func (node *Node) StartSyncingServer() { - utils.GetLogInstance().Info("support_sycning: StartSyncingServer") - node.downloaderServer.Start(node.SelfPeer.IP, GetSyncingPort(node.SelfPeer.Port)) -} - // CalculateResponse implements DownloadInterface on Node object. func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) { response := &downloader_pb.DownloaderResponse{} @@ -595,43 +500,6 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (* return response, nil } -// SendNewBlockToUnsync send latest verified block to unsync, registered nodes -func (node *Node) SendNewBlockToUnsync() { - for { - block := <-node.Consensus.VerifiedNewBlock - blockHash, err := rlp.EncodeToBytes(block) - if err != nil { - utils.GetLogInstance().Warn("[SYNC] unable to encode block to hashes") - continue - } - - // really need to have a unique id independent of ip/port - selfPeerID := utils.GetUniqueIDFromIPPort(node.SelfPeer.IP, node.SelfPeer.Port) - utils.GetLogInstance().Debug("[SYNC] peerRegistration Record", "peerID", selfPeerID, "number", len(node.peerRegistrationRecord)) - - for peerID, config := range node.peerRegistrationRecord { - elapseTime := time.Now().UnixNano() - config.timestamp - if elapseTime > broadcastTimeout { - utils.GetLogInstance().Warn("[SYNC] SendNewBlockToUnsync to peer timeout", "peerID", peerID) - // send last time and delete - config.client.PushNewBlock(selfPeerID, blockHash, true) - node.stateMutex.Lock() - node.peerRegistrationRecord[peerID].client.Close() - delete(node.peerRegistrationRecord, peerID) - node.stateMutex.Unlock() - continue - } - response := config.client.PushNewBlock(selfPeerID, blockHash, false) - if response != nil && response.Type == downloader_pb.DownloaderResponse_INSYNC { - node.stateMutex.Lock() - node.peerRegistrationRecord[peerID].client.Close() - delete(node.peerRegistrationRecord, peerID) - node.stateMutex.Unlock() - } - } - } -} - // RemovePeersHandler is a goroutine to wait on the OfflinePeers channel // and remove the peers from validator list func (node *Node) RemovePeersHandler() { diff --git a/node/node_syncing.go b/node/node_syncing.go new file mode 100644 index 000000000..acf954e6a --- /dev/null +++ b/node/node_syncing.go @@ -0,0 +1,110 @@ + +import ( + "time" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/api/service/syncing" + "github.com/harmony-one/harmony/api/service/syncing/downloader" + downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto" + "github.com/harmony-one/harmony/internal/utils" +) + +// DoSyncing wait for check status and starts syncing if out of sync +func (node *Node) DoSyncing() { + for { + select { + // in current implementation logic, timeout means in sync + case <-time.After(5 * time.Second): + //myHeight := node.blockchain.CurrentBlock().NumberU64() + //utils.GetLogInstance().Debug("[SYNC]", "currentHeight", myHeight) + node.stateMutex.Lock() + node.State = NodeReadyForConsensus + node.stateMutex.Unlock() + continue + case consensusBlockInfo := <-node.Consensus.ConsensusBlock: + if !node.IsOutOfSync(consensusBlockInfo) { + startHash := node.blockchain.CurrentBlock().Hash() + node.stateSync.StartStateSync(startHash[:], node.blockchain, node.Worker) + if node.State == NodeNotInSync { + utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") + } + node.stateMutex.Lock() + node.State = NodeReadyForConsensus + node.stateMutex.Unlock() + node.stateSync.CloseConnections() + node.stateSync = nil + continue + } else { + utils.GetLogInstance().Debug("[SYNC] node is out of sync") + node.stateMutex.Lock() + node.State = NodeNotInSync + node.stateMutex.Unlock() + } + + if node.stateSync == nil { + node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port) + node.stateSync.CreateSyncConfig(node.GetSyncingPeers()) + node.stateSync.MakeConnectionToPeers() + } + startHash := node.blockchain.CurrentBlock().Hash() + node.stateSync.StartStateSync(startHash[:], node.blockchain, node.Worker) + } + } +} + +// SupportSyncing keeps sleeping until it's doing consensus or it's a leader. +func (node *Node) SupportSyncing() { + node.InitSyncingServer() + node.StartSyncingServer() + + go node.DoSyncing() + go node.SendNewBlockToUnsync() +} + +// InitSyncingServer starts downloader server. +func (node *Node) InitSyncingServer() { + node.downloaderServer = downloader.NewServer(node) +} + +// StartSyncingServer starts syncing server. +func (node *Node) StartSyncingServer() { + utils.GetLogInstance().Info("support_sycning: StartSyncingServer") + node.downloaderServer.Start(node.SelfPeer.IP, GetSyncingPort(node.SelfPeer.Port)) +} + +// SendNewBlockToUnsync send latest verified block to unsync, registered nodes +func (node *Node) SendNewBlockToUnsync() { + for { + block := <-node.Consensus.VerifiedNewBlock + blockHash, err := rlp.EncodeToBytes(block) + if err != nil { + utils.GetLogInstance().Warn("[SYNC] unable to encode block to hashes") + continue + } + + // really need to have a unique id independent of ip/port + selfPeerID := utils.GetUniqueIDFromIPPort(node.SelfPeer.IP, node.SelfPeer.Port) + utils.GetLogInstance().Debug("[SYNC] peerRegistration Record", "peerID", selfPeerID, "number", len(node.peerRegistrationRecord)) + + for peerID, config := range node.peerRegistrationRecord { + elapseTime := time.Now().UnixNano() - config.timestamp + if elapseTime > broadcastTimeout { + utils.GetLogInstance().Warn("[SYNC] SendNewBlockToUnsync to peer timeout", "peerID", peerID) + // send last time and delete + config.client.PushNewBlock(selfPeerID, blockHash, true) + node.stateMutex.Lock() + node.peerRegistrationRecord[peerID].client.Close() + delete(node.peerRegistrationRecord, peerID) + node.stateMutex.Unlock() + continue + } + response := config.client.PushNewBlock(selfPeerID, blockHash, false) + if response != nil && response.Type == downloader_pb.DownloaderResponse_INSYNC { + node.stateMutex.Lock() + node.peerRegistrationRecord[peerID].client.Close() + delete(node.peerRegistrationRecord, peerID) + node.stateMutex.Unlock() + } + } + } +}