Merge pull request #3405 from rlan35/main

Make the leader commit and start proposing new block after 67% committed (pipelining)
pull/3439/head
Rongjian Lan 4 years ago committed by GitHub
commit b0fb0a3848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      api/service/blockproposal/service.go
  2. 2
      api/service/explorer/service.go
  3. 41
      api/service/syncing/syncing.go
  4. 2
      cmd/harmony/main.go
  5. 24
      consensus/consensus.go
  6. 12
      consensus/consensus_msg_sender.go
  7. 2
      consensus/consensus_service.go
  8. 2
      consensus/consensus_test.go
  9. 217
      consensus/consensus_v2.go
  10. 9
      consensus/leader.go
  11. 12
      consensus/quorum/one-node-staked-vote.go
  12. 33
      consensus/validator.go
  13. 2
      consensus/view_change.go
  14. 2
      crypto/bls/bls.go
  15. 4
      internal/chain/engine.go
  16. 8
      internal/chain/reward.go
  17. 1
      node/node_cross_shard.go
  18. 16
      node/node_handler.go
  19. 57
      node/node_newblock.go
  20. 4
      node/node_syncing.go
  21. 2
      node/service_setup.go
  22. 4
      node/worker/worker.go

@ -3,6 +3,7 @@ package blockproposal
import ( import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -10,14 +11,15 @@ import (
type Service struct { type Service struct {
stopChan chan struct{} stopChan chan struct{}
stoppedChan chan struct{} stoppedChan chan struct{}
readySignal chan struct{} readySignal chan consensus.ProposalType
commitSigsChan chan []byte
messageChan chan *msg_pb.Message messageChan chan *msg_pb.Message
waitForConsensusReady func(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{}) waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})
} }
// New returns a block proposal service. // New returns a block proposal service.
func New(readySignal chan struct{}, waitForConsensusReady func(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{})) *Service { func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service {
return &Service{readySignal: readySignal, waitForConsensusReady: waitForConsensusReady} return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady}
} }
// StartService starts block proposal service. // StartService starts block proposal service.
@ -35,7 +37,7 @@ func (s *Service) Init() {
// Run runs block proposal. // Run runs block proposal.
func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) { func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) {
s.waitForConsensusReady(s.readySignal, s.stopChan, s.stoppedChan) s.waitForConsensusReady(s.readySignal, s.commitSigsChan, s.stopChan, s.stoppedChan)
} }
// StopService stops block proposal service. // StopService stops block proposal service.

@ -176,7 +176,7 @@ func (s *Service) GetTotalSupply(w http.ResponseWriter, r *http.Request) {
// GetNodeSync returns status code 500 if node is not in sync // GetNodeSync returns status code 500 if node is not in sync
func (s *Service) GetNodeSync(w http.ResponseWriter, r *http.Request) { func (s *Service) GetNodeSync(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
sync := !s.stateSync.IsOutOfSync(s.blockchain) sync := !s.stateSync.IsOutOfSync(s.blockchain, false)
if !sync { if !sync {
w.WriteHeader(http.StatusTeapot) w.WriteHeader(http.StatusTeapot)
} }

@ -29,13 +29,11 @@ import (
// Constants for syncing. // Constants for syncing.
const ( const (
downloadBlocksRetryLimit = 5 // downloadBlocks service retry limit downloadBlocksRetryLimit = 5 // downloadBlocks service retry limit
TimesToFail = 5 // downloadBlocks service retry limit
RegistrationNumber = 3 RegistrationNumber = 3
SyncingPortDifference = 3000 SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 1000 // maximum size for one query of block hashes SyncLoopBatchSize uint32 = 1000 // maximum size for one query of block hashes
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size
SyncLoopFrequency = 1 // unit in second
LastMileBlocksSize = 50 LastMileBlocksSize = 50
// after cutting off a number of connected peers, the result number of peers // after cutting off a number of connected peers, the result number of peers
@ -958,18 +956,36 @@ func (ss *StateSync) GetMaxPeerHeight() uint64 {
} }
// IsOutOfSync checks whether the node is out of sync from other peers // IsOutOfSync checks whether the node is out of sync from other peers
func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool { func (ss *StateSync) IsOutOfSync(bc *core.BlockChain, doubleCheck bool) bool {
if ss.syncConfig == nil { if ss.syncConfig == nil {
return true // If syncConfig is not instantiated, return not in sync return true // If syncConfig is not instantiated, return not in sync
} }
otherHeight := ss.getMaxPeerHeight(false) otherHeight1 := ss.getMaxPeerHeight(false)
lastHeight := bc.CurrentBlock().NumberU64()
wasOutOfSync := lastHeight+inSyncThreshold < otherHeight1
if !doubleCheck {
utils.Logger().Info().
Uint64("OtherHeight", otherHeight1).
Uint64("lastHeight", lastHeight).
Msg("[SYNC] Checking sync status")
return wasOutOfSync
}
time.Sleep(1 * time.Second)
// double check the sync status after 1 second to confirm (avoid false alarm)
otherHeight2 := ss.getMaxPeerHeight(false)
currentHeight := bc.CurrentBlock().NumberU64() currentHeight := bc.CurrentBlock().NumberU64()
utils.Logger().Debug().
Uint64("OtherHeight", otherHeight). isOutOfSync := currentHeight+inSyncThreshold < otherHeight2
Uint64("MyHeight", currentHeight). utils.Logger().Info().
Bool("IsOutOfSync", currentHeight+inSyncThreshold < otherHeight). Uint64("OtherHeight1", otherHeight1).
Uint64("OtherHeight2", otherHeight2).
Uint64("lastHeight", lastHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] Checking sync status") Msg("[SYNC] Checking sync status")
return currentHeight+inSyncThreshold < otherHeight // Only confirm out of sync when the node has lower height and didn't move in heights for 2 consecutive checks
return wasOutOfSync && isOutOfSync && lastHeight == currentHeight
} }
// SyncLoop will keep syncing with peers until catches up // SyncLoop will keep syncing with peers until catches up
@ -977,10 +993,7 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, isBeac
if !isBeacon { if !isBeacon {
ss.RegisterNodeInfo() ss.RegisterNodeInfo()
} }
// remove SyncLoopFrequency for {
ticker := time.NewTicker(SyncLoopFrequency * time.Second)
defer ticker.Stop()
for range ticker.C {
otherHeight := ss.getMaxPeerHeight(isBeacon) otherHeight := ss.getMaxPeerHeight(isBeacon)
currentHeight := bc.CurrentBlock().NumberU64() currentHeight := bc.CurrentBlock().NumberU64()
if currentHeight >= otherHeight { if currentHeight >= otherHeight {
@ -1032,7 +1045,7 @@ func (ss *StateSync) addConsensusLastMile(bc *core.BlockChain, consensus *consen
return errors.Wrap(err, "failed to InsertChain") return errors.Wrap(err, "failed to InsertChain")
} }
} }
consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64() + 1) consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64())
return nil return nil
} }

@ -653,7 +653,7 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType)
// Assign closure functions to the consensus object // Assign closure functions to the consensus object
currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock) currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock)
currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing
// update consensus information based on the blockchain // update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation()) currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())
currentConsensus.NextBlockDue = time.Now() currentConsensus.NextBlockDue = time.Now()

