Merge branch 'dev' into feature/clear-stale-staking-data

feature/dev-engine_test
static 1 year ago
commit 5c512f62ed
  1. 25
      consensus/consensus.go
  2. 1
      consensus/consensus_test.go
  3. 11
      consensus/downloader.go
  4. 4
      node/node.go
  5. 36
      node/node_syncing.go

@ -98,8 +98,6 @@ type Consensus struct {
BlockVerifier VerifyBlockFunc BlockVerifier VerifyBlockFunc
// verified block to state sync broadcast // verified block to state sync broadcast
VerifiedNewBlock chan *types.Block VerifiedNewBlock chan *types.Block
// will trigger state syncing when blockNum is low
BlockNumLowChan chan struct{}
// Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf // Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf
// randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap. // randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap.
PRndChannel chan []byte PRndChannel chan []byte
@ -268,18 +266,17 @@ func New(
Decider quorum.Decider, minPeers int, aggregateSig bool, Decider quorum.Decider, minPeers int, aggregateSig bool,
) (*Consensus, error) { ) (*Consensus, error) {
consensus := Consensus{ consensus := Consensus{
mutex: &sync.RWMutex{}, mutex: &sync.RWMutex{},
ShardID: shard, ShardID: shard,
fBFTLog: NewFBFTLog(), fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce, phase: FBFTAnnounce,
current: State{mode: Normal}, current: State{mode: Normal},
Decider: Decider, Decider: Decider,
registry: registry, registry: registry,
MinPeers: minPeers, MinPeers: minPeers,
AggregateSig: aggregateSig, AggregateSig: aggregateSig,
host: host, host: host,
msgSender: NewMessageSender(host), msgSender: NewMessageSender(host),
BlockNumLowChan: make(chan struct{}, 1),
// FBFT timeout // FBFT timeout
consensusTimeout: createTimeout(), consensusTimeout: createTimeout(),
} }

@ -33,7 +33,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.Equal(t, decider, consensus.Decider) assert.Equal(t, decider, consensus.Decider)
assert.Equal(t, host, consensus.host) assert.Equal(t, host, consensus.host)
assert.Equal(t, messageSender, consensus.msgSender) assert.Equal(t, messageSender, consensus.msgSender)
assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan)
// FBFTLog // FBFTLog
assert.NotNil(t, consensus.FBFTLog()) assert.NotNil(t, consensus.FBFTLog())

@ -110,14 +110,3 @@ func (consensus *Consensus) spinUpStateSync() {
v.Stop() v.Stop()
} }
} }
func (consensus *Consensus) spinLegacyStateSync() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
default:
}
}

