Merge pull request #1245 from rlan35/r3

Add leader consensus retry logic
pull/1268/head
Rongjian Lan 5 years ago committed by GitHub
commit 3e26c31d13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      consensus/consensus.go
  2. 138
      consensus/consensus_msg_sender.go
  3. 20
      consensus/consensus_v2.go
  4. 3
      consensus/view_change.go
  5. 14
      node/node_newblock.go

@ -142,6 +142,8 @@ type Consensus struct {
// The p2p host used to send/receive p2p messages // The p2p host used to send/receive p2p messages
host p2p.Host host p2p.Host
// MessageSender takes care of sending consensus messages and the corresponding retry logic.
msgSender *MessageSender
// Staking information finder // Staking information finder
stakeInfoFinder StakeInfoFinder stakeInfoFinder StakeInfoFinder
@ -233,6 +235,7 @@ type StakeInfoFinder interface {
func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKey) (*Consensus, error) { func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKey) (*Consensus, error) {
consensus := Consensus{} consensus := Consensus{}
consensus.host = host consensus.host = host
consensus.msgSender = NewMessageSender(host)
consensus.blockNumLowChan = make(chan struct{}) consensus.blockNumLowChan = make(chan struct{})
// pbft related // pbft related

@ -0,0 +1,138 @@
package consensus
import (
"sync"
"time"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
// RetryIntervalInSec is the number of seconds to wait between two consecutive
// resend attempts of a consensus message.
const RetryIntervalInSec = 10
// MessageSender is the wrapper object that controls how a consensus message is sent.
// It forwards messages to the p2p host and, for messages sent with retry, keeps
// track of the pending retries so they can be stopped later.
type MessageSender struct {
// blockNum is the current block number at consensus; it is updated by Reset
// and compared against a retry's block number to drop retries of older blocks.
blockNum uint64
// blockNumMutex guards blockNum.
blockNumMutex sync.Mutex
// messagesToRetry maps a message type to the *MessageRetry most recently
// registered for that type.
messagesToRetry sync.Map
// The p2p host used to send/receive p2p messages
host p2p.Host
// retryTimes is the maximum number of resend attempts per message.
// A value of zero disables retrying altogether.
retryTimes int
}
// MessageRetry holds a consensus message that can be retried, together with
// the bookkeeping needed to resend it and to stop resending it.
type MessageRetry struct {
blockNum uint64 // The block number this message is for
// groups are the p2p groups the message is sent to on every attempt.
groups []p2p.GroupID
// p2pMsg is the message payload that is resent unchanged on every attempt.
p2pMsg []byte
// msgType is the consensus message type; it is also the key under which the
// retry is registered in MessageSender.messagesToRetry.
msgType msg_pb.MessageType
// retryCount is the number of resend attempts made so far.
retryCount int
// isActive is false once the retry has been stopped; the retry loop exits
// the next time it observes this.
isActive bool
// isActiveMutex guards isActive.
isActiveMutex sync.Mutex
}
// NewMessageSender builds a consensus message sender bound to the given p2p
// host. The number of retries per message is derived from the consensus phase
// duration: one attempt per RetryIntervalInSec seconds that fit in a phase.
func NewMessageSender(host p2p.Host) *MessageSender {
	sender := new(MessageSender)
	sender.host = host
	sender.retryTimes = int(phaseDuration.Seconds()) / RetryIntervalInSec
	return sender
}
// Reset prepares the sender for a new block: it records blockNum as the
// current block number, stops every pending retry except the COMMITTED one,
// and then forgets the stopped retries.
func (sender *MessageSender) Reset(blockNum uint64) {
	sender.blockNumMutex.Lock()
	sender.blockNum = blockNum
	sender.blockNumMutex.Unlock()

	sender.StopAllRetriesExceptCommitted()

	// Drop every registered retry that is not a COMMITTED message; the
	// COMMITTED retry is kept registered so it can still be stopped later.
	dropStale := func(msgType interface{}, entry interface{}) bool {
		pending, isRetry := entry.(*MessageRetry)
		if isRetry && pending.msgType != msg_pb.MessageType_COMMITTED {
			sender.messagesToRetry.Delete(msgType)
		}
		return true
	}
	sender.messagesToRetry.Range(dropStale)
}
// SendWithRetry sends p2pMsg to the given groups and, when retrying is enabled
// (retryTimes != 0), registers the message under msgType and starts a
// background goroutine that keeps resending it until it is stopped, superseded
// or the retry budget is used up.
//
// Only one retry per message type is tracked. If a retry for msgType is
// already registered it is deactivated before being replaced; otherwise its
// goroutine would keep resending a superseded message and could no longer be
// reached through StopRetry.
//
// The returned error is the result of the initial send only; failures of later
// resend attempts are logged by Retry.
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []p2p.GroupID, p2pMsg []byte) error {
	if sender.retryTimes != 0 {
		// Stop the retry this one supersedes. Load followed by Store is not
		// atomic, which is acceptable as long as messages of one type are sent
		// from a single goroutine.
		if previous, found := sender.messagesToRetry.Load(msgType); found {
			if stale, ok := previous.(*MessageRetry); ok && stale != nil {
				stale.isActiveMutex.Lock()
				stale.isActive = false
				stale.isActiveMutex.Unlock()
			}
		}

		msgRetry := &MessageRetry{
			blockNum: blockNum,
			groups:   groups,
			p2pMsg:   p2pMsg,
			msgType:  msgType,
			isActive: true,
		}
		sender.messagesToRetry.Store(msgType, msgRetry)
		go sender.Retry(msgRetry)
	}
	return sender.host.SendMessageToGroups(groups, p2pMsg)
}
// SendWithoutRetry performs a single, fire-once send of p2pMsg to the given
// groups through the p2p host and reports the host's error, if any.
func (sender *MessageSender) SendWithoutRetry(groups []p2p.GroupID, p2pMsg []byte) error {
	err := sender.host.SendMessageToGroups(groups, p2pMsg)
	return err
}
// Retry resends the consensus message held by msgRetry every
// RetryIntervalInSec seconds, up to sender.retryTimes attempts. It blocks
// until the loop ends, so it is meant to run in its own goroutine (as
// SendWithRetry does).
//
// The loop ends when any of the following holds after a wait interval:
//   - the retry budget is used up;
//   - the retry was deactivated (see StopRetry / StopAllRetriesExceptCommitted);
//   - the message is not a COMMITTED message and the sender has moved on to a
//     higher block number.
//
// Every attempt counts towards the budget, whether or not the send succeeds.
// Send failures are logged together with the underlying error.
func (sender *MessageSender) Retry(msgRetry *MessageRetry) {
	for {
		// Always wait a full interval before (re)checking; SendWithRetry
		// performs the initial send itself.
		time.Sleep(RetryIntervalInSec * time.Second)

		if msgRetry.retryCount >= sender.retryTimes {
			// Retried enough times
			return
		}

		msgRetry.isActiveMutex.Lock()
		active := msgRetry.isActive
		msgRetry.isActiveMutex.Unlock()
		if !active {
			// Retry is stopped
			return
		}

		// COMMITTED messages live across consensus rounds, so they are exempt
		// from the "block already moved ahead" check.
		if msgRetry.msgType != msg_pb.MessageType_COMMITTED {
			sender.blockNumMutex.Lock()
			currentBlockNum := sender.blockNum
			sender.blockNumMutex.Unlock()
			if msgRetry.blockNum < currentBlockNum {
				// Block already moved ahead, no need to retry old block's messages
				return
			}
		}

		msgRetry.retryCount++
		if err := sender.host.SendMessageToGroups(msgRetry.groups, msgRetry.p2pMsg); err != nil {
			utils.GetLogInstance().Warn("[Retry] Failed re-sending consensus message", "groupID", msgRetry.groups, "blockNum", msgRetry.blockNum, "MsgType", msgRetry.msgType, "RetryCount", msgRetry.retryCount, "error", err)
			continue
		}
		utils.GetLogInstance().Info("[Retry] Successfully resent consensus message", "groupID", msgRetry.groups, "blockNum", msgRetry.blockNum, "MsgType", msgRetry.msgType, "RetryCount", msgRetry.retryCount)
	}
}
// StopRetry deactivates the retry currently registered for msgType, if any.
// The retry goroutine notices the deactivation the next time it wakes up, so
// no further resend happens after that point. The entry itself stays
// registered; it is a no-op when nothing is registered for msgType.
func (sender *MessageSender) StopRetry(msgType msg_pb.MessageType) {
	data, found := sender.messagesToRetry.Load(msgType)
	if !found {
		return
	}
	// Checked assertion, consistent with the other methods that walk
	// messagesToRetry: an unexpected value is ignored instead of panicking.
	msgRetry, ok := data.(*MessageRetry)
	if !ok || msgRetry == nil {
		return
	}
	msgRetry.isActiveMutex.Lock()
	msgRetry.isActive = false
	msgRetry.isActiveMutex.Unlock()
}
// StopAllRetriesExceptCommitted deactivates every registered retry whose
// message type is not COMMITTED. The COMMITTED retry is left untouched because
// it lives across consensus rounds.
func (sender *MessageSender) StopAllRetriesExceptCommitted() {
	deactivate := func(_ interface{}, entry interface{}) bool {
		pending, isRetry := entry.(*MessageRetry)
		if !isRetry || pending.msgType == msg_pb.MessageType_COMMITTED {
			// Not a retry entry, or the one retry that must keep running.
			return true
		}
		pending.isActiveMutex.Lock()
		pending.isActive = false
		pending.isActiveMutex.Unlock()
		return true
	}
	sender.messagesToRetry.Range(deactivate)
}

@ -127,7 +127,8 @@ func (consensus *Consensus) announce(block *types.Block) {
} }
// Construct broadcast p2p message // Construct broadcast p2p message
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Str("groupID", string(p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))). Str("groupID", string(p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))).
Msg("[Announce] Cannot send announce message") Msg("[Announce] Cannot send announce message")
@ -257,7 +258,8 @@ func (consensus *Consensus) prepare() {
// Construct and send prepare message // Construct and send prepare message
msgToSend := consensus.constructPrepareMessage() msgToSend := consensus.constructPrepareMessage()
// TODO: this will not return immediately, may block // TODO: this will not return immediately, may block
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message")
} else { } else {
consensus.getLogger().Info(). consensus.getLogger().Info().
@ -378,7 +380,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return return
} }
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message")
} else { } else {
consensus.getLogger().Debug(). consensus.getLogger().Debug().
@ -386,6 +388,9 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
Uint64("BlockNum", consensus.blockNum). Uint64("BlockNum", consensus.blockNum).
Msg("[OnPrepare] Sent Prepared Message!!") Msg("[OnPrepare] Sent Prepared Message!!")
} }
consensus.msgSender.StopRetry(msg_pb.MessageType_ANNOUNCE)
consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED) // Stop retry committed msg of last consensus
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("From", consensus.phase.String()). Str("From", consensus.phase.String()).
Str("To", Commit.String()). Str("To", Commit.String()).
@ -557,7 +562,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
time.Sleep(consensus.delayCommit) time.Sleep(consensus.delayCommit)
} }
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!")
} else { } else {
consensus.getLogger().Info(). consensus.getLogger().Info().
@ -681,6 +686,8 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
logger.Debug().Msg("[OnCommit] Commit Grace Period Ended") logger.Debug().Msg("[OnCommit] Commit Grace Period Ended")
consensus.commitFinishChan <- viewID consensus.commitFinishChan <- viewID
}(consensus.viewID) }(consensus.viewID)
consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED)
} }
if rewardThresholdIsMet { if rewardThresholdIsMet {
@ -729,7 +736,8 @@ func (consensus *Consensus) finalizeCommits() {
return return
} }
// if leader success finalize the block, send committed message to validators // if leader success finalize the block, send committed message to validators
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message") consensus.getLogger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message")
} else { } else {
consensus.getLogger().Info(). consensus.getLogger().Info().
@ -1104,6 +1112,8 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
} }
startTime = time.Now() startTime = time.Now()
consensus.msgSender.Reset(newBlock.NumberU64())
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Int("numTxs", len(newBlock.Transactions())). Int("numTxs", len(newBlock.Transactions())).
Interface("consensus", consensus). Interface("consensus", consensus).