@ -25,7 +25,16 @@ const (
vdfAndSeedSize = 548 // size of VDF/Proof and Seed vdfAndSeedSize = 548 // size of VDF/Proof and Seed
) )
var errLeaderPriKeyNotFound = errors.New("getting leader private key from consensus public keys failed") var errLeaderPriKeyNotFound = errors.New("leader private key not found locally")
// ProposalType is to indicate the type of signal for new block proposal
type ProposalType byte
// Constant of the type of new block proposal
const (
SyncProposal ProposalType = iota
AsyncProposal
)
// BlockVerifierFunc is a function used to verify the block // BlockVerifierFunc is a function used to verify the block
type BlockVerifierFunc func(*types.Block) error type BlockVerifierFunc func(*types.Block) error
@ -79,11 +88,13 @@ type Consensus struct {
mutex sync.Mutex mutex sync.Mutex
// ViewChange struct // ViewChange struct
vc *viewChange vc *viewChange
// Signal channel for starting a new consensus process // Signal channel for proposing a new block and start new consensus
ReadySignal chan struct{} ReadySignal chan ProposalType
// The post-consensus processing func passed from Node object // Channel to send full commit signatures to finish new block proposal
CommitSigChannel chan []byte
// The post-consensus job func passed from Node object
// Called when consensus on a new block is done // Called when consensus on a new block is done
OnConsensusDone func(*types.Block) error PostConsensusJob func(*types.Block) error
// The verifier func passed from Node object // The verifier func passed from Node object
BlockVerifier BlockVerifierFunc BlockVerifier BlockVerifierFunc
// verified block to state sync broadcast // verified block to state sync broadcast
@ -206,7 +217,8 @@ func New(
consensus.syncNotReadyChan = make(chan struct{}) consensus.syncNotReadyChan = make(chan struct{})
consensus.SlashChan = make(chan slash.Record) consensus.SlashChan = make(chan slash.Record)
consensus.commitFinishChan = make(chan uint64) consensus.commitFinishChan = make(chan uint64)
consensus.ReadySignal = make(chan struct{}) consensus.ReadySignal = make(chan ProposalType)
consensus.CommitSigChannel = make(chan []byte)
// channel for receiving newly generated VDF // channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte) consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
consensus.IgnoreViewIDCheck = abool.NewBool(false) consensus.IgnoreViewIDCheck = abool.NewBool(false)

@ -68,6 +68,18 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
return sender.host.SendMessageToGroups(groups, p2pMsg) return sender.host.SendMessageToGroups(groups, p2pMsg)
} }
// DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries.
func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) {
if sender.retryTimes != 0 {
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0}
atomic.StoreUint32(&msgRetry.isActive, 1)
sender.messagesToRetry.Store(msgType, &msgRetry)
go func() {
sender.Retry(&msgRetry)
}()
}
}
// SendWithoutRetry sends message without retry logic. // SendWithoutRetry sends message without retry logic.
func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error { func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error {
return sender.host.SendMessageToGroups(groups, p2pMsg) return sender.host.SendMessageToGroups(groups, p2pMsg)

@ -457,7 +457,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.getLogger().Info(). consensus.getLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()). Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] I am the New Leader") Msg("[UpdateConsensusInformation] I am the New Leader")
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- SyncProposal
}() }()
} }
return Normal return Normal

