add Delayed sending

pull/3405/head
Rongjian Lan 4 years ago
parent ceaf76e021
commit f91d781fe4
  1. 17
      consensus/consensus_msg_sender.go
  2. 37
      consensus/consensus_v2.go
  3. 2
      consensus/leader.go
  4. 1
      consensus/threshold.go
  5. 2
      consensus/view_change.go
  6. 9
      node/node_newblock.go

@ -56,7 +56,7 @@ func (sender *MessageSender) Reset(blockNum uint64) {
} }
// SendWithRetry sends message with retry logic. // SendWithRetry sends message with retry logic.
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte, immediate bool) error { func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) error {
if sender.retryTimes != 0 { if sender.retryTimes != 0 {
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0} msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0}
atomic.StoreUint32(&msgRetry.isActive, 1) atomic.StoreUint32(&msgRetry.isActive, 1)
@ -65,10 +65,19 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
sender.Retry(&msgRetry) sender.Retry(&msgRetry)
}() }()
} }
if immediate { return sender.host.SendMessageToGroups(groups, p2pMsg)
return sender.host.SendMessageToGroups(groups, p2pMsg) }
// DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries.
func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) {
if sender.retryTimes != 0 {
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0}
atomic.StoreUint32(&msgRetry.isActive, 1)
sender.messagesToRetry.Store(msgType, &msgRetry)
go func() {
sender.Retry(&msgRetry)
}()
} }
return nil
} }
// SendWithoutRetry sends message without retry logic. // SendWithoutRetry sends message without retry logic.

@ -161,23 +161,33 @@ func (consensus *Consensus) finalCommit() {
// Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately // Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately
// to save network traffic. It will only be sent in retry if consensus doesn't move forward. // to save network traffic. It will only be sent in retry if consensus doesn't move forward.
// Or if the leader is changed for next block, the 100% committed sig will be sent to the next leader immediately. // Or if the leader is changed for next block, the 100% committed sig will be sent to the next leader immediately.
sendImmediately := false
if !consensus.IsLeader() || block.IsLastBlockInEpoch() { if !consensus.IsLeader() || block.IsLastBlockInEpoch() {
sendImmediately = true // send immediately
} if err := consensus.msgSender.SendWithRetry(
if err := consensus.msgSender.SendWithRetry( block.NumberU64(),
block.NumberU64(), msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), },
}, p2p.ConstructMessage(msgToSend)); err != nil {
p2p.ConstructMessage(msgToSend), consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message")
sendImmediately); err != nil { } else {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message") consensus.getLogger().Info().
Hex("blockHash", curBlockHash[:]).
Uint64("blockNum", consensus.blockNum).
Msg("[finalCommit] Sent Committed Message")
}
} else { } else {
// delayed send
consensus.msgSender.DelayedSendWithRetry(
block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend))
consensus.getLogger().Info(). consensus.getLogger().Info().
Hex("blockHash", curBlockHash[:]). Hex("blockHash", curBlockHash[:]).
Uint64("blockNum", consensus.blockNum). Uint64("blockNum", consensus.blockNum).
Msg("[finalCommit] Sent Committed Message") Msg("[finalCommit] Queued Committed Message")
} }
// Dump new block into level db // Dump new block into level db
@ -550,8 +560,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, },
p2p.ConstructMessage(msgToSend), p2p.ConstructMessage(msgToSend)); err != nil {
true); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message") consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
} else { } else {
consensus.getLogger().Info(). consensus.getLogger().Info().

@ -80,7 +80,7 @@ func (consensus *Consensus) announce(block *types.Block) {
if err := consensus.msgSender.SendWithRetry( if err := consensus.msgSender.SendWithRetry(
consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{ consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, p2p.ConstructMessage(msgToSend), true); err != nil { }, p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Str("groupID", string(nodeconfig.NewGroupIDByShardID( Str("groupID", string(nodeconfig.NewGroupIDByShardID(
nodeconfig.ShardID(consensus.ShardID), nodeconfig.ShardID(consensus.ShardID),

@ -74,7 +74,6 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, },
p2p.ConstructMessage(msgToSend), p2p.ConstructMessage(msgToSend),
true,
); err != nil { ); err != nil {
consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message")
} else { } else {

@ -265,7 +265,6 @@ func (consensus *Consensus) startViewChange() {
[]nodeconfig.GroupID{ []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend), p2p.ConstructMessage(msgToSend),
true,
); err != nil { ); err != nil {
consensus.getLogger().Err(err). consensus.getLogger().Err(err).
Msg("[startViewChange] could not send out the ViewChange message") Msg("[startViewChange] could not send out the ViewChange message")
@ -295,7 +294,6 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri
[]nodeconfig.GroupID{ []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend), p2p.ConstructMessage(msgToSend),
true,
); err != nil { ); err != nil {
return errors.New("failed to send out the NewView message") return errors.New("failed to send out the NewView message")
} }

@ -44,7 +44,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
Msg("Consensus new block proposal: STOPPED!") Msg("Consensus new block proposal: STOPPED!")
return return
case proposalType := <-readySignal: case proposalType := <-readySignal:
retryCount := 3 retryCount := 0
for node.Consensus != nil && node.Consensus.IsLeader() { for node.Consensus != nil && node.Consensus.IsLeader() {
time.Sleep(SleepPeriod) time.Sleep(SleepPeriod)
utils.Logger().Info(). utils.Logger().Info().
@ -97,9 +97,10 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
node.BlockChannel <- newBlock node.BlockChannel <- newBlock
break break
} else { } else {
utils.Logger().Err(err).Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") retryCount++
retryCount-- utils.Logger().Err(err).Int("retryCount", retryCount).
if retryCount == 0 { Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!")
if retryCount > 3 {
// break to avoid repeated failures // break to avoid repeated failures
break break
} }

Loading…
Cancel
Save