From bec43ca07ec70e1a7f102281a0a95301e3e285b7 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Thu, 25 Mar 2021 18:46:28 -0700 Subject: [PATCH] [sync] refactor the convoluted logic in processStateSync --- api/service/legacysync/downloader/client.go | 6 +-- api/service/legacysync/syncing.go | 52 +++++++++++++++++---- api/service/legacysync/syncing_test.go | 49 +++++++++++++++++++ node/node_syncing.go | 4 +- 4 files changed, 97 insertions(+), 14 deletions(-) diff --git a/api/service/legacysync/downloader/client.go b/api/service/legacysync/downloader/client.go index 0c7756ba4..ac03e2130 100644 --- a/api/service/legacysync/downloader/client.go +++ b/api/service/legacysync/downloader/client.go @@ -126,13 +126,13 @@ func (client *Client) Register(hash []byte, ip, port string) *pb.DownloaderRespo } // PushNewBlock will send the lastest verified block to registered nodes -func (client *Client) PushNewBlock(selfPeerHash [20]byte, blockHash []byte, timeout bool) (*pb.DownloaderResponse, error) { +func (client *Client) PushNewBlock(selfPeerHash [20]byte, blockBytes []byte, timeout bool) (*pb.DownloaderResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_NEWBLOCK} - request.BlockHash = make([]byte, len(blockHash)) - copy(request.BlockHash, blockHash) + request.BlockHash = make([]byte, len(blockBytes)) + copy(request.BlockHash, blockBytes) request.PeerHash = make([]byte, len(selfPeerHash)) copy(request.PeerHash, selfPeerHash[:]) diff --git a/api/service/legacysync/syncing.go b/api/service/legacysync/syncing.go index 8fe86b501..07e17b253 100644 --- a/api/service/legacysync/syncing.go +++ b/api/service/legacysync/syncing.go @@ -11,8 +11,6 @@ import ( "sync" "time" - "github.com/harmony-one/harmony/internal/chain" - "github.com/Workiva/go-datastructures/queue" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" @@ -22,6 +20,7 @@ import ( "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" @@ -758,6 +757,40 @@ func (ss *StateSync) getBlockFromOldBlocksByParentHash(parentHash common.Hash) * return nil } +func (ss *StateSync) getCommonBlockIter(parentHash common.Hash) *commonBlockIter { + return newCommonBlockIter(ss.commonBlocks, parentHash) +} + +type commonBlockIter struct { + parentToChild map[common.Hash]*types.Block + curParentHash common.Hash +} + +func newCommonBlockIter(blocks map[int]*types.Block, startHash common.Hash) *commonBlockIter { + m := make(map[common.Hash]*types.Block) + for _, block := range blocks { + m[block.ParentHash()] = block + } + return &commonBlockIter{ + parentToChild: m, + curParentHash: startHash, + } +} + +func (iter *commonBlockIter) Next() *types.Block { + curBlock, ok := iter.parentToChild[iter.curParentHash] + if !ok || curBlock == nil { + return nil + } + iter.curParentHash = curBlock.Hash() + return curBlock +} + +func (iter *commonBlockIter) HasNext() bool { + _, ok := iter.parentToChild[iter.curParentHash] + return ok +} + func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block { for _, block := range ss.lastMileBlocks { ph := block.ParentHash() @@ -848,20 +881,21 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker parentHash := bc.CurrentBlock().Hash() var err error - for block := ss.getBlockFromOldBlocksByParentHash(parentHash); ; { + + commonIter := ss.getCommonBlockIter(parentHash) + for { + block := commonIter.Next() if block == nil { break } - next := ss.getBlockFromOldBlocksByParentHash(block.Hash()) // Enforce sig check for the last block in a batch - enforceSigCheck := next == nil - + enforceSigCheck := !commonIter.HasNext() err = ss.UpdateBlockAndStatus(block, bc, worker, enforceSigCheck) if err != nil { break } - block = next } + ss.syncMux.Lock() ss.commonBlocks = make(map[int]*types.Block) ss.syncMux.Unlock() @@ -873,7 +907,7 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker if block == nil { break } - err = ss.UpdateBlockAndStatus(block, bc, worker, true) + err = ss.UpdateBlockAndStatus(block, bc, worker, false) if err != nil { break } @@ -894,7 +928,7 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker if block == nil { break } - err = ss.UpdateBlockAndStatus(block, bc, worker, true) + err = ss.UpdateBlockAndStatus(block, bc, worker, false) if err != nil { break } diff --git a/api/service/legacysync/syncing_test.go b/api/service/legacysync/syncing_test.go index 9d566313f..2dc36827d 100644 --- a/api/service/legacysync/syncing_test.go +++ b/api/service/legacysync/syncing_test.go @@ -3,11 +3,16 @@ package legacysync import ( "errors" "fmt" + "math/big" "reflect" "strings" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/api/service/legacysync/downloader" + "github.com/harmony-one/harmony/block" + headerV3 "github.com/harmony-one/harmony/block/v3" + "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/p2p" "github.com/stretchr/testify/assert" ) @@ -169,6 +174,28 @@ func TestLimitPeersWithBound_random(t *testing.T) { } } +func TestCommonBlockIter(t *testing.T) { + size := 10 + blocks := makeTestBlocks(size) + iter := newCommonBlockIter(blocks, testGenesis.Hash()) + + itCnt := 0 + for { + hasNext := iter.HasNext() + b := iter.Next() + if (b == nil) == hasNext { + t.Errorf("has next unexpected: %v / %v", hasNext, b != nil) + } + if b == nil { + break + } + itCnt++ + } + if itCnt != size { + t.Errorf("unexpected iteration count: %v / %v", itCnt, size) + } +} + func makePeersForTest(size int) []p2p.Peer { ps := make([]p2p.Peer, 0, size) for i := 0; i != size; i++ { @@ -199,3 +226,25 @@ func assertTestError(got, expect error) error { } return nil } + +func makeTestBlocks(size int) map[int]*types.Block { + m := make(map[int]*types.Block) + parentHash := testGenesis.Hash() + for i := 0; i != size; i++ { + b := makeTestBlock(uint64(i)+1, parentHash) + parentHash = b.Hash() + + m[i] = b + } + return m +} + +var testGenesis = makeTestBlock(0, common.Hash{}) + +func makeTestBlock(bn uint64, parentHash common.Hash) *types.Block { + testHeader := &block.Header{Header: headerV3.NewHeader()} + testHeader.SetNumber(big.NewInt(int64(bn))) + testHeader.SetParentHash(parentHash) + block := types.NewBlockWithHeader(testHeader) + return block +} diff --git a/node/node_syncing.go b/node/node_syncing.go index 0f767c88f..d9e45a294 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -352,7 +352,7 @@ func (node *Node) StartSyncingServer() { func (node *Node) SendNewBlockToUnsync() { for { block := <-node.Consensus.VerifiedNewBlock - blockHash, err := rlp.EncodeToBytes(block) + blockBytes, err := rlp.EncodeToBytes(block) if err != nil { utils.Logger().Warn().Msg("[SYNC] unable to encode block to hashes") continue @@ -367,7 +367,7 @@ func (node *Node) SendNewBlockToUnsync() { delete(node.peerRegistrationRecord, peerID) continue } - response, err := config.client.PushNewBlock(node.GetSyncID(), blockHash, false) + response, err := config.client.PushNewBlock(node.GetSyncID(), blockBytes, false) // close the connection if cannot push new block to unsync node if err != nil { node.peerRegistrationRecord[peerID].client.Close()