@ -72,7 +72,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan uint64), consensus.commitFinishChan) assert.IsType(t, make(chan uint64), consensus.commitFinishChan)
assert.NotNil(t, consensus.commitFinishChan) assert.NotNil(t, consensus.commitFinishChan)
assert.IsType(t, make(chan struct{}), consensus.ReadySignal) assert.IsType(t, make(chan ProposalType), consensus.ReadySignal)
assert.NotNil(t, consensus.ReadySignal) assert.NotNil(t, consensus.ReadySignal)
assert.IsType(t, make(chan [vdfAndSeedSize]byte), consensus.RndChannel) assert.IsType(t, make(chan [vdfAndSeedSize]byte), consensus.RndChannel)

@ -7,6 +7,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/rs/zerolog" "github.com/rs/zerolog"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
@ -27,6 +29,15 @@ var (
errVerifyMessageSignature = errors.New("verify message signature failed") errVerifyMessageSignature = errors.New("verify message signature failed")
) )
// timeout constant
const (
// CommitSigSenderTimeout is the timeout for sending the commit sig to finish block proposal
CommitSigSenderTimeout = 6 * time.Second
// CommitSigReceiverTimeout is the timeout for the receiving side of the commit sig
// if timeout, the receiver should instead ready directly from db for the commit sig
CommitSigReceiverTimeout = 4 * time.Second
)
// IsViewChangingMode return true if curernt mode is viewchanging // IsViewChangingMode return true if curernt mode is viewchanging
func (consensus *Consensus) IsViewChangingMode() bool { func (consensus *Consensus) IsViewChangingMode() bool {
return consensus.current.Mode() == ViewChanging return consensus.current.Mode() == ViewChanging
@ -100,28 +111,28 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb
return nil return nil
} }
func (consensus *Consensus) finalizeCommits() { func (consensus *Consensus) finalCommit() {
consensus.getLogger().Info(). consensus.getLogger().Info().
Int64("NumCommits", consensus.Decider.SignersCount(quorum.Commit)). Int64("NumCommits", consensus.Decider.SignersCount(quorum.Commit)).
Msg("[finalizeCommits] Finalizing Block") Msg("[finalCommit] Finalizing Consensus")
beforeCatchupNum := consensus.blockNum beforeCatchupNum := consensus.blockNum
leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey()
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err).Msg("[FinalizeCommits] leader not found") consensus.getLogger().Error().Err(err).Msg("[finalCommit] leader not found")
return return
} }
// Construct committed message // Construct committed message
network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey}) network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey})
if err != nil { if err != nil {
consensus.getLogger().Warn().Err(err). consensus.getLogger().Warn().Err(err).
Msg("[FinalizeCommits] Unable to construct Committed message") Msg("[finalCommit] Unable to construct Committed message")
return return
} }
msgToSend, aggSig, FBFTMsg := msgToSend, FBFTMsg :=
network.Bytes, network.Bytes,
network.OptionalAggregateSignature,
network.FBFTMsg network.FBFTMsg
consensus.aggregatedCommitSig = aggSig // this may not needed commitSigAndBitmap := FBFTMsg.Payload
consensus.FBFTLog.AddMessage(FBFTMsg) consensus.FBFTLog.AddMessage(FBFTMsg)
// find correct block content // find correct block content
curBlockHash := consensus.blockHash curBlockHash := consensus.blockHash
@ -129,31 +140,55 @@ func (consensus *Consensus) finalizeCommits() {
if block == nil { if block == nil {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Str("blockHash", hex.EncodeToString(curBlockHash[:])). Str("blockHash", hex.EncodeToString(curBlockHash[:])).
Msg("[FinalizeCommits] Cannot find block by hash") Msg("[finalCommit] Cannot find block by hash")
return return
} }
consensus.tryCatchup() consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!")
if consensus.blockNum-beforeCatchupNum != 1 { consensus.Blockchain.WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
consensus.getLogger().Warn().
block.SetCurrentCommitSig(commitSigAndBitmap)
err = consensus.commitBlock(block, FBFTMsg)
if err != nil || consensus.blockNum-beforeCatchupNum != 1 {
consensus.getLogger().Err(err).
Uint64("beforeCatchupBlockNum", beforeCatchupNum). Uint64("beforeCatchupBlockNum", beforeCatchupNum).
Msg("[FinalizeCommits] Leader cannot provide the correct block for committed message") Msg("[finalCommit] Leader failed to commit the confirmed block")
return return
} }
// if leader success finalize the block, send committed message to validators // if leader successfully finalizes the block, send committed message to validators
if err := consensus.msgSender.SendWithRetry( // Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately
block.NumberU64(), // to save network traffic. It will only be sent in retry if consensus doesn't move forward.
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ // Or if the leader is changed for next block, the 100% committed sig will be sent to the next leader immediately.
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), if !consensus.IsLeader() || block.IsLastBlockInEpoch() {
}, // send immediately
p2p.ConstructMessage(msgToSend)); err != nil { if err := consensus.msgSender.SendWithRetry(
consensus.getLogger().Warn().Err(err).Msg("[finalizeCommits] Cannot send committed message") block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message")
} else {
consensus.getLogger().Info().
Hex("blockHash", curBlockHash[:]).
Uint64("blockNum", consensus.blockNum).
Msg("[finalCommit] Sent Committed Message")
}
consensus.consensusTimeout[timeoutConsensus].Start()
} else { } else {
// delayed send
consensus.msgSender.DelayedSendWithRetry(
block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend))
consensus.getLogger().Info(). consensus.getLogger().Info().
Hex("blockHash", curBlockHash[:]). Hex("blockHash", curBlockHash[:]).
Uint64("blockNum", consensus.blockNum). Uint64("blockNum", consensus.blockNum).
Msg("[finalizeCommits] Sent Committed Message") Msg("[finalCommit] Queued Committed Message")
} }
// Dump new block into level db // Dump new block into level db
@ -164,11 +199,10 @@ func (consensus *Consensus) finalizeCommits() {
if consensus.consensusTimeout[timeoutBootstrap].IsActive() { if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.getLogger().Info().Msg("[finalizeCommits] Start consensus timer; stop bootstrap timer only once") consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer; stop bootstrap timer only once")
} else { } else {
consensus.getLogger().Info().Msg("[finalizeCommits] Start consensus timer") consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer")
} }
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info(). consensus.getLogger().Info().
Uint64("blockNum", block.NumberU64()). Uint64("blockNum", block.NumberU64()).
@ -179,15 +213,26 @@ func (consensus *Consensus) finalizeCommits() {
Int("numStakingTxns", len(block.StakingTransactions())). Int("numStakingTxns", len(block.StakingTransactions())).
Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!") Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!")
// Sleep to wait for the full block time // If still the leader, send commit sig/bitmap to finish the new block proposal,
consensus.getLogger().Info().Msg("[finalizeCommits] Waiting for Block Time") // else, the block proposal will timeout by itself.
<-time.After(time.Until(consensus.NextBlockDue)) if consensus.IsLeader() {
if block.IsLastBlockInEpoch() {
// Send signal to Node to propose the new block for consensus // No pipelining
consensus.ReadySignal <- struct{}{} go func() {
consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal")
// Update time due for next block consensus.ReadySignal <- SyncProposal
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) }()
} else {
// pipelining
go func() {
select {
case consensus.CommitSigChannel <- commitSigAndBitmap:
case <-time.After(CommitSigSenderTimeout):
utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap")
}
}()
}
}
} }
// BlockCommitSigs returns the byte array of aggregated // BlockCommitSigs returns the byte array of aggregated
@ -231,7 +276,7 @@ func (consensus *Consensus) Start(
<-startChannel <-startChannel
toStart <- struct{}{} toStart <- struct{}{}
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- SyncProposal
}() }()
} }
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started") consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started")
@ -274,11 +319,15 @@ func (consensus *Consensus) Start(
} }
case <-consensus.syncReadyChan: case <-consensus.syncReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1) consensus.mutex.Lock()
consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1) if consensus.blockNum < consensus.Blockchain.CurrentHeader().Number().Uint64()+1 {
mode := consensus.UpdateConsensusInformation() consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
consensus.current.SetMode(mode) consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1)
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") mode := consensus.UpdateConsensusInformation()
consensus.current.SetMode(mode)
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC")
}
consensus.mutex.Unlock()
case <-consensus.syncNotReadyChan: case <-consensus.syncNotReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
@ -291,6 +340,12 @@ func (consensus *Consensus) Start(
Uint64("MsgBlockNum", newBlock.NumberU64()). Uint64("MsgBlockNum", newBlock.NumberU64()).
Msg("[ConsensusMainLoop] Received Proposed New Block!") Msg("[ConsensusMainLoop] Received Proposed New Block!")
// Sleep to wait for the full block time
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time")
<-time.After(time.Until(consensus.NextBlockDue))
// Update time due for next block
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
//VRF/VDF is only generated in the beacon chain //VRF/VDF is only generated in the beacon chain
if consensus.NeedsRandomNumberGeneration(newBlock.Header().Epoch()) { if consensus.NeedsRandomNumberGeneration(newBlock.Header().Epoch()) {
// generate VRF if the current block has a new leader // generate VRF if the current block has a new leader
@ -376,14 +431,14 @@ func (consensus *Consensus) Start(
consensus.announce(newBlock) consensus.announce(newBlock)
case viewID := <-consensus.commitFinishChan: case viewID := <-consensus.commitFinishChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] commitFinishChan") consensus.getLogger().Info().Uint64("viewID", viewID).Msg("[ConsensusMainLoop] commitFinishChan")
// Only Leader execute this condition // Only Leader execute this condition
func() { func() {
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
if viewID == consensus.GetCurBlockViewID() { if viewID == consensus.GetCurBlockViewID() {
consensus.finalizeCommits() consensus.finalCommit()
} }
}() }()
@ -464,6 +519,64 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl
return blocks, msgs, nil return blocks, msgs, nil
} }
// preCommitAndPropose commit the current block with 67% commit signatures and start
// proposing new block which will wait on the full commit signatures to finish
func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
if blk == nil {
return errors.New("block to pre-commit is nil")
}
leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey()
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found")
return err
}
// Construct committed message
consensus.mutex.Lock()
network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey})
consensus.mutex.Unlock()
if err != nil {
consensus.getLogger().Warn().Err(err).
Msg("[preCommitAndPropose] Unable to construct Committed message")
return err
}
msgToSend, FBFTMsg :=
network.Bytes,
network.FBFTMsg
bareMinimumCommit := FBFTMsg.Payload
consensus.FBFTLog.AddMessage(FBFTMsg)
blk.SetCurrentCommitSig(bareMinimumCommit)
if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, true); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return err
}
// if leader successfully finalizes the block, send committed message to validators
if err := consensus.msgSender.SendWithRetry(
blk.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
} else {
consensus.getLogger().Info().
Str("blockHash", blk.Hash().Hex()).
Uint64("blockNum", consensus.blockNum).
Msg("[preCommitAndPropose] Sent Committed Message")
}
consensus.consensusTimeout[timeoutConsensus].Start()
// Send signal to Node to propose the new block for consensus
consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal")
consensus.ReadySignal <- AsyncProposal
return nil
}
// tryCatchup add the last mile block in PBFT log memory cache to blockchain. // tryCatchup add the last mile block in PBFT log memory cache to blockchain.
func (consensus *Consensus) tryCatchup() error { func (consensus *Consensus) tryCatchup() error {
// TODO: change this to a more systematic symbol // TODO: change this to a more systematic symbol
@ -509,15 +622,30 @@ func (consensus *Consensus) tryCatchup() error {
} }
func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if err := consensus.OnConsensusDone(blk); err != nil { if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() {
return err if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, true); err != nil {
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain")
return err
}
} }
if !committedMsg.HasSingleSender() { if !committedMsg.HasSingleSender() {
consensus.getLogger().Error().Msg("[TryCatchup] Leader message can not have multiple sender keys") consensus.getLogger().Error().Msg("[TryCatchup] Leader message can not have multiple sender keys")
return errIncorrectSender return errIncorrectSender
} }
atomic.AddUint64(&consensus.blockNum, 1) consensus.PostConsensusJob(blk)
consensus.SetupForNewConsensus(blk, committedMsg)
consensus.FinishFinalityCount()
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()).
Str("hash", blk.Header().Hash().Hex()).
Msg("Added New Block to Blockchain!!!")
return nil
}
// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1) consensus.SetCurBlockViewID(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]
// Update consensus keys at last so the change of leader status doesn't mess up normal flow // Update consensus keys at last so the change of leader status doesn't mess up normal flow
@ -525,7 +653,6 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
consensus.SetMode(consensus.UpdateConsensusInformation()) consensus.SetMode(consensus.UpdateConsensusInformation())
} }
consensus.ResetState() consensus.ResetState()
return nil
} }
func (consensus *Consensus) postCatchup(initBN uint64) { func (consensus *Consensus) postCatchup(initBN uint64) {

@ -48,7 +48,6 @@ func (consensus *Consensus) announce(block *types.Block) {
} }
msgToSend, FPBTMsg := networkMessage.Bytes, networkMessage.FBFTMsg msgToSend, FPBTMsg := networkMessage.Bytes, networkMessage.FBFTMsg
// TODO(chao): review FPBT log data structure
consensus.FBFTLog.AddMessage(FPBTMsg) consensus.FBFTLog.AddMessage(FPBTMsg)
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()). Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()).
@ -301,6 +300,14 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
if !quorumWasMet && quorumIsMet { if !quorumWasMet && quorumIsMet {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received") logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
if !blockObj.IsLastBlockInEpoch() {
// only do early commit if it's not epoch block to avoid problems
go func() {
// TODO: make it synchronized with commitFinishChan
consensus.preCommitAndPropose(blockObj)
}()
}
consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period") consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period")
go func(viewID uint64) { go func(viewID uint64) {
time.Sleep(2500 * time.Millisecond) time.Sleep(2500 * time.Millisecond)

@ -112,12 +112,12 @@ func (v *stakedVoteWeight) AddNewVote(
t := v.QuorumThreshold() t := v.QuorumThreshold()
msg := "Attempt to reach quorum" msg := "[AddNewVote] New Vote Added!"
if !tallyQuorum.quorumAchieved { if !tallyQuorum.quorumAchieved {
tallyQuorum.quorumAchieved = tallyQuorum.tally.GT(t) tallyQuorum.quorumAchieved = tallyQuorum.tally.GT(t)
if tallyQuorum.quorumAchieved { if tallyQuorum.quorumAchieved {
msg = "Quorum Achieved!" msg = "[AddNewVote] Quorum Achieved!"
} }
} }
utils.Logger().Info(). utils.Logger().Info().
@ -173,9 +173,11 @@ func (v *stakedVoteWeight) computeTotalPowerByMask(mask *bls_cosi.Mask) *numeric
for key, i := range mask.PublicsIndex { for key, i := range mask.PublicsIndex {
if enabled, err := mask.IndexEnabled(i); err == nil && enabled { if enabled, err := mask.IndexEnabled(i); err == nil && enabled {
currentTotal = currentTotal.Add( if voter, ok := v.roster.Voters[key]; ok {
v.roster.Voters[key].OverallPercent, currentTotal = currentTotal.Add(
) voter.OverallPercent,
)
}
} }
} }
return &currentTotal return &currentTotal

@ -1,7 +1,6 @@
package consensus package consensus
import ( import (
"bytes"
"encoding/hex" "encoding/hex"
"time" "time"
@ -169,12 +168,6 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
Hex("blockHash", recvMsg.BlockHash[:]). Hex("blockHash", recvMsg.BlockHash[:]).
Msg("[OnPrepared] Prepared message and block added") Msg("[OnPrepared] Prepared message and block added")
// tryCatchup is also run in onCommitted(), so need to lock with commitMutex.
if consensus.current.Mode() != Normal {
// don't sign the block that is not verified
consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!")
return
}
if consensus.BlockVerifier == nil { if consensus.BlockVerifier == nil {
consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring") consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring")
return return
@ -212,9 +205,13 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
consensus.prepareBitmap = mask consensus.prepareBitmap = mask
// Optimistically add blockhash field of prepare message // Optimistically add blockhash field of prepare message
emptyHash := [32]byte{} copy(consensus.blockHash[:], blockHash[:])
if bytes.Equal(consensus.blockHash[:], emptyHash[:]) {
copy(consensus.blockHash[:], blockHash[:]) // tryCatchup is also run in onCommitted(), so need to lock with commitMutex.
if consensus.current.Mode() != Normal {
// don't sign the block that is not verified
consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!")
return
} }
consensus.sendCommitMessages(&blockObj) consensus.sendCommitMessages(&blockObj)
@ -232,10 +229,14 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message") Msg("[OnCommitted] Received committed message")
// NOTE let it handle its own logs // Ok to receive committed from last block since it could have more signatures
if !consensus.isRightBlockNumCheck(recvMsg) { if recvMsg.BlockNum < consensus.blockNum-1 {
consensus.getLogger().Debug().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("Wrong BlockNum Received, ignoring!")
return return
} }
if recvMsg.BlockNum > consensus.blockNum { if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync") consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync")
consensus.spinUpStateSync() consensus.spinUpStateSync()
@ -247,10 +248,13 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return return
} }
if !consensus.Decider.IsQuorumAchievedByMask(mask) { if !consensus.Decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn().Msgf("[OnCommitted] Quorum Not achieved.") consensus.getLogger().Warn().Hex("sigbitmap", recvMsg.Payload).Msgf("[OnCommitted] Quorum Not achieved.")
return return
} }
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
// Must have the corresponding block to verify committed message. // Must have the corresponding block to verify committed message.
blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash)
if blockObj == nil { if blockObj == nil {
@ -272,9 +276,6 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.FBFTLog.AddMessage(recvMsg) consensus.FBFTLog.AddMessage(recvMsg)
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.aggregatedCommitSig = aggSig consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask consensus.commitBitmap = mask

@ -396,7 +396,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} }
go func() { go func() {
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- SyncProposal
}() }()
return return
} }

@ -99,7 +99,7 @@ func (pk *SerializedPublicKey) FromLibBLSPublicKey(key *bls.PublicKey) error {
// SeparateSigAndMask parse the commig signature data into signature and bitmap. // SeparateSigAndMask parse the commig signature data into signature and bitmap.
func SeparateSigAndMask(commitSigs []byte) ([]byte, []byte, error) { func SeparateSigAndMask(commitSigs []byte) ([]byte, []byte, error) {
if len(commitSigs) < BLSSignatureSizeInBytes { if len(commitSigs) < BLSSignatureSizeInBytes {
return nil, nil, errors.New("no mask data found in commit sigs") return nil, nil, errors.Errorf("no mask data found in commit sigs: %x", commitSigs)
} }
//#### Read payload data from committed msg //#### Read payload data from committed msg
aggSig := make([]byte, BLSSignatureSizeInBytes) aggSig := make([]byte, BLSSignatureSizeInBytes)

@ -195,8 +195,8 @@ func (e *engineImpl) VerifySeal(chain engine.ChainReader, header *block.Header)
lastCommitPayload := signature.ConstructCommitPayload(chain, lastCommitPayload := signature.ConstructCommitPayload(chain,
parentHeader.Epoch(), parentHeader.Hash(), parentHeader.Number().Uint64(), parentHeader.ViewID().Uint64()) parentHeader.Epoch(), parentHeader.Hash(), parentHeader.Number().Uint64(), parentHeader.ViewID().Uint64())
if !aggSig.VerifyHash(mask.AggregatePublic, lastCommitPayload) { if !aggSig.VerifyHash(mask.AggregatePublic, lastCommitPayload) {
const msg = "[VerifySeal] Unable to verify aggregated signature from last block" const msg = "[VerifySeal] Unable to verify aggregated signature from last block: %x"
return errors.New(msg) return errors.Errorf(msg, payload)
} }
return nil return nil
} }

@ -28,6 +28,12 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// timeout constant
const (
// AsyncBlockProposalTimeout is the timeout which will abort the async block proposal.
AsyncBlockProposalTimeout = 5 * time.Second
)
func ballotResultBeaconchain( func ballotResultBeaconchain(
bc engine.ChainReader, header *block.Header, bc engine.ChainReader, header *block.Header,
) (*big.Int, shard.SlotList, shard.SlotList, shard.SlotList, error) { ) (*big.Int, shard.SlotList, shard.SlotList, shard.SlotList, error) {
@ -473,7 +479,7 @@ func waitForCommitSigs(sigsReady chan bool) error {
return errors.New("Failed to get commit sigs") return errors.New("Failed to get commit sigs")
} }
utils.Logger().Info().Msg("Commit sigs are ready") utils.Logger().Info().Msg("Commit sigs are ready")
case <-time.After(5 * time.Second): case <-time.After(AsyncBlockProposalTimeout):
return errors.New("Timeout waiting for commit sigs for reward calculation") return errors.New("Timeout waiting for commit sigs for reward calculation")
} }
return nil return nil

@ -20,6 +20,7 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block) {
//#### Read payload data from committed msg //#### Read payload data from committed msg
if len(commitSigAndBitmap) <= 96 { if len(commitSigAndBitmap) <= 96 {
utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length") utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length")
return
} }
commitSig := make([]byte, 96) commitSig := make([]byte, 96)
commitBitmap := make([]byte, len(commitSigAndBitmap)-96) commitBitmap := make([]byte, len(commitSigAndBitmap)-96)

