package syncing

import (
	"bufio"
	"net"
	"reflect"
	"sync"
	"time"

	"github.com/Workiva/go-datastructures/queue"
	"github.com/harmony-one/harmony/blockchain"
	"github.com/harmony-one/harmony/p2p"
	proto_node "github.com/harmony-one/harmony/proto/node"
)

// SyncPeerConfig is peer config to sync.
type SyncPeerConfig struct {
	peer        p2p.Peer      // peer this config talks to
	conn        net.Conn      // connection obtained from p2p.DialWithSocketClient
	w           *bufio.Writer // buffered writer wrapping conn; nil until the dial succeeded
	err         error         // last error recorded for this peer; non-nil peers are skipped
	trusted     bool          // whether this peer's answers take part in the consensus check
	blockHashes [][32]byte    // block hashes most recently reported by the peer
}

// SyncBlockTask is the task struct to sync a specific block.
type SyncBlockTask struct {
	index     int      // position of the block in the downloaded chain
	blockHash [32]byte // hash identifying the block to request
}

// SyncConfig contains an array of SyncPeerConfig.
type SyncConfig struct {
	peers []SyncPeerConfig
}

// GetStateSync returns the implementation of StateSyncInterface interface.
func GetStateSync() *StateSync {
	return &StateSync{}
}

// StateSync is the struct that implements StateSyncInterface.
type StateSync struct {
	peerNumber         int          // number of peers handed to createSyncConfig
	activePeerNumber   int          // number of peers whose dial succeeded
	blockHeight        int          // number of blocks to download
	syncConfig         *SyncConfig  // per-peer sync state
	stateSyncTaskQueue *queue.Queue // pending SyncBlockTask items
}

// ProcessStateSyncFromPeers used to do state sync.
//
// It runs StartStateSync in a new goroutine and returns a channel that
// receives a single value once the sync has finished. The returned error is
// always nil at the moment.
func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
	// TODO: Validate peers.
	// The channel is buffered so the worker goroutine can always deliver its
	// completion signal and exit, even if the caller never receives from it.
	done := make(chan struct{}, 1)
	go func() {
		ss.StartStateSync(peers, bc)
		done <- struct{}{}
	}()
	return done, nil
}

// ProcessStateSyncFromSinglePeer used to do state sync from a single peer.
//
// It is not implemented yet and returns a nil channel and a nil error.
// Receiving from a nil channel blocks forever, so callers must not wait on
// the result until this is implemented.
func (ss *StateSync) ProcessStateSyncFromSinglePeer(peer *p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
	// Later.
	return nil, nil
}

// createSyncConfig records the number of peers and builds one untrusted
// SyncPeerConfig per peer.
func (ss *StateSync) createSyncConfig(peers []p2p.Peer) {
	ss.peerNumber = len(peers)
	ss.syncConfig = &SyncConfig{
		peers: make([]SyncPeerConfig, ss.peerNumber),
	}
	for id := range ss.syncConfig.peers {
		ss.syncConfig.peers[id].peer = peers[id]
		ss.syncConfig.peers[id].trusted = false
	}
}

// makeConnectionToPeers dials every configured peer concurrently, waits for
// all dials to return, and then marks each peer whose dial succeeded as
// trusted, gives it a buffered writer, and counts it in activePeerNumber.
func (ss *StateSync) makeConnectionToPeers() {
	peers := ss.syncConfig.peers

	var wg sync.WaitGroup
	wg.Add(len(peers))
	for i := range peers {
		// Pass the address of the slice element, not of a range copy, so the
		// dial result is stored in the shared configuration.
		go func(peerConfig *SyncPeerConfig) {
			defer wg.Done()
			peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port)
		}(&peers[i])
	}
	wg.Wait()

	ss.activePeerNumber = 0
	for i := range peers {
		peerConfig := &peers[i]
		if peerConfig.err != nil {
			continue
		}
		ss.activePeerNumber++
		peerConfig.w = bufio.NewWriter(peerConfig.conn)
		peerConfig.trusted = true
	}
}

// areConsensusHashesEqual checks if all consensus hashes are equal.
//
// The first trusted peer is the reference; every other trusted peer must
// report a deeply equal blockHashes slice. It returns true when there are no
// trusted peers at all.
func (ss *StateSync) areConsensusHashesEqual() bool {
	var reference *SyncPeerConfig
	for i := range ss.syncConfig.peers {
		peerConfig := &ss.syncConfig.peers[i]
		if !peerConfig.trusted {
			continue
		}
		if reference == nil {
			reference = peerConfig
			continue
		}
		if !reflect.DeepEqual(peerConfig.blockHashes, reference.blockHashes) {
			return false
		}
	}
	return true
}
func (ss *StateSync) getConsensusHashes() { for { var wg sync.WaitGroup wg.Add(ss.activePeerNumber) for _, configPeer := range ss.syncConfig.peers { if configPeer.err != nil { continue } go func(peerConfig *SyncPeerConfig) { defer wg.Done() msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetLastBlockHashes, [32]byte{}) peerConfig.w.Write(msg) peerConfig.w.Flush() var content []byte content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) if peerConfig.err != nil { peerConfig.trusted = false return } var blockchainSyncMessage *proto_node.BlockchainSyncMessage blockchainSyncMessage, peerConfig.err = proto_node.DeserializeBlockchainSyncMessage(content) if peerConfig.err != nil { peerConfig.trusted = false return } peerConfig.blockHashes = blockchainSyncMessage.BlockHashes }(&configPeer) } if ss.areConsensusHashesEqual() { break } } } // getConsensusHashes gets all hashes needed to download. func (ss *StateSync) generateStateSyncTaskQueue() { ss.stateSyncTaskQueue = queue.New(0) for _, configPeer := range ss.syncConfig.peers { if configPeer.trusted { for id, blockHash := range configPeer.blockHashes { ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) } ss.blockHeight = len(configPeer.blockHashes) } } } // downloadBlocks downloads blocks from state sync task queue. 
func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) { // Initialize blockchain bc.Blocks = make([]*blockchain.Block, ss.blockHeight) var wg sync.WaitGroup wg.Add(int(ss.stateSyncTaskQueue.Len())) for _, configPeer := range ss.syncConfig.peers { if configPeer.err != nil { continue } go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) { defer wg.Done() for !stateSyncTaskQueue.Empty() { task, err := stateSyncTaskQueue.Poll(1, time.Millisecond) if err == queue.ErrTimeout { break } syncTask := task[0].(SyncBlockTask) msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetBlock, syncTask.blockHash) peerConfig.w.Write(msg) peerConfig.w.Flush() var content []byte content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) if peerConfig.err != nil { peerConfig.trusted = false return } block, err := blockchain.DeserializeBlock(content) if err == nil { bc.Blocks[syncTask.index] = block } } }(&configPeer, ss.stateSyncTaskQueue, bc) } wg.Wait() } // StartStateSync starts state sync. func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) { // Creates sync config. ss.createSyncConfig(peers) // Makes connections to peers. ss.makeConnectionToPeers() // Gets consensus hashes. ss.getConsensusHashes() // Generates state-sync task queue. ss.generateStateSyncTaskQueue() ss.downloadBlocks(bc) } func getConsensus(syncConfig *SyncConfig) bool { return true }