diff --git a/consensus/validator.go b/consensus/validator.go index c52b291ff..70d2c01d9 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -3,6 +3,7 @@ package consensus import ( "bytes" "encoding/hex" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -58,30 +59,36 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } func (consensus *Consensus) prepare() { + var wg sync.WaitGroup + wg.Add(len(consensus.PubKey.PublicKey)) groupID := []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))} for i, key := range consensus.PubKey.PublicKey { - networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i]) - if err != nil { - consensus.getLogger().Err(err). - Str("message-type", msg_pb.MessageType_PREPARE.String()). - Msg("could not construct message") - return - } + go func(i int) { + defer wg.Done() + networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i]) + if err != nil { + consensus.getLogger().Err(err). + Str("message-type", msg_pb.MessageType_PREPARE.String()). + Msg("could not construct message") + return + } - // TODO: this will not return immediatey, may block - if consensus.current.Mode() != Listening { - if err := consensus.msgSender.SendWithoutRetry( - groupID, - p2p.ConstructMessage(networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") - } else { - consensus.getLogger().Info(). - Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). - Msg("[OnAnnounce] Sent Prepare Message!!") + // TODO: this will not return immediatey, may block + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + p2p.ConstructMessage(networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") + } else { + consensus.getLogger().Info(). + Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). + Msg("[OnAnnounce] Sent Prepare Message!!") + } } - } + }(i) } + wg.Wait() consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", FBFTPrepare.String()). @@ -207,27 +214,33 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { groupID := []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), } + var wg sync.WaitGroup + wg.Add(len(consensus.PubKey.PublicKey)) for i, key := range consensus.PubKey.PublicKey { - networkMessage, _ := consensus.construct( - msg_pb.MessageType_COMMIT, - commitPayload, - key, consensus.priKey.PrivateKey[i], - ) - - if consensus.current.Mode() != Listening { - if err := consensus.msgSender.SendWithoutRetry( - groupID, - p2p.ConstructMessage(networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") - } else { - consensus.getLogger().Info(). - Uint64("blockNum", consensus.blockNum). - Hex("blockHash", consensus.blockHash[:]). - Msg("[OnPrepared] Sent Commit Message!!") + go func(i int) { + defer wg.Done() + networkMessage, _ := consensus.construct( + msg_pb.MessageType_COMMIT, + commitPayload, + key, consensus.priKey.PrivateKey[i], + ) + + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + p2p.ConstructMessage(networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") + } else { + consensus.getLogger().Info(). + Uint64("blockNum", consensus.blockNum). + Hex("blockHash", consensus.blockHash[:]). + Msg("[OnPrepared] Sent Commit Message!!") + } } - } + }(i) } + wg.Wait() consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", FBFTCommit.String()).