@ -353,16 +353,6 @@ func (node *Node) numSignaturesIncludedInBlock(block *types.Block) uint32 {
// 2. [leader] send new block to the client // 2. [leader] send new block to the client
// 3. [leader] send cross shard tx receipts to destination shard // 3. [leader] send cross shard tx receipts to destination shard
func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
if _, err := node.Blockchain().InsertChain([]*types.Block{newBlock}, true); err != nil {
return err
}
utils.Logger().Info().
Uint64("blockNum", newBlock.NumberU64()).
Str("hash", newBlock.Header().Hash().Hex()).
Msg("Added New Block to Blockchain!!!")
node.Consensus.FinishFinalityCount()
if node.Consensus.IsLeader() { if node.Consensus.IsLeader() {
if node.NodeConfig.ShardID == shard.BeaconChainShardID { if node.NodeConfig.ShardID == shard.BeaconChainShardID {
node.BroadcastNewBlock(newBlock) node.BroadcastNewBlock(newBlock)
@ -400,11 +390,13 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
for _, addr := range node.GetAddresses(newBlock.Epoch()) { for _, addr := range node.GetAddresses(newBlock.Epoch()) {
wrapper, err := node.Beaconchain().ReadValidatorInformation(addr) wrapper, err := node.Beaconchain().ReadValidatorInformation(addr)
if err != nil { if err != nil {
return err utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator info")
return nil
} }
snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr) snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr)
if err != nil { if err != nil {
return err utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator snapshot")
return nil
} }
computed := availability.ComputeCurrentSigning( computed := availability.ComputeCurrentSigning(
snapshot.Validator, wrapper, snapshot.Validator, wrapper,

@ -6,6 +6,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto/bls"
staking "github.com/harmony-one/harmony/staking/types" staking "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -23,14 +27,13 @@ const (
// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. // WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus.
// only leader will receive the ready signal // only leader will receive the ready signal
func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{}) { func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) {
go func() { go func() {
// Setup stoppedChan // Setup stoppedChan
defer close(stoppedChan) defer close(stoppedChan)
utils.Logger().Debug(). utils.Logger().Debug().
Msg("Waiting for Consensus ready") Msg("Waiting for Consensus ready")
// TODO: make local net start faster
time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only)
for { for {
@ -40,7 +43,8 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch
utils.Logger().Debug(). utils.Logger().Debug().
Msg("Consensus new block proposal: STOPPED!") Msg("Consensus new block proposal: STOPPED!")
return return
case <-readySignal: case proposalType := <-readySignal:
retryCount := 0
for node.Consensus != nil && node.Consensus.IsLeader() { for node.Consensus != nil && node.Consensus.IsLeader() {
time.Sleep(SleepPeriod) time.Sleep(SleepPeriod)
utils.Logger().Info(). utils.Logger().Info().
@ -48,20 +52,36 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch
Msg("PROPOSING NEW BLOCK ------------------------------------------------") Msg("PROPOSING NEW BLOCK ------------------------------------------------")
// Prepare last commit signatures // Prepare last commit signatures
commitSigs := make(chan []byte) newCommitSigsChan := make(chan []byte)
sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64())
node.Consensus.StartFinalityCount()
if err != nil {
utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block")
break
}
// Currently the block proposal is not triggered asynchronously yet with last consensus.
// TODO: trigger block proposal when 66% commit, and feed and final commit sigs here.
go func() { go func() {
commitSigs <- sigs waitTime := 0 * time.Second
if proposalType == consensus.AsyncProposal {
waitTime = consensus.CommitSigReceiverTimeout
}
select {
case <-time.After(waitTime):
if waitTime == 0 {
utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB")
} else {
utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB")
}
sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64())
if err != nil {
utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block")
} else {
newCommitSigsChan <- sigs
}
case commitSigs := <-commitSigsChan:
utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously")
if len(commitSigs) > bls.BLSSignatureSizeInBytes {
newCommitSigsChan <- commitSigs
}
}
}() }()
newBlock, err := node.ProposeNewBlock(commitSigs) node.Consensus.StartFinalityCount()
newBlock, err := node.ProposeNewBlock(newCommitSigsChan)
if err == nil { if err == nil {
utils.Logger().Info(). utils.Logger().Info().
@ -77,7 +97,14 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch
node.BlockChannel <- newBlock node.BlockChannel <- newBlock
break break
} else { } else {
utils.Logger().Err(err).Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") retryCount++
utils.Logger().Err(err).Int("retryCount", retryCount).
Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!")
if retryCount > 3 {
// break to avoid repeated failures
break
}
continue
} }
} }
} }

@ -260,7 +260,7 @@ func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinCon
utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers") utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers")
} }
// TODO: treat fake maximum height // TODO: treat fake maximum height
if node.stateSync.IsOutOfSync(bc) { if node.stateSync.IsOutOfSync(bc, true) {
node.IsInSync.UnSet() node.IsInSync.UnSet()
if willJoinConsensus { if willJoinConsensus {
node.Consensus.BlocksNotSynchronized() node.Consensus.BlocksNotSynchronized()
@ -542,5 +542,5 @@ func (node *Node) GetMaxPeerHeight() uint64 {
// IsOutOfSync ... // IsOutOfSync ...
func (node *Node) IsOutOfSync(bc *core.BlockChain) bool { func (node *Node) IsOutOfSync(bc *core.BlockChain) bool {
return node.stateSync.IsOutOfSync(bc) return node.stateSync.IsOutOfSync(bc, false)
} }

@ -30,7 +30,7 @@ func (node *Node) setupForValidator() {
// Register new block service. // Register new block service.
node.serviceManager.RegisterService( node.serviceManager.RegisterService(
service.BlockProposal, service.BlockProposal,
blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyV2), blockproposal.New(node.Consensus.ReadySignal, node.Consensus.CommitSigChannel, node.WaitForConsensusReadyV2),
) )
} }

@ -7,6 +7,8 @@ import (
"sort" "sort"
"time" "time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/crypto/hash"
@ -504,7 +506,7 @@ func (w *Worker) FinalizeNewBlock(
copyHeader.SetLastCommitBitmap(signers) copyHeader.SetLastCommitBitmap(signers)
} }
sigsReady <- true sigsReady <- true
case <-time.After(5 * time.Second): case <-time.After(consensus.CommitSigReceiverTimeout):
// Exit goroutine // Exit goroutine
utils.Logger().Warn().Msg("Timeout waiting for commit sigs") utils.Logger().Warn().Msg("Timeout waiting for commit sigs")
} }

Loading…
Cancel
Save