diff --git a/consensus/consensus.go b/consensus/consensus.go index b396f6ead..8ec5cc81f 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -98,8 +98,6 @@ type Consensus struct { BlockVerifier VerifyBlockFunc // verified block to state sync broadcast VerifiedNewBlock chan *types.Block - // will trigger state syncing when blockNum is low - BlockNumLowChan chan struct{} // Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf // randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap. PRndChannel chan []byte @@ -268,18 +266,17 @@ func New( Decider quorum.Decider, minPeers int, aggregateSig bool, ) (*Consensus, error) { consensus := Consensus{ - mutex: &sync.RWMutex{}, - ShardID: shard, - fBFTLog: NewFBFTLog(), - phase: FBFTAnnounce, - current: State{mode: Normal}, - Decider: Decider, - registry: registry, - MinPeers: minPeers, - AggregateSig: aggregateSig, - host: host, - msgSender: NewMessageSender(host), - BlockNumLowChan: make(chan struct{}, 1), + mutex: &sync.RWMutex{}, + ShardID: shard, + fBFTLog: NewFBFTLog(), + phase: FBFTAnnounce, + current: State{mode: Normal}, + Decider: Decider, + registry: registry, + MinPeers: minPeers, + AggregateSig: aggregateSig, + host: host, + msgSender: NewMessageSender(host), // FBFT timeout consensusTimeout: createTimeout(), } diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 697ba4952..fa1deaf57 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -33,7 +33,6 @@ func TestConsensusInitialization(t *testing.T) { assert.Equal(t, decider, consensus.Decider) assert.Equal(t, host, consensus.host) assert.Equal(t, messageSender, consensus.msgSender) - assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan) // FBFTLog assert.NotNil(t, consensus.FBFTLog()) diff --git a/consensus/downloader.go b/consensus/downloader.go index 84414aa80..dde7deab7 100644 --- a/consensus/downloader.go +++ b/consensus/downloader.go @@ -110,14 +110,3 @@ func (consensus *Consensus) spinUpStateSync() { v.Stop() } } - -func (consensus *Consensus) spinLegacyStateSync() { - select { - case consensus.BlockNumLowChan <- struct{}{}: - consensus.current.SetMode(Syncing) - for _, v := range consensus.consensusTimeout { - v.Stop() - } - default: - } -} diff --git a/node/node.go b/node/node.go index f035bf491..a77939f56 100644 --- a/node/node.go +++ b/node/node.go @@ -158,10 +158,6 @@ func (node *Node) SyncInstance() ISync { return node.GetOrCreateSyncInstance(true) } -func (node *Node) CurrentSyncInstance() bool { - return node.GetOrCreateSyncInstance(false) != nil -} - // GetOrCreateSyncInstance returns an instance of state sync, either legacy or staged // if initiate sets to true, it generates a new instance func (node *Node) GetOrCreateSyncInstance(initiate bool) ISync { diff --git a/node/node_syncing.go b/node/node_syncing.go index fa90ec5c7..830df25c0 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -7,30 +7,28 @@ import ( "strconv" "time" - "github.com/harmony-one/harmony/internal/tikv" - "github.com/multiformats/go-multiaddr" - - prom "github.com/harmony-one/harmony/api/service/prometheus" - "github.com/prometheus/client_golang/prometheus" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" - lru "github.com/hashicorp/golang-lru" - "github.com/pkg/errors" - "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/legacysync" legdownloader "github.com/harmony-one/harmony/api/service/legacysync/downloader" downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto" + prom "github.com/harmony-one/harmony/api/service/prometheus" "github.com/harmony-one/harmony/api/service/stagedstreamsync" "github.com/harmony-one/harmony/api/service/stagedsync" "github.com/harmony-one/harmony/api/service/synchronize" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/tikv" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" + lru "github.com/hashicorp/golang-lru" + "github.com/multiformats/go-multiaddr" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" ) // Constants related to doing syncing. @@ -279,20 +277,16 @@ func (node *Node) DoSyncing(bc core.BlockChain, willJoinConsensus bool) { for { select { case <-ticker.C: - node.doSync(bc, willJoinConsensus) - case <-node.Consensus.BlockNumLowChan: - node.doSync(bc, willJoinConsensus) + node.doSync(node.SyncInstance(), node.SyncingPeerProvider, bc, node.Consensus, willJoinConsensus) } } } // doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up -func (node *Node) doSync(bc core.BlockChain, willJoinConsensus bool) { - - syncInstance := node.SyncInstance() +func (node *Node) doSync(syncInstance ISync, syncingPeerProvider SyncingPeerProvider, bc core.BlockChain, consensus *consensus.Consensus, willJoinConsensus bool) { if syncInstance.GetActivePeerNumber() < legacysync.NumPeersLowBound { shardID := bc.ShardID() - peers, err := node.SyncingPeerProvider.SyncingPeers(shardID) + peers, err := syncingPeerProvider.SyncingPeers(shardID) if err != nil { utils.Logger().Warn(). Err(err). @@ -313,13 +307,13 @@ func (node *Node) doSync(bc core.BlockChain, willJoinConsensus bool) { if isSynchronized, _, _ := syncInstance.GetParsedSyncStatusDoubleChecked(); !isSynchronized { node.IsSynchronized.UnSet() if willJoinConsensus { - node.Consensus.BlocksNotSynchronized() + consensus.BlocksNotSynchronized() } isBeacon := bc.ShardID() == shard.BeaconChainShardID - syncInstance.SyncLoop(bc, isBeacon, node.Consensus, legacysync.LoopMinTime) + syncInstance.SyncLoop(bc, isBeacon, consensus, legacysync.LoopMinTime) if willJoinConsensus { node.IsSynchronized.Set() - node.Consensus.BlocksSynchronized() + consensus.BlocksSynchronized() } } node.IsSynchronized.Set() @@ -415,7 +409,7 @@ func (node *Node) SendNewBlockToUnsync() { utils.Logger().Warn().Msg("[SYNC] unable to encode block to hashes") continue } - blockWithSigBytes, err := node.getEncodedBlockWithSigFromBlock(block) + blockWithSigBytes, err := getEncodedBlockWithSigFromBlock(block) if err != nil { utils.Logger().Warn().Err(err).Msg("[SYNC] rlp encode BlockWithSig") continue @@ -747,7 +741,7 @@ func (node *Node) getEncodedBlockWithSigByHeight(height uint64) ([]byte, error) return b, nil } -func (node *Node) getEncodedBlockWithSigFromBlock(block *types.Block) ([]byte, error) { +func getEncodedBlockWithSigFromBlock(block *types.Block) ([]byte, error) { bwh := legacysync.BlockWithSig{ Block: block, CommitSigAndBitmap: block.GetCurrentCommitSig(),