use a channel to signal consensus service to start

Signed-off-by: Leo Chen <leo@harmony.one>
pull/465/head
Leo Chen 6 years ago
parent 702e5e2f19
commit a27cff729c
  1. 7
      api/service/consensus/service.go
  2. 5
      consensus/consensus_leader.go
  3. 9
      node/node.go
  4. 1
      node/node_handler.go

@ -12,18 +12,19 @@ type Service struct {
consensus *consensus.Consensus
stopChan chan struct{}
stoppedChan chan struct{}
startChan chan struct{}
}
// New returns consensus service.
func New(blockChannel chan *types.Block, consensus *consensus.Consensus) *Service {
return &Service{blockChannel: blockChannel, consensus: consensus}
func New(blockChannel chan *types.Block, consensus *consensus.Consensus, startChan chan struct{}) *Service {
return &Service{blockChannel: blockChannel, consensus: consensus, startChan: startChan}
}
// StartService starts consensus service.
func (s *Service) StartService() {
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
s.consensus.WaitForNewBlock(s.blockChannel, s.stopChan, s.stoppedChan)
s.consensus.WaitForNewBlock(s.blockChannel, s.stopChan, s.stoppedChan, s.startChan)
}
// StopService stops consensus service.

@ -29,12 +29,15 @@ var (
)
// WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}) {
func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}, startChannel chan struct{}) {
go func() {
defer close(stoppedChan)
for {
select {
default:
// got the signal to start consensus
_ = <-startChannel
utils.GetLogInstance().Debug("Waiting for block", "consensus", consensus)
// keep waiting for new blocks
newBlock := <-blockChannel

@ -197,6 +197,9 @@ type Node struct {
// Duplicated Ping Message Received
duplicatedPing map[string]bool
// Channel to notify consensus service to really start consensus
startConsensus chan struct{}
}
// Blockchain returns the blockchain from node
@ -318,6 +321,8 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
node.duplicatedPing = make(map[string]bool)
node.startConsensus = make(chan struct{})
return &node
}
@ -721,7 +726,7 @@ func (node *Node) setupForShardLeader() {
// Register explorer service.
node.serviceManager.RegisterService(service_manager.SupportExplorer, explorer.New(&node.SelfPeer))
// Register consensus service.
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus))
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus, node.startConsensus))
// Register new block service.
node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.
@ -763,7 +768,7 @@ func (node *Node) setupForBeaconLeader() {
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer))
// Register consensus service.
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus))
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus, node.startConsensus))
// Register new block service.
node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.

@ -396,6 +396,7 @@ func (node *Node) SendPongMessage() {
sentMessage = true
// stop sending ping message
node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery})
node.startConsensus <- struct{}{}
}
}
numPeers = numPeersNow

Loading…
Cancel
Save