[sync] refactor the convoluted logic in processStateSync

pull/3612/head
Jacky Wang 4 years ago
parent a4001a92de
commit bec43ca07e
No known key found for this signature in database
GPG Key ID: 1085CE5F4FF5842C
  1. 6
      api/service/legacysync/downloader/client.go
  2. 52
      api/service/legacysync/syncing.go
  3. 49
      api/service/legacysync/syncing_test.go
  4. 4
      node/node_syncing.go

@ -126,13 +126,13 @@ func (client *Client) Register(hash []byte, ip, port string) *pb.DownloaderRespo
}
// PushNewBlock will send the lastest verified block to registered nodes
func (client *Client) PushNewBlock(selfPeerHash [20]byte, blockHash []byte, timeout bool) (*pb.DownloaderResponse, error) {
func (client *Client) PushNewBlock(selfPeerHash [20]byte, blockBytes []byte, timeout bool) (*pb.DownloaderResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_NEWBLOCK}
request.BlockHash = make([]byte, len(blockHash))
copy(request.BlockHash, blockHash)
request.BlockHash = make([]byte, len(blockBytes))
copy(request.BlockHash, blockBytes)
request.PeerHash = make([]byte, len(selfPeerHash))
copy(request.PeerHash, selfPeerHash[:])

@ -11,8 +11,6 @@ import (
"sync"
"time"
"github.com/harmony-one/harmony/internal/chain"
"github.com/Workiva/go-datastructures/queue"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
@ -22,6 +20,7 @@ import (
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
@ -758,6 +757,40 @@ func (ss *StateSync) getBlockFromOldBlocksByParentHash(parentHash common.Hash) *
return nil
}
func (ss *StateSync) getCommonBlockIter(parentHash common.Hash) *commonBlockIter {
return newCommonBlockIter(ss.commonBlocks, parentHash)
}
type commonBlockIter struct {
parentToChild map[common.Hash]*types.Block
curParentHash common.Hash
}
func newCommonBlockIter(blocks map[int]*types.Block, startHash common.Hash) *commonBlockIter {
m := make(map[common.Hash]*types.Block)
for _, block := range blocks {
m[block.ParentHash()] = block
}
return &commonBlockIter{
parentToChild: m,
curParentHash: startHash,
}
}
func (iter *commonBlockIter) Next() *types.Block {
curBlock, ok := iter.parentToChild[iter.curParentHash]
if !ok || curBlock == nil {
return nil
}
iter.curParentHash = curBlock.Hash()
return curBlock
}
func (iter *commonBlockIter) HasNext() bool {
_, ok := iter.parentToChild[iter.curParentHash]
return ok
}
func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block {
for _, block := range ss.lastMileBlocks {
ph := block.ParentHash()
@ -848,20 +881,21 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
parentHash := bc.CurrentBlock().Hash()
var err error
for block := ss.getBlockFromOldBlocksByParentHash(parentHash); ; {
commonIter := ss.getCommonBlockIter(parentHash)
for {
block := commonIter.Next()
if block == nil {
break
}
next := ss.getBlockFromOldBlocksByParentHash(block.Hash())
// Enforce sig check for the last block in a batch
enforceSigCheck := next == nil
enforceSigCheck := !commonIter.HasNext()
err = ss.UpdateBlockAndStatus(block, bc, worker, enforceSigCheck)
if err != nil {
break
}
block = next
}
ss.syncMux.Lock()
ss.commonBlocks = make(map[int]*types.Block)
ss.syncMux.Unlock()
@ -873,7 +907,7 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil {
break
}
err = ss.UpdateBlockAndStatus(block, bc, worker, true)
err = ss.UpdateBlockAndStatus(block, bc, worker, false)
if err != nil {
break
}
@ -894,7 +928,7 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil {
break
}
err = ss.UpdateBlockAndStatus(block, bc, worker, true)
err = ss.UpdateBlockAndStatus(block, bc, worker, false)
if err != nil {
break
}

@ -3,11 +3,16 @@ package legacysync
import (
"errors"
"fmt"
"math/big"
"reflect"
"strings"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
"github.com/harmony-one/harmony/block"
headerV3 "github.com/harmony-one/harmony/block/v3"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p"
"github.com/stretchr/testify/assert"
)
@ -169,6 +174,28 @@ func TestLimitPeersWithBound_random(t *testing.T) {
}
}
func TestCommonBlockIter(t *testing.T) {
size := 10
blocks := makeTestBlocks(size)
iter := newCommonBlockIter(blocks, testGenesis.Hash())
itCnt := 0
for {
hasNext := iter.HasNext()
b := iter.Next()
if (b == nil) == hasNext {
t.Errorf("has next unexpected: %v / %v", hasNext, b != nil)
}
if b == nil {
break
}
itCnt++
}
if itCnt != size {
t.Errorf("unexpected iteration count: %v / %v", itCnt, size)
}
}
func makePeersForTest(size int) []p2p.Peer {
ps := make([]p2p.Peer, 0, size)
for i := 0; i != size; i++ {
@ -199,3 +226,25 @@ func assertTestError(got, expect error) error {
}
return nil
}
func makeTestBlocks(size int) map[int]*types.Block {
m := make(map[int]*types.Block)
parentHash := testGenesis.Hash()
for i := 0; i != size; i++ {
b := makeTestBlock(uint64(i)+1, parentHash)
parentHash = b.Hash()
m[i] = b
}
return m
}
var testGenesis = makeTestBlock(0, common.Hash{})
func makeTestBlock(bn uint64, parentHash common.Hash) *types.Block {
testHeader := &block.Header{Header: headerV3.NewHeader()}
testHeader.SetNumber(big.NewInt(int64(bn)))
testHeader.SetParentHash(parentHash)
block := types.NewBlockWithHeader(testHeader)
return block
}

@ -352,7 +352,7 @@ func (node *Node) StartSyncingServer() {
func (node *Node) SendNewBlockToUnsync() {
for {
block := <-node.Consensus.VerifiedNewBlock
blockHash, err := rlp.EncodeToBytes(block)
blockBytes, err := rlp.EncodeToBytes(block)
if err != nil {
utils.Logger().Warn().Msg("[SYNC] unable to encode block to hashes")
continue
@ -367,7 +367,7 @@ func (node *Node) SendNewBlockToUnsync() {
delete(node.peerRegistrationRecord, peerID)
continue
}
response, err := config.client.PushNewBlock(node.GetSyncID(), blockHash, false)
response, err := config.client.PushNewBlock(node.GetSyncID(), blockBytes, false)
// close the connection if cannot push new block to unsync node
if err != nil {
node.peerRegistrationRecord[peerID].client.Close()

Loading…
Cancel
Save