diff --git a/consensus/consensus.go b/consensus/consensus.go index 37fd2c190..aed78fcbd 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -142,6 +142,8 @@ type Consensus struct { // The p2p host used to send/receive p2p messages host p2p.Host + // MessageSender takes care of sending consensus messages and the corresponding retry logic. + msgSender *MessageSender // Staking information finder stakeInfoFinder StakeInfoFinder @@ -233,6 +235,7 @@ type StakeInfoFinder interface { func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKey) (*Consensus, error) { consensus := Consensus{} consensus.host = host + consensus.msgSender = NewMessageSender(host) consensus.blockNumLowChan = make(chan struct{}) // pbft related diff --git a/consensus/consensus_msg_sender.go b/consensus/consensus_msg_sender.go new file mode 100644 index 000000000..97427a459 --- /dev/null +++ b/consensus/consensus_msg_sender.go @@ -0,0 +1,138 @@ +package consensus + +import ( + "sync" + "time" + + msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" +) + +const ( + // RetryIntervalInSec is the interval for message retry + RetryIntervalInSec = 10 +) + +// MessageSender is the wrapper object that controls how a consensus message is sent +type MessageSender struct { + blockNum uint64 // The current block number at consensus + blockNumMutex sync.Mutex + messagesToRetry sync.Map + // The p2p host used to send/receive p2p messages + host p2p.Host + // RetryTimes is number of retry attempts + retryTimes int +} + +// MessageRetry controls the message that can be retried +type MessageRetry struct { + blockNum uint64 // The block number this message is for + groups []p2p.GroupID + p2pMsg []byte + msgType msg_pb.MessageType + retryCount int + isActive bool + isActiveMutex sync.Mutex +} + +// NewMessageSender initializes the consensus message sender. 
+func NewMessageSender(host p2p.Host) *MessageSender {
+	// retryTimes is the whole number of RetryIntervalInSec intervals that fit
+	// into phaseDuration (declared elsewhere in this package). The division is
+	// integral, so a phaseDuration shorter than one interval yields zero
+	// retries, which makes SendWithRetry behave like SendWithoutRetry.
+	return &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
+}
+
+// Reset resets the sender's state for new block: it records blockNum as the
+// current block number, then deactivates and forgets every registered retry
+// whose type is not COMMITTED. COMMITTED retries are left untouched.
+func (sender *MessageSender) Reset(blockNum uint64) {
+	sender.blockNumMutex.Lock()
+	sender.blockNum = blockNum
+	sender.blockNumMutex.Unlock()
+
+	// Deactivate and delete in a single pass, so that no entry can be removed
+	// from the map while its retry goroutine is still marked active.
+	sender.messagesToRetry.Range(func(key, value interface{}) bool {
+		msgRetry, ok := value.(*MessageRetry)
+		if !ok || msgRetry.msgType == msg_pb.MessageType_COMMITTED {
+			return true
+		}
+		msgRetry.isActiveMutex.Lock()
+		msgRetry.isActive = false
+		msgRetry.isActiveMutex.Unlock()
+		sender.messagesToRetry.Delete(key)
+		return true
+	})
+}
+
+// SendWithRetry sends message with retry logic.
+//
+// The message is sent to groups right away and the error of that first send is
+// returned. When retryTimes is non-zero, the message is also registered under
+// msgType and a goroutine running Retry is started for it. Only one retry per
+// message type is tracked: a retry already registered under msgType is
+// deactivated before it is replaced, because once it is overwritten in the map
+// StopRetry can no longer reach it.
+func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []p2p.GroupID, p2pMsg []byte) error {
+	if sender.retryTimes != 0 {
+		if prev, loaded := sender.messagesToRetry.Load(msgType); loaded {
+			if prevRetry, ok := prev.(*MessageRetry); ok {
+				prevRetry.isActiveMutex.Lock()
+				prevRetry.isActive = false
+				prevRetry.isActiveMutex.Unlock()
+			}
+		}
+		msgRetry := &MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, isActive: true}
+		sender.messagesToRetry.Store(msgType, msgRetry)
+		go sender.Retry(msgRetry)
+	}
+	return sender.host.SendMessageToGroups(groups, p2pMsg)
+}
+
+// SendWithoutRetry sends message without retry logic.
+// It forwards p2pMsg to groups once and returns the host's error, if any.
+func (sender *MessageSender) SendWithoutRetry(groups []p2p.GroupID, p2pMsg []byte) error {
+	return sender.host.SendMessageToGroups(groups, p2pMsg)
+}
+
+// Retry re-sends the consensus message every RetryIntervalInSec seconds, at most retryTimes times.
+func (sender *MessageSender) Retry(msgRetry *MessageRetry) {
+	// The retry budget is checked in the loop condition so the goroutine ends
+	// right after its last re-send instead of sleeping one more interval first.
+	for msgRetry.retryCount < sender.retryTimes {
+		time.Sleep(RetryIntervalInSec * time.Second)
+
+		msgRetry.isActiveMutex.Lock()
+		active := msgRetry.isActive
+		msgRetry.isActiveMutex.Unlock()
+		if !active {
+			// Retry is stopped
+			return
+		}
+
+		// COMMITTED messages are exempt from the stale-block check below.
+		if msgRetry.msgType != msg_pb.MessageType_COMMITTED {
+			sender.blockNumMutex.Lock()
+			currentBlockNum := sender.blockNum
+			sender.blockNumMutex.Unlock()
+			if msgRetry.blockNum < currentBlockNum {
+				// Block already moved ahead, no need to retry old block's messages
+				return
+			}
+		}
+
+		msgRetry.retryCount++
+		if err := sender.host.SendMessageToGroups(msgRetry.groups, msgRetry.p2pMsg); err != nil {
+			// Include the error itself; without it the warning cannot be diagnosed.
+			utils.GetLogInstance().Warn("[Retry] Failed re-sending consensus message", "groupID", msgRetry.groups, "blockNum", msgRetry.blockNum, "MsgType", msgRetry.msgType, "RetryCount", msgRetry.retryCount, "error", err)
+		} else {
+			utils.GetLogInstance().Info("[Retry] Successfully resent consensus message", "groupID", msgRetry.groups, "blockNum", msgRetry.blockNum, "MsgType", msgRetry.msgType, "RetryCount", msgRetry.retryCount)
+		}
+	}
+}
+
+// StopRetry stops the retry.
+// It marks the retry registered under msgType as inactive; the retry goroutine
+// observes the flag after its current sleep and exits. It is a no-op when
+// nothing is registered for msgType.
+func (sender *MessageSender) StopRetry(msgType msg_pb.MessageType) {
+	data, ok := sender.messagesToRetry.Load(msgType)
+	if !ok {
+		return
+	}
+	// Checked assertion: an unexpected value in the map is skipped rather than causing a panic.
+	msgRetry, ok := data.(*MessageRetry)
+	if !ok {
+		return
+	}
+	msgRetry.isActiveMutex.Lock()
+	msgRetry.isActive = false
+	msgRetry.isActiveMutex.Unlock()
+}
+
+// StopAllRetriesExceptCommitted stops all the existing retries except committed message (which lives across consensus).
+func (sender *MessageSender) StopAllRetriesExceptCommitted() { + sender.messagesToRetry.Range(func(k, v interface{}) bool { + if msgRetry, ok := v.(*MessageRetry); ok { + if msgRetry.msgType != msg_pb.MessageType_COMMITTED { + msgRetry.isActiveMutex.Lock() + msgRetry.isActive = false + msgRetry.isActiveMutex.Unlock() + } + } + return true + }) +} diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index a4a37a3bc..3d499d396 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -127,7 +127,8 @@ func (consensus *Consensus) announce(block *types.Block) { } // Construct broadcast p2p message - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + + if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn(). Str("groupID", string(p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))). Msg("[Announce] Cannot send announce message") @@ -257,7 +258,8 @@ func (consensus *Consensus) prepare() { // Construct and send prepare message msgToSend := consensus.constructPrepareMessage() // TODO: this will not return immediatey, may block - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + + if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") } else { consensus.getLogger().Info(). 
@@ -378,7 +380,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { return } - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") } else { consensus.getLogger().Debug(). @@ -386,6 +388,9 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { Uint64("BlockNum", consensus.blockNum). Msg("[OnPrepare] Sent Prepared Message!!") } + consensus.msgSender.StopRetry(msg_pb.MessageType_ANNOUNCE) + consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED) // Stop retry committed msg of last consensus + consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", Commit.String()). @@ -557,7 +562,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { time.Sleep(consensus.delayCommit) } - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") } else { consensus.getLogger().Info(). 
@@ -681,6 +686,8 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { logger.Debug().Msg("[OnCommit] Commit Grace Period Ended") consensus.commitFinishChan <- viewID }(consensus.viewID) + + consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED) } if rewardThresholdIsMet { @@ -729,7 +736,8 @@ func (consensus *Consensus) finalizeCommits() { return } // if leader success finalize the block, send committed message to validators - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + + if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message") } else { consensus.getLogger().Info(). @@ -1104,6 +1112,8 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan } startTime = time.Now() + consensus.msgSender.Reset(newBlock.NumberU64()) + consensus.getLogger().Debug(). Int("numTxs", len(newBlock.Transactions())). Interface("consensus", consensus). diff --git a/consensus/view_change.go b/consensus/view_change.go index fda4d31d4..99aa58b1c 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -427,11 +427,12 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { consensus.mode.SetViewID(recvMsg.ViewID) msgToSend := consensus.constructNewViewMessage() + consensus.getLogger().Warn(). Int("payloadSize", len(consensus.m1Payload)). Bytes("M1Payload", consensus.m1Payload). 
Msg("[onViewChange] Sent NewView Message") - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_NEWVIEW, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.viewID = recvMsg.ViewID consensus.ResetViewChangeState() diff --git a/node/node_newblock.go b/node/node_newblock.go index c8451ceb1..3f7fbbd5b 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -15,8 +15,7 @@ import ( // Constants of lower bound limit of a new block. const ( - ConsensusTimeOut = 20 - PeriodicBlock = 200 * time.Millisecond + PeriodicBlock = 200 * time.Millisecond ) // WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus. @@ -30,7 +29,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch utils.GetLogInstance().Debug("Waiting for Consensus ready") time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) - timeoutCount := 0 var newBlock *types.Block // Set up the very first deadline. @@ -41,16 +39,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch case <-stopChan: utils.GetLogInstance().Debug("Consensus new block proposal: STOPPED!") return - case <-time.After(ConsensusTimeOut * time.Second): - if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) { - utils.GetLogInstance().Debug("Leader consensus timeout, retry!", "count", timeoutCount) - //node.Consensus.ResetState() - timeoutCount++ - if newBlock != nil { - // Send the new block to Consensus so it can be confirmed. - node.BlockChannel <- newBlock - } - } case <-readySignal: for { time.Sleep(PeriodicBlock)