|
|
|
@ -2,6 +2,7 @@ package consensus |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"sync" |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
msg_pb "github.com/harmony-one/harmony/api/proto/message" |
|
|
|
@ -18,7 +19,6 @@ const ( |
|
|
|
|
// MessageSender is the wrapper object that controls how a consensus message is sent
|
|
|
|
|
type MessageSender struct { |
|
|
|
|
blockNum uint64 // The current block number at consensus
|
|
|
|
|
blockNumMutex sync.Mutex |
|
|
|
|
messagesToRetry sync.Map |
|
|
|
|
// The p2p host used to send/receive p2p messages
|
|
|
|
|
host p2p.Host |
|
|
|
@ -44,9 +44,7 @@ func NewMessageSender(host p2p.Host) *MessageSender { |
|
|
|
|
|
|
|
|
|
// Reset resets the sender's state for new block
|
|
|
|
|
func (sender *MessageSender) Reset(blockNum uint64) { |
|
|
|
|
sender.blockNumMutex.Lock() |
|
|
|
|
sender.blockNum = blockNum |
|
|
|
|
sender.blockNumMutex.Unlock() |
|
|
|
|
atomic.StoreUint64(&sender.blockNum, blockNum) |
|
|
|
|
sender.StopAllRetriesExceptCommitted() |
|
|
|
|
sender.messagesToRetry.Range(func(key interface{}, value interface{}) bool { |
|
|
|
|
if msgRetry, ok := value.(*MessageRetry); ok { |
|
|
|
@ -95,13 +93,11 @@ func (sender *MessageSender) Retry(msgRetry *MessageRetry) { |
|
|
|
|
msgRetry.isActiveMutex.Unlock() |
|
|
|
|
|
|
|
|
|
if msgRetry.msgType != msg_pb.MessageType_COMMITTED { |
|
|
|
|
sender.blockNumMutex.Lock() |
|
|
|
|
if msgRetry.blockNum < sender.blockNum { |
|
|
|
|
sender.blockNumMutex.Unlock() |
|
|
|
|
senderBlockNum := atomic.LoadUint64(&sender.blockNum) |
|
|
|
|
if msgRetry.blockNum < senderBlockNum { |
|
|
|
|
// Block already moved ahead, no need to retry old block's messages
|
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
sender.blockNumMutex.Unlock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
msgRetry.retryCount++ |
|
|
|
|