Rwlock consensus.

pull/4369/head
frozen 2 years ago committed by Casey Gardiner
parent 1aefca345d
commit a92cc1bb40
  1. 8
      api/service/consensus/service.go
  2. 17
      consensus/consensus.go
  3. 6
      consensus/consensus_test.go
  4. 58
      consensus/consensus_v2.go
  5. 2
      node/node.go
  6. 2
      node/node_newblock.go
  7. 2
      node/service_setup.go

@ -3,13 +3,11 @@ package consensus
import ( import (
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
// Service is the consensus service. // Service is the consensus service.
type Service struct { type Service struct {
blockChannel chan *types.Block // The channel to receive new blocks from Node
consensus *consensus.Consensus consensus *consensus.Consensus
stopChan chan struct{} stopChan chan struct{}
stoppedChan chan struct{} stoppedChan chan struct{}
@ -18,8 +16,8 @@ type Service struct {
} }
// New returns consensus service. // New returns consensus service.
func New(blockChannel chan *types.Block, consensus *consensus.Consensus, startChan chan struct{}) *Service { func New(consensus *consensus.Consensus, startChan chan struct{}) *Service {
return &Service{blockChannel: blockChannel, consensus: consensus, startChan: startChan} return &Service{consensus: consensus, startChan: startChan}
} }
// Start starts consensus service. // Start starts consensus service.
@ -27,7 +25,7 @@ func (s *Service) Start() error {
utils.Logger().Info().Msg("[consensus/service] Starting consensus service.") utils.Logger().Info().Msg("[consensus/service] Starting consensus service.")
s.stopChan = make(chan struct{}) s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{}) s.stoppedChan = make(chan struct{})
s.consensus.Start(s.blockChannel, s.stopChan, s.stoppedChan, s.startChan) s.consensus.Start(s.stopChan, s.stoppedChan, s.startChan)
s.consensus.WaitForNewRandomness() s.consensus.WaitForNewRandomness()
return nil return nil
} }

@ -85,7 +85,7 @@ type Consensus struct {
// IgnoreViewIDCheck determines whether to ignore viewID check // IgnoreViewIDCheck determines whether to ignore viewID check
IgnoreViewIDCheck *abool.AtomicBool IgnoreViewIDCheck *abool.AtomicBool
// consensus mutex // consensus mutex
mutex sync.Mutex mutex sync.RWMutex
// mutex for verify new block // mutex for verify new block
verifyBlockMutex sync.Mutex verifyBlockMutex sync.Mutex
// ViewChange struct // ViewChange struct
@ -114,10 +114,6 @@ type Consensus struct {
host p2p.Host host p2p.Host
// MessageSender takes are of sending consensus message and the corresponding retry logic. // MessageSender takes are of sending consensus message and the corresponding retry logic.
msgSender *MessageSender msgSender *MessageSender
// Used to convey to the consensus main loop that block syncing has finished.
syncReadyChan chan struct{}
// Used to convey to the consensus main loop that node is out of sync
syncNotReadyChan chan struct{}
// If true, this consensus will not propose view change. // If true, this consensus will not propose view change.
disableViewChange bool disableViewChange bool
// Have a dedicated reader thread pull from this chan, like in node // Have a dedicated reader thread pull from this chan, like in node
@ -136,6 +132,9 @@ type Consensus struct {
finalityCounter atomic.Value //int64 finalityCounter atomic.Value //int64
dHelper *downloadHelper dHelper *downloadHelper
// Flag only for initialization state.
start bool
} }
// Blockchain returns the blockchain. // Blockchain returns the blockchain.
@ -157,12 +156,14 @@ func (consensus *Consensus) VerifyBlock(block *types.Block) error {
// BlocksSynchronized lets the main loop know that block synchronization finished // BlocksSynchronized lets the main loop know that block synchronization finished
// thus the blockchain is likely to be up to date. // thus the blockchain is likely to be up to date.
func (consensus *Consensus) BlocksSynchronized() { func (consensus *Consensus) BlocksSynchronized() {
consensus.syncReadyChan <- struct{}{} consensus.mutex.Lock()
consensus.syncReadyChan()
consensus.mutex.Unlock()
} }
// BlocksNotSynchronized lets the main loop know that block is not synchronized // BlocksNotSynchronized lets the main loop know that block is not synchronized
func (consensus *Consensus) BlocksNotSynchronized() { func (consensus *Consensus) BlocksNotSynchronized() {
consensus.syncNotReadyChan <- struct{}{} consensus.syncNotReadyChan()
} }
// VdfSeedSize returns the number of VRFs for VDF computation // VdfSeedSize returns the number of VRFs for VDF computation
@ -265,8 +266,6 @@ func New(
// displayed on explorer as Height right now // displayed on explorer as Height right now
consensus.SetCurBlockViewID(0) consensus.SetCurBlockViewID(0)
consensus.ShardID = shard consensus.ShardID = shard
consensus.syncReadyChan = make(chan struct{})
consensus.syncNotReadyChan = make(chan struct{})
consensus.SlashChan = make(chan slash.Record) consensus.SlashChan = make(chan slash.Record)
consensus.ReadySignal = make(chan ProposalType) consensus.ReadySignal = make(chan ProposalType)
consensus.CommitSigChannel = make(chan []byte) consensus.CommitSigChannel = make(chan []byte)

@ -61,11 +61,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.Equal(t, uint64(0), consensus.GetViewChangingID()) assert.Equal(t, uint64(0), consensus.GetViewChangingID())
assert.Equal(t, uint32(shard.BeaconChainShardID), consensus.ShardID) assert.Equal(t, uint32(shard.BeaconChainShardID), consensus.ShardID)
assert.IsType(t, make(chan struct{}), consensus.syncReadyChan) assert.Equal(t, false, consensus.start)
assert.NotNil(t, consensus.syncReadyChan)
assert.IsType(t, make(chan struct{}), consensus.syncNotReadyChan)
assert.NotNil(t, consensus.syncNotReadyChan)
assert.IsType(t, make(chan slash.Record), consensus.SlashChan) assert.IsType(t, make(chan slash.Record), consensus.SlashChan)
assert.NotNil(t, consensus.SlashChan) assert.NotNil(t, consensus.SlashChan)

@ -292,7 +292,7 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
// Start waits for the next new block and run consensus // Start waits for the next new block and run consensus
func (consensus *Consensus) Start( func (consensus *Consensus) Start(
blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{}, stopChan, stoppedChan, startChannel chan struct{},
) { ) {
go func() { go func() {
toStart := make(chan struct{}, 1) toStart := make(chan struct{}, 1)
@ -317,13 +317,13 @@ func (consensus *Consensus) Start(
// Set up next block due time. // Set up next block due time.
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
start := false consensus.start = false
for { for {
select { select {
case <-toStart: case <-toStart:
start = true consensus.start = true
case <-ticker.C: case <-ticker.C:
if !start && isInitialLeader { if !consensus.start && isInitialLeader {
continue continue
} }
for k, v := range consensus.consensusTimeout { for k, v := range consensus.consensusTimeout {
@ -362,10 +362,20 @@ func (consensus *Consensus) Start(
} }
} }
// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed case <-stopChan:
case <-consensus.syncReadyChan: consensus.getLogger().Info().Msg("[ConsensusMainLoop] stopChan")
return
}
}
}()
if consensus.dHelper != nil {
consensus.dHelper.start()
}
}
func (consensus *Consensus) syncReadyChan() {
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
consensus.mutex.Lock()
if consensus.BlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 { if consensus.BlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 {
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.SetViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1) consensus.SetViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1)
@ -384,31 +394,40 @@ func (consensus *Consensus) Start(
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
} }
consensus.mutex.Unlock()
// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed }
case <-consensus.syncNotReadyChan:
func (consensus *Consensus) syncNotReadyChan() {
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.current.SetMode(Syncing) consensus.current.SetMode(Syncing)
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC")
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc()
}
case newBlock := <-blockChannel: // Close close the consensus. If current is in normal commit phase, wait until the commit
//consensus.ReshardingNextLeader(newBlock) // phase end.
consensus.getLogger().Info(). func (consensus *Consensus) Close() error {
if consensus.dHelper != nil {
consensus.dHelper.close()
}
consensus.waitForCommit()
return nil
}
func (consensus *Consensus) BlockChannel(newBlock *types.Block) {
//consensus.ReshardingNextLeader(newBlock)consensus.getLogger().Info().
Uint64("MsgBlockNum", newBlock.NumberU64()). Uint64("MsgBlockNum", newBlock.NumberU64()).
Msg("[ConsensusMainLoop] Received Proposed New Block!") Msg("[ConsensusMainLoop] Received Proposed New Block!")
if newBlock.NumberU64() < consensus.BlockNum() { if newBlock.NumberU64() < consensus.BlockNum() {
consensus.getLogger().Warn().Uint64("newBlockNum", newBlock.NumberU64()). consensus.getLogger().Warn().Uint64("newBlockNum", newBlock.NumberU64()).
Msg("[ConsensusMainLoop] received old block, abort") Msg("[ConsensusMainLoop] received old block, abort")
continue return
} }
// Sleep to wait for the full block time // Sleep to wait for the full block time
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time")
time.AfterFunc(time.Until(consensus.NextBlockDue), func() {
<-time.After(time.Until(consensus.NextBlockDue))
consensus.StartFinalityCount() consensus.StartFinalityCount()
// Update time due for next block // Update time due for next block
@ -424,12 +443,7 @@ func (consensus *Consensus) Start(
Int64("publicKeys", consensus.Decider.ParticipantsCount()). Int64("publicKeys", consensus.Decider.ParticipantsCount()).
Msg("[ConsensusMainLoop] STARTING CONSENSUS") Msg("[ConsensusMainLoop] STARTING CONSENSUS")
consensus.announce(newBlock) consensus.announce(newBlock)
case <-stopChan: })
consensus.getLogger().Info().Msg("[ConsensusMainLoop] stopChan")
return
}
}
}()
if consensus.dHelper != nil { if consensus.dHelper != nil {
consensus.dHelper.start() consensus.dHelper.start()

@ -100,7 +100,6 @@ type ISync interface {
// Node represents a protocol-participating node in the network // Node represents a protocol-participating node in the network
type Node struct { type Node struct {
Consensus *consensus.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits) Consensus *consensus.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits)
BlockChannel chan *types.Block // The channel to send newly proposed blocks
ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks
BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes
pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
@ -1083,7 +1082,6 @@ func New(
} }
} }
node.BlockChannel = make(chan *types.Block)
node.ConfirmedBlockChannel = make(chan *types.Block) node.ConfirmedBlockChannel = make(chan *types.Block)
node.BeaconBlockChannel = make(chan *types.Block) node.BeaconBlockChannel = make(chan *types.Block)
txPoolConfig := core.DefaultTxPoolConfig txPoolConfig := core.DefaultTxPoolConfig

@ -107,7 +107,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
// Send the new block to Consensus so it can be confirmed. // Send the new block to Consensus so it can be confirmed.
node.proposedBlock[newBlock.NumberU64()] = newBlock node.proposedBlock[newBlock.NumberU64()] = newBlock
delete(node.proposedBlock, newBlock.NumberU64()-10) delete(node.proposedBlock, newBlock.NumberU64()-10)
node.BlockChannel <- newBlock node.Consensus.BlockChannel(newBlock)
break break
} else { } else {
retryCount++ retryCount++

@ -14,7 +14,7 @@ func (node *Node) RegisterValidatorServices() {
// Register consensus service. // Register consensus service.
node.serviceManager.Register( node.serviceManager.Register(
service.Consensus, service.Consensus,
consensus.New(node.BlockChannel, node.Consensus, node.startConsensus), consensus.New(node.Consensus, node.startConsensus),
) )
// Register new block service. // Register new block service.
node.serviceManager.Register( node.serviceManager.Register(

Loading…
Cancel
Save