diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index 02eb97efd..cfb4efa04 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -3,6 +3,7 @@ package blockproposal import ( "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/internal/utils" ) @@ -10,14 +11,15 @@ import ( type Service struct { stopChan chan struct{} stoppedChan chan struct{} - readySignal chan struct{} + readySignal chan consensus.ProposalType + commitSigsChan chan []byte messageChan chan *msg_pb.Message - waitForConsensusReady func(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{}) + waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) } // New returns a block proposal service. -func New(readySignal chan struct{}, waitForConsensusReady func(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{})) *Service { - return &Service{readySignal: readySignal, waitForConsensusReady: waitForConsensusReady} +func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service { + return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady} } // StartService starts block proposal service. @@ -35,7 +37,7 @@ func (s *Service) Init() { // Run runs block proposal. func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) { - s.waitForConsensusReady(s.readySignal, s.stopChan, s.stoppedChan) + s.waitForConsensusReady(s.readySignal, s.commitSigsChan, s.stopChan, s.stoppedChan) } // StopService stops block proposal service. diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index bed0e2272..112d4dda7 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -176,7 +176,7 @@ func (s *Service) GetTotalSupply(w http.ResponseWriter, r *http.Request) { // GetNodeSync returns status code 500 if node is not in sync func (s *Service) GetNodeSync(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - sync := !s.stateSync.IsOutOfSync(s.blockchain) + sync := !s.stateSync.IsOutOfSync(s.blockchain, false) if !sync { w.WriteHeader(http.StatusTeapot) } diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index 740199f9e..08cdf38a9 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -29,13 +29,11 @@ import ( // Constants for syncing. const ( downloadBlocksRetryLimit = 5 // downloadBlocks service retry limit - TimesToFail = 5 // downloadBlocks service retry limit RegistrationNumber = 3 SyncingPortDifference = 3000 inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus SyncLoopBatchSize uint32 = 1000 // maximum size for one query of block hashes verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size - SyncLoopFrequency = 1 // unit in second LastMileBlocksSize = 50 // after cutting off a number of connected peers, the result number of peers @@ -958,18 +956,36 @@ func (ss *StateSync) GetMaxPeerHeight() uint64 { } // IsOutOfSync checks whether the node is out of sync from other peers -func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool { +func (ss *StateSync) IsOutOfSync(bc *core.BlockChain, doubleCheck bool) bool { if ss.syncConfig == nil { return true // If syncConfig is not instantiated, return not in sync } - otherHeight := ss.getMaxPeerHeight(false) + otherHeight1 := ss.getMaxPeerHeight(false) + lastHeight := bc.CurrentBlock().NumberU64() + wasOutOfSync := lastHeight+inSyncThreshold < otherHeight1 + + if !doubleCheck { + utils.Logger().Info(). + Uint64("OtherHeight", otherHeight1). + Uint64("lastHeight", lastHeight). + Msg("[SYNC] Checking sync status") + return wasOutOfSync + } + time.Sleep(1 * time.Second) + // double check the sync status after 1 second to confirm (avoid false alarm) + + otherHeight2 := ss.getMaxPeerHeight(false) currentHeight := bc.CurrentBlock().NumberU64() - utils.Logger().Debug(). - Uint64("OtherHeight", otherHeight). - Uint64("MyHeight", currentHeight). - Bool("IsOutOfSync", currentHeight+inSyncThreshold < otherHeight). + + isOutOfSync := currentHeight+inSyncThreshold < otherHeight2 + utils.Logger().Info(). + Uint64("OtherHeight1", otherHeight1). + Uint64("OtherHeight2", otherHeight2). + Uint64("lastHeight", lastHeight). + Uint64("currentHeight", currentHeight). Msg("[SYNC] Checking sync status") - return currentHeight+inSyncThreshold < otherHeight + // Only confirm out of sync when the node has lower height and didn't move in heights for 2 consecutive checks + return wasOutOfSync && isOutOfSync && lastHeight == currentHeight } // SyncLoop will keep syncing with peers until catches up @@ -977,10 +993,7 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, isBeac if !isBeacon { ss.RegisterNodeInfo() } - // remove SyncLoopFrequency - ticker := time.NewTicker(SyncLoopFrequency * time.Second) - defer ticker.Stop() - for range ticker.C { + for { otherHeight := ss.getMaxPeerHeight(isBeacon) currentHeight := bc.CurrentBlock().NumberU64() if currentHeight >= otherHeight { @@ -1032,7 +1045,7 @@ func (ss *StateSync) addConsensusLastMile(bc *core.BlockChain, consensus *consen return errors.Wrap(err, "failed to InsertChain") } } - consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64() + 1) + consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64()) return nil } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 07971a601..0c3e69c90 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -653,7 +653,7 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType) // Assign closure functions to the consensus object currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock) - currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing + currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing // update consensus information based on the blockchain currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation()) currentConsensus.NextBlockDue = time.Now() diff --git a/consensus/consensus.go b/consensus/consensus.go index 5449c612c..08a559b6a 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -25,7 +25,16 @@ const ( vdfAndSeedSize = 548 // size of VDF/Proof and Seed ) -var errLeaderPriKeyNotFound = errors.New("getting leader private key from consensus public keys failed") +var errLeaderPriKeyNotFound = errors.New("leader private key not found locally") + +// ProposalType is to indicate the type of signal for new block proposal +type ProposalType byte + +// Constant of the type of new block proposal +const ( + SyncProposal ProposalType = iota + AsyncProposal +) // BlockVerifierFunc is a function used to verify the block type BlockVerifierFunc func(*types.Block) error @@ -79,11 +88,13 @@ type Consensus struct { mutex sync.Mutex // ViewChange struct vc *viewChange - // Signal channel for starting a new consensus process - ReadySignal chan struct{} - // The post-consensus processing func passed from Node object + // Signal channel for proposing a new block and start new consensus + ReadySignal chan ProposalType + // Channel to send full commit signatures to finish new block proposal + CommitSigChannel chan []byte + // The post-consensus job func passed from Node object // Called when consensus on a new block is done - OnConsensusDone func(*types.Block) error + PostConsensusJob func(*types.Block) error // The verifier func passed from Node object BlockVerifier BlockVerifierFunc // verified block to state sync broadcast @@ -206,7 +217,8 @@ func New( consensus.syncNotReadyChan = make(chan struct{}) consensus.SlashChan = make(chan slash.Record) consensus.commitFinishChan = make(chan uint64) - consensus.ReadySignal = make(chan struct{}) + consensus.ReadySignal = make(chan ProposalType) + consensus.CommitSigChannel = make(chan []byte) // channel for receiving newly generated VDF consensus.RndChannel = make(chan [vdfAndSeedSize]byte) consensus.IgnoreViewIDCheck = abool.NewBool(false) diff --git a/consensus/consensus_msg_sender.go b/consensus/consensus_msg_sender.go index 9d1da5149..6712ea3e7 100644 --- a/consensus/consensus_msg_sender.go +++ b/consensus/consensus_msg_sender.go @@ -68,6 +68,18 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa return sender.host.SendMessageToGroups(groups, p2pMsg) } +// DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries. +func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) { + if sender.retryTimes != 0 { + msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0} + atomic.StoreUint32(&msgRetry.isActive, 1) + sender.messagesToRetry.Store(msgType, &msgRetry) + go func() { + sender.Retry(&msgRetry) + }() + } +} + // SendWithoutRetry sends message without retry logic. func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error { return sender.host.SendMessageToGroups(groups, p2pMsg) diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index b3ede1404..2682b37cb 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -457,7 +457,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode { consensus.getLogger().Info(). Str("myKey", myPubKeys.SerializeToHexStr()). Msg("[UpdateConsensusInformation] I am the New Leader") - consensus.ReadySignal <- struct{}{} + consensus.ReadySignal <- SyncProposal }() } return Normal diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 2cada40e4..43de0d5d8 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -72,7 +72,7 @@ func TestConsensusInitialization(t *testing.T) { assert.IsType(t, make(chan uint64), consensus.commitFinishChan) assert.NotNil(t, consensus.commitFinishChan) - assert.IsType(t, make(chan struct{}), consensus.ReadySignal) + assert.IsType(t, make(chan ProposalType), consensus.ReadySignal) assert.NotNil(t, consensus.ReadySignal) assert.IsType(t, make(chan [vdfAndSeedSize]byte), consensus.RndChannel) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index c9f6d7161..f9a5dc910 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -7,6 +7,8 @@ import ( "sync/atomic" "time" + "github.com/harmony-one/harmony/internal/utils" + "github.com/rs/zerolog" msg_pb "github.com/harmony-one/harmony/api/proto/message" @@ -27,6 +29,15 @@ var ( errVerifyMessageSignature = errors.New("verify message signature failed") ) +// timeout constant +const ( + // CommitSigSenderTimeout is the timeout for sending the commit sig to finish block proposal + CommitSigSenderTimeout = 6 * time.Second + // CommitSigReceiverTimeout is the timeout for the receiving side of the commit sig + // if timeout, the receiver should instead ready directly from db for the commit sig + CommitSigReceiverTimeout = 4 * time.Second +) + // IsViewChangingMode return true if curernt mode is viewchanging func (consensus *Consensus) IsViewChangingMode() bool { return consensus.current.Mode() == ViewChanging @@ -100,28 +111,28 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb return nil } -func (consensus *Consensus) finalizeCommits() { +func (consensus *Consensus) finalCommit() { consensus.getLogger().Info(). Int64("NumCommits", consensus.Decider.SignersCount(quorum.Commit)). - Msg("[finalizeCommits] Finalizing Block") + Msg("[finalCommit] Finalizing Consensus") beforeCatchupNum := consensus.blockNum + leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() if err != nil { - consensus.getLogger().Error().Err(err).Msg("[FinalizeCommits] leader not found") + consensus.getLogger().Error().Err(err).Msg("[finalCommit] leader not found") return } // Construct committed message network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey}) if err != nil { consensus.getLogger().Warn().Err(err). - Msg("[FinalizeCommits] Unable to construct Committed message") + Msg("[finalCommit] Unable to construct Committed message") return } - msgToSend, aggSig, FBFTMsg := + msgToSend, FBFTMsg := network.Bytes, - network.OptionalAggregateSignature, network.FBFTMsg - consensus.aggregatedCommitSig = aggSig // this may not needed + commitSigAndBitmap := FBFTMsg.Payload consensus.FBFTLog.AddMessage(FBFTMsg) // find correct block content curBlockHash := consensus.blockHash @@ -129,31 +140,55 @@ func (consensus *Consensus) finalizeCommits() { if block == nil { consensus.getLogger().Warn(). Str("blockHash", hex.EncodeToString(curBlockHash[:])). - Msg("[FinalizeCommits] Cannot find block by hash") + Msg("[finalCommit] Cannot find block by hash") return } - consensus.tryCatchup() - if consensus.blockNum-beforeCatchupNum != 1 { - consensus.getLogger().Warn(). + consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!") + consensus.Blockchain.WriteCommitSig(block.NumberU64(), commitSigAndBitmap) + + block.SetCurrentCommitSig(commitSigAndBitmap) + err = consensus.commitBlock(block, FBFTMsg) + + if err != nil || consensus.blockNum-beforeCatchupNum != 1 { + consensus.getLogger().Err(err). Uint64("beforeCatchupBlockNum", beforeCatchupNum). - Msg("[FinalizeCommits] Leader cannot provide the correct block for committed message") + Msg("[finalCommit] Leader failed to commit the confirmed block") return } - // if leader success finalize the block, send committed message to validators - if err := consensus.msgSender.SendWithRetry( - block.NumberU64(), - msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ - nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), - }, - p2p.ConstructMessage(msgToSend)); err != nil { - consensus.getLogger().Warn().Err(err).Msg("[finalizeCommits] Cannot send committed message") + // if leader successfully finalizes the block, send committed message to validators + // Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately + // to save network traffic. It will only be sent in retry if consensus doesn't move forward. + // Or if the leader is changed for next block, the 100% committed sig will be sent to the next leader immediately. + if !consensus.IsLeader() || block.IsLastBlockInEpoch() { + // send immediately + if err := consensus.msgSender.SendWithRetry( + block.NumberU64(), + msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ + nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), + }, + p2p.ConstructMessage(msgToSend)); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message") + } else { + consensus.getLogger().Info(). + Hex("blockHash", curBlockHash[:]). + Uint64("blockNum", consensus.blockNum). + Msg("[finalCommit] Sent Committed Message") + } + consensus.consensusTimeout[timeoutConsensus].Start() } else { + // delayed send + consensus.msgSender.DelayedSendWithRetry( + block.NumberU64(), + msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ + nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), + }, + p2p.ConstructMessage(msgToSend)) consensus.getLogger().Info(). Hex("blockHash", curBlockHash[:]). Uint64("blockNum", consensus.blockNum). - Msg("[finalizeCommits] Sent Committed Message") + Msg("[finalCommit] Queued Committed Message") } // Dump new block into level db @@ -164,11 +199,10 @@ func (consensus *Consensus) finalizeCommits() { if consensus.consensusTimeout[timeoutBootstrap].IsActive() { consensus.consensusTimeout[timeoutBootstrap].Stop() - consensus.getLogger().Info().Msg("[finalizeCommits] Start consensus timer; stop bootstrap timer only once") + consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer; stop bootstrap timer only once") } else { - consensus.getLogger().Info().Msg("[finalizeCommits] Start consensus timer") + consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer") } - consensus.consensusTimeout[timeoutConsensus].Start() consensus.getLogger().Info(). Uint64("blockNum", block.NumberU64()). @@ -179,15 +213,26 @@ func (consensus *Consensus) finalizeCommits() { Int("numStakingTxns", len(block.StakingTransactions())). Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!") - // Sleep to wait for the full block time - consensus.getLogger().Info().Msg("[finalizeCommits] Waiting for Block Time") - <-time.After(time.Until(consensus.NextBlockDue)) - - // Send signal to Node to propose the new block for consensus - consensus.ReadySignal <- struct{}{} - - // Update time due for next block - consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) + // If still the leader, send commit sig/bitmap to finish the new block proposal, + // else, the block proposal will timeout by itself. + if consensus.IsLeader() { + if block.IsLastBlockInEpoch() { + // No pipelining + go func() { + consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal") + consensus.ReadySignal <- SyncProposal + }() + } else { + // pipelining + go func() { + select { + case consensus.CommitSigChannel <- commitSigAndBitmap: + case <-time.After(CommitSigSenderTimeout): + utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap") + } + }() + } + } } // BlockCommitSigs returns the byte array of aggregated @@ -231,7 +276,7 @@ func (consensus *Consensus) Start( <-startChannel toStart <- struct{}{} consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") - consensus.ReadySignal <- struct{}{} + consensus.ReadySignal <- SyncProposal }() } consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started") @@ -274,11 +319,15 @@ func (consensus *Consensus) Start( } case <-consensus.syncReadyChan: consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") - consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1) - consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1) - mode := consensus.UpdateConsensusInformation() - consensus.current.SetMode(mode) - consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") + consensus.mutex.Lock() + if consensus.blockNum < consensus.Blockchain.CurrentHeader().Number().Uint64()+1 { + consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1) + consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1) + mode := consensus.UpdateConsensusInformation() + consensus.current.SetMode(mode) + consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") + } + consensus.mutex.Unlock() case <-consensus.syncNotReadyChan: consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") @@ -291,6 +340,12 @@ func (consensus *Consensus) Start( Uint64("MsgBlockNum", newBlock.NumberU64()). Msg("[ConsensusMainLoop] Received Proposed New Block!") + // Sleep to wait for the full block time + consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") + <-time.After(time.Until(consensus.NextBlockDue)) + // Update time due for next block + consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) + //VRF/VDF is only generated in the beacon chain if consensus.NeedsRandomNumberGeneration(newBlock.Header().Epoch()) { // generate VRF if the current block has a new leader @@ -376,14 +431,14 @@ func (consensus *Consensus) Start( consensus.announce(newBlock) case viewID := <-consensus.commitFinishChan: - consensus.getLogger().Info().Msg("[ConsensusMainLoop] commitFinishChan") + consensus.getLogger().Info().Uint64("viewID", viewID).Msg("[ConsensusMainLoop] commitFinishChan") // Only Leader execute this condition func() { consensus.mutex.Lock() defer consensus.mutex.Unlock() if viewID == consensus.GetCurBlockViewID() { - consensus.finalizeCommits() + consensus.finalCommit() } }() @@ -464,6 +519,64 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl return blocks, msgs, nil } +// preCommitAndPropose commit the current block with 67% commit signatures and start +// proposing new block which will wait on the full commit signatures to finish +func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { + if blk == nil { + return errors.New("block to pre-commit is nil") + } + + leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() + if err != nil { + consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found") + return err + } + + // Construct committed message + consensus.mutex.Lock() + network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey}) + consensus.mutex.Unlock() + if err != nil { + consensus.getLogger().Warn().Err(err). + Msg("[preCommitAndPropose] Unable to construct Committed message") + return err + } + msgToSend, FBFTMsg := + network.Bytes, + network.FBFTMsg + bareMinimumCommit := FBFTMsg.Payload + consensus.FBFTLog.AddMessage(FBFTMsg) + + blk.SetCurrentCommitSig(bareMinimumCommit) + + if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, true); err != nil { + consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain") + return err + } + + // if leader successfully finalizes the block, send committed message to validators + if err := consensus.msgSender.SendWithRetry( + blk.NumberU64(), + msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ + nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), + }, + p2p.ConstructMessage(msgToSend)); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message") + } else { + consensus.getLogger().Info(). + Str("blockHash", blk.Hash().Hex()). + Uint64("blockNum", consensus.blockNum). + Msg("[preCommitAndPropose] Sent Committed Message") + } + consensus.consensusTimeout[timeoutConsensus].Start() + + // Send signal to Node to propose the new block for consensus + consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal") + + consensus.ReadySignal <- AsyncProposal + return nil +} + // tryCatchup add the last mile block in PBFT log memory cache to blockchain. func (consensus *Consensus) tryCatchup() error { // TODO: change this to a more systematic symbol @@ -509,15 +622,30 @@ func (consensus *Consensus) tryCatchup() error { } func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { - if err := consensus.OnConsensusDone(blk); err != nil { - return err + if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() { + if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, true); err != nil { + consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain") + return err + } } + if !committedMsg.HasSingleSender() { consensus.getLogger().Error().Msg("[TryCatchup] Leader message can not have multiple sender keys") return errIncorrectSender } - atomic.AddUint64(&consensus.blockNum, 1) + consensus.PostConsensusJob(blk) + consensus.SetupForNewConsensus(blk, committedMsg) + consensus.FinishFinalityCount() + utils.Logger().Info().Uint64("blockNum", blk.NumberU64()). + Str("hash", blk.Header().Hash().Hex()). + Msg("Added New Block to Blockchain!!!") + return nil +} + +// SetupForNewConsensus sets the state for new consensus +func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { + atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) consensus.SetCurBlockViewID(committedMsg.ViewID + 1) consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] // Update consensus keys at last so the change of leader status doesn't mess up normal flow @@ -525,7 +653,6 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess consensus.SetMode(consensus.UpdateConsensusInformation()) } consensus.ResetState() - return nil } func (consensus *Consensus) postCatchup(initBN uint64) { diff --git a/consensus/leader.go b/consensus/leader.go index 988380ffa..664054383 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -48,7 +48,6 @@ func (consensus *Consensus) announce(block *types.Block) { } msgToSend, FPBTMsg := networkMessage.Bytes, networkMessage.FBFTMsg - // TODO(chao): review FPBT log data structure consensus.FBFTLog.AddMessage(FPBTMsg) consensus.getLogger().Debug(). Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()). @@ -301,6 +300,14 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { if !quorumWasMet && quorumIsMet { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") + if !blockObj.IsLastBlockInEpoch() { + // only do early commit if it's not epoch block to avoid problems + go func() { + // TODO: make it synchronized with commitFinishChan + consensus.preCommitAndPropose(blockObj) + }() + } + consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period") go func(viewID uint64) { time.Sleep(2500 * time.Millisecond) diff --git a/consensus/quorum/one-node-staked-vote.go b/consensus/quorum/one-node-staked-vote.go index 0f65257c9..e18b99f99 100644 --- a/consensus/quorum/one-node-staked-vote.go +++ b/consensus/quorum/one-node-staked-vote.go @@ -112,12 +112,12 @@ func (v *stakedVoteWeight) AddNewVote( t := v.QuorumThreshold() - msg := "Attempt to reach quorum" + msg := "[AddNewVote] New Vote Added!" if !tallyQuorum.quorumAchieved { tallyQuorum.quorumAchieved = tallyQuorum.tally.GT(t) if tallyQuorum.quorumAchieved { - msg = "Quorum Achieved!" + msg = "[AddNewVote] Quorum Achieved!" } } utils.Logger().Info(). @@ -173,9 +173,11 @@ func (v *stakedVoteWeight) computeTotalPowerByMask(mask *bls_cosi.Mask) *numeric for key, i := range mask.PublicsIndex { if enabled, err := mask.IndexEnabled(i); err == nil && enabled { - currentTotal = currentTotal.Add( - v.roster.Voters[key].OverallPercent, - ) + if voter, ok := v.roster.Voters[key]; ok { + currentTotal = currentTotal.Add( + voter.OverallPercent, + ) + } } } return ¤tTotal diff --git a/consensus/validator.go b/consensus/validator.go index 3dd3f03f8..e1615541f 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -1,7 +1,6 @@ package consensus import ( - "bytes" "encoding/hex" "time" @@ -169,12 +168,6 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { Hex("blockHash", recvMsg.BlockHash[:]). Msg("[OnPrepared] Prepared message and block added") - // tryCatchup is also run in onCommitted(), so need to lock with commitMutex. - if consensus.current.Mode() != Normal { - // don't sign the block that is not verified - consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!") - return - } if consensus.BlockVerifier == nil { consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring") return @@ -212,9 +205,13 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { consensus.prepareBitmap = mask // Optimistically add blockhash field of prepare message - emptyHash := [32]byte{} - if bytes.Equal(consensus.blockHash[:], emptyHash[:]) { - copy(consensus.blockHash[:], blockHash[:]) + copy(consensus.blockHash[:], blockHash[:]) + + // tryCatchup is also run in onCommitted(), so need to lock with commitMutex. + if consensus.current.Mode() != Normal { + // don't sign the block that is not verified + consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!") + return } consensus.sendCommitMessages(&blockObj) @@ -232,10 +229,14 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { Uint64("MsgViewID", recvMsg.ViewID). Msg("[OnCommitted] Received committed message") - // NOTE let it handle its own logs - if !consensus.isRightBlockNumCheck(recvMsg) { + // Ok to receive committed from last block since it could have more signatures + if recvMsg.BlockNum < consensus.blockNum-1 { + consensus.getLogger().Debug(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Msg("Wrong BlockNum Received, ignoring!") return } + if recvMsg.BlockNum > consensus.blockNum { consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync") consensus.spinUpStateSync() @@ -247,10 +248,13 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { return } if !consensus.Decider.IsQuorumAchievedByMask(mask) { - consensus.getLogger().Warn().Msgf("[OnCommitted] Quorum Not achieved.") + consensus.getLogger().Warn().Hex("sigbitmap", recvMsg.Payload).Msgf("[OnCommitted] Quorum Not achieved.") return } + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + // Must have the corresponding block to verify committed message. blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) if blockObj == nil { @@ -272,9 +276,6 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.FBFTLog.AddMessage(recvMsg) - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - consensus.aggregatedCommitSig = aggSig consensus.commitBitmap = mask diff --git a/consensus/view_change.go b/consensus/view_change.go index 3ef66eb9d..7b8cb42a4 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -396,7 +396,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { } go func() { - consensus.ReadySignal <- struct{}{} + consensus.ReadySignal <- SyncProposal }() return } diff --git a/crypto/bls/bls.go b/crypto/bls/bls.go index c8d0bddf9..2c84e0fc7 100644 --- a/crypto/bls/bls.go +++ b/crypto/bls/bls.go @@ -99,7 +99,7 @@ func (pk *SerializedPublicKey) FromLibBLSPublicKey(key *bls.PublicKey) error { // SeparateSigAndMask parse the commig signature data into signature and bitmap. func SeparateSigAndMask(commitSigs []byte) ([]byte, []byte, error) { if len(commitSigs) < BLSSignatureSizeInBytes { - return nil, nil, errors.New("no mask data found in commit sigs") + return nil, nil, errors.Errorf("no mask data found in commit sigs: %x", commitSigs) } //#### Read payload data from committed msg aggSig := make([]byte, BLSSignatureSizeInBytes) diff --git a/internal/chain/engine.go b/internal/chain/engine.go index 9cb28e4cb..5a6f3b4b1 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -195,8 +195,8 @@ func (e *engineImpl) VerifySeal(chain engine.ChainReader, header *block.Header) lastCommitPayload := signature.ConstructCommitPayload(chain, parentHeader.Epoch(), parentHeader.Hash(), parentHeader.Number().Uint64(), parentHeader.ViewID().Uint64()) if !aggSig.VerifyHash(mask.AggregatePublic, lastCommitPayload) { - const msg = "[VerifySeal] Unable to verify aggregated signature from last block" - return errors.New(msg) + const msg = "[VerifySeal] Unable to verify aggregated signature from last block: %x" + return errors.Errorf(msg, payload) } return nil } diff --git a/internal/chain/reward.go b/internal/chain/reward.go index 6b0a7d06e..eeaf1e873 100644 --- a/internal/chain/reward.go +++ b/internal/chain/reward.go @@ -28,6 +28,12 @@ import ( "github.com/pkg/errors" ) +// timeout constant +const ( + // AsyncBlockProposalTimeout is the timeout which will abort the async block proposal. + AsyncBlockProposalTimeout = 5 * time.Second +) + func ballotResultBeaconchain( bc engine.ChainReader, header *block.Header, ) (*big.Int, shard.SlotList, shard.SlotList, shard.SlotList, error) { @@ -473,7 +479,7 @@ func waitForCommitSigs(sigsReady chan bool) error { return errors.New("Failed to get commit sigs") } utils.Logger().Info().Msg("Commit sigs are ready") - case <-time.After(5 * time.Second): + case <-time.After(AsyncBlockProposalTimeout): return errors.New("Timeout waiting for commit sigs for reward calculation") } return nil diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 8738ecbb1..3e7878a7c 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -20,6 +20,7 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block) { //#### Read payload data from committed msg if len(commitSigAndBitmap) <= 96 { utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length") + return } commitSig := make([]byte, 96) commitBitmap := make([]byte, len(commitSigAndBitmap)-96) diff --git a/node/node_handler.go b/node/node_handler.go index 88265722a..5a6a72864 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -353,16 +353,6 @@ func (node *Node) numSignaturesIncludedInBlock(block *types.Block) uint32 { // 2. [leader] send new block to the client // 3. [leader] send cross shard tx receipts to destination shard func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { - if _, err := node.Blockchain().InsertChain([]*types.Block{newBlock}, true); err != nil { - return err - } - utils.Logger().Info(). - Uint64("blockNum", newBlock.NumberU64()). - Str("hash", newBlock.Header().Hash().Hex()). - Msg("Added New Block to Blockchain!!!") - - node.Consensus.FinishFinalityCount() - if node.Consensus.IsLeader() { if node.NodeConfig.ShardID == shard.BeaconChainShardID { node.BroadcastNewBlock(newBlock) @@ -400,11 +390,13 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { for _, addr := range node.GetAddresses(newBlock.Epoch()) { wrapper, err := node.Beaconchain().ReadValidatorInformation(addr) if err != nil { - return err + utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator info") + return nil } snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr) if err != nil { - return err + utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator snapshot") + return nil } computed := availability.ComputeCurrentSigning( snapshot.Validator, wrapper, diff --git a/node/node_newblock.go b/node/node_newblock.go index fcfd45461..e8f8698d9 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -6,6 +6,10 @@ import ( "strings" "time" + "github.com/harmony-one/harmony/consensus" + + "github.com/harmony-one/harmony/crypto/bls" + staking "github.com/harmony-one/harmony/staking/types" "github.com/ethereum/go-ethereum/common" @@ -23,14 +27,13 @@ const ( // WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. // only leader will receive the ready signal -func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{}) { +func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) { go func() { // Setup stoppedChan defer close(stoppedChan) utils.Logger().Debug(). Msg("Waiting for Consensus ready") - // TODO: make local net start faster time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) for { @@ -40,7 +43,8 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch utils.Logger().Debug(). Msg("Consensus new block proposal: STOPPED!") return - case <-readySignal: + case proposalType := <-readySignal: + retryCount := 0 for node.Consensus != nil && node.Consensus.IsLeader() { time.Sleep(SleepPeriod) utils.Logger().Info(). @@ -48,20 +52,36 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch Msg("PROPOSING NEW BLOCK ------------------------------------------------") // Prepare last commit signatures - commitSigs := make(chan []byte) - sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64()) + newCommitSigsChan := make(chan []byte) - node.Consensus.StartFinalityCount() - if err != nil { - utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") - break - } - // Currently the block proposal is not triggered asynchronously yet with last consensus. - // TODO: trigger block proposal when 66% commit, and feed and final commit sigs here. go func() { - commitSigs <- sigs + waitTime := 0 * time.Second + if proposalType == consensus.AsyncProposal { + waitTime = consensus.CommitSigReceiverTimeout + } + select { + case <-time.After(waitTime): + if waitTime == 0 { + utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB") + } else { + utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB") + } + sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64()) + + if err != nil { + utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") + } else { + newCommitSigsChan <- sigs + } + case commitSigs := <-commitSigsChan: + utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously") + if len(commitSigs) > bls.BLSSignatureSizeInBytes { + newCommitSigsChan <- commitSigs + } + } }() - newBlock, err := node.ProposeNewBlock(commitSigs) + node.Consensus.StartFinalityCount() + newBlock, err := node.ProposeNewBlock(newCommitSigsChan) if err == nil { utils.Logger().Info(). @@ -77,7 +97,14 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch node.BlockChannel <- newBlock break } else { - utils.Logger().Err(err).Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") + retryCount++ + utils.Logger().Err(err).Int("retryCount", retryCount). + Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") + if retryCount > 3 { + // break to avoid repeated failures + break + } + continue } } } diff --git a/node/node_syncing.go b/node/node_syncing.go index 77d2aa8ca..7e5fab3b6 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -260,7 +260,7 @@ func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinCon utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers") } // TODO: treat fake maximum height - if node.stateSync.IsOutOfSync(bc) { + if node.stateSync.IsOutOfSync(bc, true) { node.IsInSync.UnSet() if willJoinConsensus { node.Consensus.BlocksNotSynchronized() @@ -542,5 +542,5 @@ func (node *Node) GetMaxPeerHeight() uint64 { // IsOutOfSync ... func (node *Node) IsOutOfSync(bc *core.BlockChain) bool { - return node.stateSync.IsOutOfSync(bc) + return node.stateSync.IsOutOfSync(bc, false) } diff --git a/node/service_setup.go b/node/service_setup.go index 9b0000e38..ed55607db 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -30,7 +30,7 @@ func (node *Node) setupForValidator() { // Register new block service. node.serviceManager.RegisterService( service.BlockProposal, - blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyV2), + blockproposal.New(node.Consensus.ReadySignal, node.Consensus.CommitSigChannel, node.WaitForConsensusReadyV2), ) } diff --git a/node/worker/worker.go b/node/worker/worker.go index 496c81091..78272fd67 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -7,6 +7,8 @@ import ( "sort" "time" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/hash" @@ -504,7 +506,7 @@ func (w *Worker) FinalizeNewBlock( copyHeader.SetLastCommitBitmap(signers) } sigsReady <- true - case <-time.After(5 * time.Second): + case <-time.After(consensus.CommitSigReceiverTimeout): // Exit goroutine utils.Logger().Warn().Msg("Timeout waiting for commit sigs") }