Removed channels.

pull/4377/head
frozen 2 years ago committed by Casey Gardiner
parent 9ddb48e633
commit 432ac56a6a
  1. 12
      api/service/consensus/service.go
  2. 5
      consensus/consensus.go
  3. 134
      consensus/consensus_v2.go
  4. 5
      node/node.go
  5. 4
      node/node_handler.go
  6. 2
      node/service_setup.go

@ -10,22 +10,19 @@ import (
type Service struct {
consensus *consensus.Consensus
stopChan chan struct{}
stoppedChan chan struct{}
startChan chan struct{}
messageChan chan *msg_pb.Message
}
// New returns consensus service.
func New(consensus *consensus.Consensus, startChan chan struct{}) *Service {
return &Service{consensus: consensus, startChan: startChan}
func New(consensus *consensus.Consensus) *Service {
return &Service{consensus: consensus}
}
// Start starts consensus service.
func (s *Service) Start() error {
utils.Logger().Info().Msg("[consensus/service] Starting consensus service.")
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
s.consensus.Start(s.stopChan, s.stoppedChan, s.startChan)
s.consensus.Start(s.stopChan)
s.consensus.WaitForNewRandomness()
return nil
}
@ -33,8 +30,7 @@ func (s *Service) Start() error {
// Stop stops consensus service.
func (s *Service) Stop() error {
utils.Logger().Info().Msg("Stopping consensus service.")
s.stopChan <- struct{}{}
<-s.stoppedChan
close(s.stopChan)
utils.Logger().Info().Msg("Consensus service stopped.")
return s.consensus.Close()
}

@ -133,8 +133,9 @@ type Consensus struct {
dHelper *downloadHelper
// Flag only for initialization state.
start bool
// Both flags only for initialization state.
start bool
isInitialLeader bool
}
// Blockchain returns the blockchain.

@ -292,81 +292,28 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
// Start waits for the next new block and run consensus
func (consensus *Consensus) Start(
stopChan, stoppedChan, startChannel chan struct{},
stopChan chan struct{},
) {
go func() {
toStart := make(chan struct{}, 1)
isInitialLeader := consensus.IsLeader()
if isInitialLeader {
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Waiting for consensus start")
// send a signal to indicate it's ready to run consensus
// this signal is consumed by node object to create a new block and in turn trigger a new consensus on it
go func() {
<-startChannel
toStart <- struct{}{}
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.ReadySignal <- SyncProposal
}()
}
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started")
defer close(stoppedChan)
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
go func() {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stopChan:
return
case <-ticker.C:
consensus.tick()
}
}
}()
consensus.consensusTimeout[timeoutBootstrap].Start()
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Start bootstrap timeout (only once)")
// Set up next block due time.
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
consensus.start = false
for {
select {
case <-toStart:
consensus.start = true
case <-ticker.C:
if !consensus.start && isInitialLeader {
continue
}
for k, v := range consensus.consensusTimeout {
// stop timer in listening mode
if consensus.current.Mode() == Listening {
v.Stop()
continue
}
if consensus.current.Mode() == Syncing {
// never stop bootstrap timer here in syncing mode as it only starts once
// if it is stopped, bootstrap will be stopped and nodes
// can't start view change or join consensus
// the bootstrap timer will be stopped once consensus is reached or view change
// is succeeded
if k != timeoutBootstrap {
consensus.getLogger().Debug().
Str("k", k.String()).
Str("Mode", consensus.current.Mode().String()).
Msg("[ConsensusMainLoop] consensusTimeout stopped!!!")
v.Stop()
continue
}
}
if !v.CheckExpire() {
continue
}
if k != timeoutViewChange {
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!")
consensus.startViewChange()
break
} else {
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!")
consensus.startViewChange()
break
}
}
case <-stopChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] stopChan")
return
}
}
}()
if consensus.dHelper != nil {
@ -374,6 +321,15 @@ func (consensus *Consensus) Start(
}
}
func (consensus *Consensus) StartChannel() {
consensus.isInitialLeader = consensus.IsLeader()
if consensus.isInitialLeader {
consensus.start = true
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.ReadySignal <- SyncProposal
}
}
func (consensus *Consensus) syncReadyChan() {
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
if consensus.BlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 {
@ -394,7 +350,6 @@ func (consensus *Consensus) syncReadyChan() {
consensus.consensusTimeout[timeoutConsensus].Start()
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
}
}
func (consensus *Consensus) syncNotReadyChan() {
@ -405,7 +360,48 @@ func (consensus *Consensus) syncNotReadyChan() {
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc()
}
// Close close the consensus. If current is in normal commit phase, wait until the commit
func (consensus *Consensus) tick() {
if !consensus.start && consensus.isInitialLeader {
return
}
for k, v := range consensus.consensusTimeout {
// stop timer in listening mode
if consensus.current.Mode() == Listening {
v.Stop()
continue
}
if consensus.current.Mode() == Syncing {
// never stop bootstrap timer here in syncing mode as it only starts once
// if it is stopped, bootstrap will be stopped and nodes
// can't start view change or join consensus
// the bootstrap timer will be stopped once consensus is reached or view change
// is succeeded
if k != timeoutBootstrap {
consensus.getLogger().Debug().
Str("k", k.String()).
Str("Mode", consensus.current.Mode().String()).
Msg("[ConsensusMainLoop] consensusTimeout stopped!!!")
v.Stop()
continue
}
}
if !v.CheckExpire() {
continue
}
if k != timeoutViewChange {
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!")
consensus.startViewChange()
break
} else {
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!")
consensus.startViewChange()
break
}
}
}
// Close closes the consensus. If current is in normal commit phase, wait until the commit
// phase end.
func (consensus *Consensus) Close() error {
if consensus.dHelper != nil {

@ -126,9 +126,7 @@ type Node struct {
serviceManager *service.Manager
ContractDeployerCurrentNonce uint64 // The nonce of the deployer contract at current block
ContractAddresses []common.Address
// Channel to notify consensus service to really start consensus
startConsensus chan struct{}
HarmonyConfig *harmonyconfig.HarmonyConfig
HarmonyConfig *harmonyconfig.HarmonyConfig
// node configuration, including group ID, shard ID, etc
NodeConfig *nodeconfig.ConfigType
// Chain configuration.
@ -1130,7 +1128,6 @@ func New(
Msg("Genesis block hash")
// Setup initial state of syncing.
node.peerRegistrationRecord = map[string]*syncConfig{}
node.startConsensus = make(chan struct{})
// Broadcast double-signers reported by consensus
if node.Consensus != nil {
go func() {

@ -434,7 +434,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
return nil
}
// BootstrapConsensus is the a goroutine to check number of peers and start the consensus
// BootstrapConsensus is a goroutine to check number of peers and start the consensus
func (node *Node) BootstrapConsensus() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
@ -464,7 +464,7 @@ func (node *Node) BootstrapConsensus() error {
return ctx.Err()
case <-enoughMinPeers:
go func() {
node.startConsensus <- struct{}{}
node.Consensus.StartChannel()
}()
return nil
}

@ -14,7 +14,7 @@ func (node *Node) RegisterValidatorServices() {
// Register consensus service.
node.serviceManager.Register(
service.Consensus,
consensus.New(node.Consensus, node.startConsensus),
consensus.New(node.Consensus),
)
// Register new block service.
node.serviceManager.Register(

Loading…
Cancel
Save