add consensus service

pull/366/head
Minh Doan 6 years ago committed by Minh Doan
parent 743a3a1483
commit a12a72ba07
  1. 56
      consensus/consensus_leader.go
  2. 35
      node/service/consensus.go

@ -26,33 +26,41 @@ var (
)
// WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block) {
func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}) {
utils.GetLogInstance().Debug("Waiting for block", "consensus", consensus)
for { // keep waiting for new blocks
newBlock := <-blockChannel
// TODO: think about potential race condition
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
utils.GetLogInstance().Debug("WaitForNewBlock", "removed peers", c)
}
for !consensus.HasEnoughValidators() {
utils.GetLogInstance().Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
}
startTime = time.Now()
utils.GetLogInstance().Debug("STARTING CONSENSUS", "numTxs", len(newBlock.Transactions()), "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
for { // Wait until last consensus is finished
if consensus.state == Finished {
consensus.ResetState()
consensus.startConsensus(newBlock)
break
go func() {
defer close(stoppedChan)
for { // keep waiting for new blocks
select {
default:
newBlock := <-blockChannel
// TODO: think about potential race condition
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
utils.GetLogInstance().Debug("WaitForNewBlock", "removed peers", c)
}
for !consensus.HasEnoughValidators() {
utils.GetLogInstance().Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
}
startTime = time.Now()
utils.GetLogInstance().Debug("STARTING CONSENSUS", "numTxs", len(newBlock.Transactions()), "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
for { // Wait until last consensus is finished
if consensus.state == Finished {
consensus.ResetState()
consensus.startConsensus(newBlock)
break
}
time.Sleep(500 * time.Millisecond)
}
case <-stopChan:
return
}
time.Sleep(500 * time.Millisecond)
}
}
}()
}
// ProcessMessageLeader dispatches consensus message for the leader.

@ -0,0 +1,35 @@
package service
import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
)
// ConsensusService is the consensus service.
type ConsensusService struct {
blockChannel chan *types.Block // The channel to receive new blocks from Node
consensus *consensus.Consensus
stopChan chan struct{}
stoppedChan chan struct{}
}
// NewConsensusService returns consensus service.
func NewConsensusService(blockChannel chan *types.Block, consensus *consensus.Consensus) *ConsensusService {
return &ConsensusService{blockChannel: blockChannel, consensus: consensus}
}
// StartService starts service.
func (cs *ConsensusService) StartService() {
cs.stopChan = make(chan struct{})
cs.stoppedChan = make(chan struct{})
cs.consensus.WaitForNewBlock(cs.blockChannel, cs.stopChan, cs.stoppedChan)
}
// StopService stops service.
func (cs *ConsensusService) StopService() {
utils.GetLogInstance().Info("Stopping consensus service.")
cs.stopChan <- struct{}{}
<-cs.stoppedChan
utils.GetLogInstance().Info("Consensus service stopped.")
}
Loading…
Cancel
Save