[multi-bls] validator sign and send message in parallel (#3054)

pull/3061/head
Ganesha Upadhyaya 5 years ago committed by GitHub
parent cc965edba4
commit 3c6b574b08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 87
      consensus/validator.go

@ -3,6 +3,7 @@ package consensus
import ( import (
"bytes" "bytes"
"encoding/hex" "encoding/hex"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -58,30 +59,36 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
} }
func (consensus *Consensus) prepare() { func (consensus *Consensus) prepare() {
var wg sync.WaitGroup
wg.Add(len(consensus.PubKey.PublicKey))
groupID := []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))} groupID := []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}
for i, key := range consensus.PubKey.PublicKey { for i, key := range consensus.PubKey.PublicKey {
networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i]) go func(i int) {
if err != nil { defer wg.Done()
consensus.getLogger().Err(err). networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i])
Str("message-type", msg_pb.MessageType_PREPARE.String()). if err != nil {
Msg("could not construct message") consensus.getLogger().Err(err).
return Str("message-type", msg_pb.MessageType_PREPARE.String()).
} Msg("could not construct message")
return
}
// TODO: this will not return immediatey, may block // TODO: this will not return immediatey, may block
if consensus.current.Mode() != Listening { if consensus.current.Mode() != Listening {
if err := consensus.msgSender.SendWithoutRetry( if err := consensus.msgSender.SendWithoutRetry(
groupID, groupID,
p2p.ConstructMessage(networkMessage.Bytes), p2p.ConstructMessage(networkMessage.Bytes),
); err != nil { ); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message")
} else { } else {
consensus.getLogger().Info(). consensus.getLogger().Info().
Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). Str("blockHash", hex.EncodeToString(consensus.blockHash[:])).
Msg("[OnAnnounce] Sent Prepare Message!!") Msg("[OnAnnounce] Sent Prepare Message!!")
}
} }
} }(i)
} }
wg.Wait()
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("From", consensus.phase.String()). Str("From", consensus.phase.String()).
Str("To", FBFTPrepare.String()). Str("To", FBFTPrepare.String()).
@ -207,27 +214,33 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
groupID := []nodeconfig.GroupID{ groupID := []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
} }
var wg sync.WaitGroup
wg.Add(len(consensus.PubKey.PublicKey))
for i, key := range consensus.PubKey.PublicKey { for i, key := range consensus.PubKey.PublicKey {
networkMessage, _ := consensus.construct( go func(i int) {
msg_pb.MessageType_COMMIT, defer wg.Done()
commitPayload, networkMessage, _ := consensus.construct(
key, consensus.priKey.PrivateKey[i], msg_pb.MessageType_COMMIT,
) commitPayload,
key, consensus.priKey.PrivateKey[i],
if consensus.current.Mode() != Listening { )
if err := consensus.msgSender.SendWithoutRetry(
groupID, if consensus.current.Mode() != Listening {
p2p.ConstructMessage(networkMessage.Bytes), if err := consensus.msgSender.SendWithoutRetry(
); err != nil { groupID,
consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") p2p.ConstructMessage(networkMessage.Bytes),
} else { ); err != nil {
consensus.getLogger().Info(). consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!")
Uint64("blockNum", consensus.blockNum). } else {
Hex("blockHash", consensus.blockHash[:]). consensus.getLogger().Info().
Msg("[OnPrepared] Sent Commit Message!!") Uint64("blockNum", consensus.blockNum).
Hex("blockHash", consensus.blockHash[:]).
Msg("[OnPrepared] Sent Commit Message!!")
}
} }
} }(i)
} }
wg.Wait()
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("From", consensus.phase.String()). Str("From", consensus.phase.String()).
Str("To", FBFTCommit.String()). Str("To", FBFTCommit.String()).

Loading…
Cancel
Save