@ -427,11 +427,12 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.mode.SetViewID(recvMsg.ViewID) consensus.mode.SetViewID(recvMsg.ViewID)
msgToSend := consensus.constructNewViewMessage() msgToSend := consensus.constructNewViewMessage()
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Int("payloadSize", len(consensus.m1Payload)). Int("payloadSize", len(consensus.m1Payload)).
Bytes("M1Payload", consensus.m1Payload). Bytes("M1Payload", consensus.m1Payload).
Msg("[onViewChange] Sent NewView Message") Msg("[onViewChange] Sent NewView Message")
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_NEWVIEW, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.viewID = recvMsg.ViewID consensus.viewID = recvMsg.ViewID
consensus.ResetViewChangeState() consensus.ResetViewChangeState()

@ -15,8 +15,7 @@ import (
// Constants of lower bound limit of a new block. // Constants of lower bound limit of a new block.
const ( const (
ConsensusTimeOut = 20 PeriodicBlock = 200 * time.Millisecond
PeriodicBlock = 200 * time.Millisecond
) )
// WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus. // WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus.
@ -30,7 +29,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
utils.GetLogInstance().Debug("Waiting for Consensus ready") utils.GetLogInstance().Debug("Waiting for Consensus ready")
time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only)
timeoutCount := 0
var newBlock *types.Block var newBlock *types.Block
// Set up the very first deadline. // Set up the very first deadline.
@ -41,16 +39,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
case <-stopChan: case <-stopChan:
utils.GetLogInstance().Debug("Consensus new block proposal: STOPPED!") utils.GetLogInstance().Debug("Consensus new block proposal: STOPPED!")
return return
case <-time.After(ConsensusTimeOut * time.Second):
if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) {
utils.GetLogInstance().Debug("Leader consensus timeout, retry!", "count", timeoutCount)
//node.Consensus.ResetState()
timeoutCount++
if newBlock != nil {
// Send the new block to Consensus so it can be confirmed.
node.BlockChannel <- newBlock
}
}
case <-readySignal: case <-readySignal:
for { for {
time.Sleep(PeriodicBlock) time.Sleep(PeriodicBlock)

Loading…
Cancel
Save