@ -158,10 +158,6 @@ func (node *Node) SyncInstance() ISync {
return node.GetOrCreateSyncInstance(true) return node.GetOrCreateSyncInstance(true)
} }
func (node *Node) CurrentSyncInstance() bool {
return node.GetOrCreateSyncInstance(false) != nil
}
// GetOrCreateSyncInstance returns an instance of state sync, either legacy or staged // GetOrCreateSyncInstance returns an instance of state sync, either legacy or staged
// if initiate sets to true, it generates a new instance // if initiate sets to true, it generates a new instance
func (node *Node) GetOrCreateSyncInstance(initiate bool) ISync { func (node *Node) GetOrCreateSyncInstance(initiate bool) ISync {

@ -7,30 +7,28 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/multiformats/go-multiaddr"
prom "github.com/harmony-one/harmony/api/service/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/legacysync" "github.com/harmony-one/harmony/api/service/legacysync"
legdownloader "github.com/harmony-one/harmony/api/service/legacysync/downloader" legdownloader "github.com/harmony-one/harmony/api/service/legacysync/downloader"
downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto" downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto"
prom "github.com/harmony-one/harmony/api/service/prometheus"
"github.com/harmony-one/harmony/api/service/stagedstreamsync" "github.com/harmony-one/harmony/api/service/stagedstreamsync"
"github.com/harmony-one/harmony/api/service/stagedsync" "github.com/harmony-one/harmony/api/service/stagedsync"
"github.com/harmony-one/harmony/api/service/synchronize" "github.com/harmony-one/harmony/api/service/synchronize"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard"
lru "github.com/hashicorp/golang-lru"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
) )
// Constants related to doing syncing. // Constants related to doing syncing.
@ -279,20 +277,16 @@ func (node *Node) DoSyncing(bc core.BlockChain, willJoinConsensus bool) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
node.doSync(bc, willJoinConsensus) node.doSync(node.SyncInstance(), node.SyncingPeerProvider, bc, node.Consensus, willJoinConsensus)
case <-node.Consensus.BlockNumLowChan:
node.doSync(bc, willJoinConsensus)
} }
} }
} }
// doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up // doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) doSync(bc core.BlockChain, willJoinConsensus bool) { func (node *Node) doSync(syncInstance ISync, syncingPeerProvider SyncingPeerProvider, bc core.BlockChain, consensus *consensus.Consensus, willJoinConsensus bool) {
syncInstance := node.SyncInstance()
if syncInstance.GetActivePeerNumber() < legacysync.NumPeersLowBound { if syncInstance.GetActivePeerNumber() < legacysync.NumPeersLowBound {
shardID := bc.ShardID() shardID := bc.ShardID()
peers, err := node.SyncingPeerProvider.SyncingPeers(shardID) peers, err := syncingPeerProvider.SyncingPeers(shardID)
if err != nil { if err != nil {
utils.Logger().Warn(). utils.Logger().Warn().
Err(err). Err(err).
@ -313,13 +307,13 @@ func (node *Node) doSync(bc core.BlockChain, willJoinConsensus bool) {
if isSynchronized, _, _ := syncInstance.GetParsedSyncStatusDoubleChecked(); !isSynchronized { if isSynchronized, _, _ := syncInstance.GetParsedSyncStatusDoubleChecked(); !isSynchronized {
node.IsSynchronized.UnSet() node.IsSynchronized.UnSet()
if willJoinConsensus { if willJoinConsensus {
node.Consensus.BlocksNotSynchronized() consensus.BlocksNotSynchronized()
} }
isBeacon := bc.ShardID() == shard.BeaconChainShardID isBeacon := bc.ShardID() == shard.BeaconChainShardID
syncInstance.SyncLoop(bc, isBeacon, node.Consensus, legacysync.LoopMinTime) syncInstance.SyncLoop(bc, isBeacon, consensus, legacysync.LoopMinTime)
if willJoinConsensus { if willJoinConsensus {
node.IsSynchronized.Set() node.IsSynchronized.Set()
node.Consensus.BlocksSynchronized() consensus.BlocksSynchronized()
} }
} }
node.IsSynchronized.Set() node.IsSynchronized.Set()
@ -415,7 +409,7 @@ func (node *Node) SendNewBlockToUnsync() {
utils.Logger().Warn().Msg("[SYNC] unable to encode block to hashes") utils.Logger().Warn().Msg("[SYNC] unable to encode block to hashes")
continue continue
} }
blockWithSigBytes, err := node.getEncodedBlockWithSigFromBlock(block) blockWithSigBytes, err := getEncodedBlockWithSigFromBlock(block)
if err != nil { if err != nil {
utils.Logger().Warn().Err(err).Msg("[SYNC] rlp encode BlockWithSig") utils.Logger().Warn().Err(err).Msg("[SYNC] rlp encode BlockWithSig")
continue continue
@ -747,7 +741,7 @@ func (node *Node) getEncodedBlockWithSigByHeight(height uint64) ([]byte, error)
return b, nil return b, nil
} }
func (node *Node) getEncodedBlockWithSigFromBlock(block *types.Block) ([]byte, error) { func getEncodedBlockWithSigFromBlock(block *types.Block) ([]byte, error) {
bwh := legacysync.BlockWithSig{ bwh := legacysync.BlockWithSig{
Block: block, Block: block,
CommitSigAndBitmap: block.GetCurrentCommitSig(), CommitSigAndBitmap: block.GetCurrentCommitSig(),

Loading…
Cancel
Save