The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
woop/consensus/fbft_log.go

426 lines
14 KiB

package consensus
import (
"encoding/binary"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/pkg/errors"
"github.com/rs/zerolog"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
)
// FBFTMessage is the record of pbft messages received by a node during FBFT process
type FBFTMessage struct {
MessageType msg_pb.MessageType
ViewID uint64
BlockNum uint64
BlockHash common.Hash
Block []byte
SenderPubkeys []*bls.PublicKeyWrapper
SenderPubkeyBitmap []byte
LeaderPubkey *bls.PublicKeyWrapper
Payload []byte
ViewchangeSig *bls_core.Sign
ViewidSig *bls_core.Sign
M2AggSig *bls_core.Sign
M2Bitmap *bls_cosi.Mask
M3AggSig *bls_core.Sign
M3Bitmap *bls_cosi.Mask
Verified bool
}
// String ..
func (m *FBFTMessage) String() string {
sender := ""
for _, key := range m.SenderPubkeys {
if sender == "" {
sender = key.Bytes.Hex()
} else {
sender = sender + ";" + key.Bytes.Hex()
}
}
leader := ""
if m.LeaderPubkey != nil {
leader = m.LeaderPubkey.Bytes.Hex()
}
return fmt.Sprintf(
"[Type:%s ViewID:%d Num:%d BlockHash:%s Sender:%s Leader:%s]",
m.MessageType.String(),
m.ViewID,
m.BlockNum,
m.BlockHash.Hex(),
sender,
leader,
)
}
// HasSingleSender returns whether the message has only a single sender
func (m *FBFTMessage) HasSingleSender() bool {
return len(m.SenderPubkeys) == 1
}
const (
idTypeBytes = 4
idViewIDBytes = 8
idHashBytes = common.HashLength
idSenderBytes = bls.PublicKeySizeInBytes
idBytes = idTypeBytes + idViewIDBytes + idHashBytes + idSenderBytes
)
type (
// fbftMsgID is the id that uniquely defines a fbft message.
fbftMsgID [idBytes]byte
)
// id return the ID of the FBFT message which uniquely identifies a FBFT message.
// The ID is a concatenation of MsgType, BlockHash, and sender key
func (m *FBFTMessage) id() fbftMsgID {
var id fbftMsgID
binary.LittleEndian.PutUint32(id[:], uint32(m.MessageType))
binary.LittleEndian.PutUint64(id[idTypeBytes:], m.ViewID)
copy(id[idTypeBytes+idViewIDBytes:], m.BlockHash[:])
if m.HasSingleSender() {
copy(id[idTypeBytes+idViewIDBytes+idHashBytes:], m.SenderPubkeys[0].Bytes[:])
} else {
// Currently this case is not reachable as only validator will use id() func
// and validator won't receive message with multiple senders
copy(id[idTypeBytes+idViewIDBytes+idHashBytes:], m.SenderPubkeyBitmap[:])
}
return id
}
type FBFT interface {
GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage
IsBlockVerified(hash common.Hash) bool
DeleteBlockByNumber(number uint64)
GetBlockByHash(hash common.Hash) *types.Block
AddVerifiedMessage(msg *FBFTMessage)
AddBlock(block *types.Block)
GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage
}
// FBFTLog represents the log stored by a node during FBFT process
type FBFTLog struct {
blocks map[common.Hash]*types.Block // store blocks received in FBFT
verifiedBlocks map[common.Hash]struct{} // store block hashes for blocks that has already been verified
messages map[fbftMsgID]*FBFTMessage // store messages received in FBFT
verifyNewBlock func(*types.Block) error // block verification function
}
// NewFBFTLog returns new instance of FBFTLog
func NewFBFTLog(verifyNewBlock func(*types.Block) error) *FBFTLog {
pbftLog := FBFTLog{
blocks: make(map[common.Hash]*types.Block),
messages: make(map[fbftMsgID]*FBFTMessage),
verifiedBlocks: make(map[common.Hash]struct{}),
verifyNewBlock: verifyNewBlock,
}
return &pbftLog
}
// AddBlock add a new block into the log
func (log *FBFTLog) AddBlock(block *types.Block) {
log.blocks[block.Hash()] = block
}
func (log *FBFTLog) BlockVerify(block *types.Block) error {
return log.verifyNewBlock(block)
}
// MarkBlockVerified marks the block as verified
func (log *FBFTLog) MarkBlockVerified(block *types.Block) {
log.verifiedBlocks[block.Hash()] = struct{}{}
}
// IsBlockVerified checks whether the block is verified
func (log *FBFTLog) IsBlockVerified(hash common.Hash) bool {
_, exist := log.verifiedBlocks[hash]
return exist
}
// GetBlockByHash returns the block matches the given block hash
func (log *FBFTLog) GetBlockByHash(hash common.Hash) *types.Block {
return log.blocks[hash]
}
// GetBlocksByNumber returns the blocks match the given block number
func (log *FBFTLog) GetBlocksByNumber(number uint64) []*types.Block {
var blocks []*types.Block
for _, block := range log.blocks {
if block.NumberU64() == number {
blocks = append(blocks, block)
}
}
return blocks
}
// DeleteBlocksLessThan deletes blocks less than given block number
func (log *FBFTLog) deleteBlocksLessThan(number uint64) {
for h, block := range log.blocks {
if block.NumberU64() < number {
delete(log.blocks, h)
delete(log.verifiedBlocks, h)
}
}
}
// DeleteBlockByNumber deletes block of specific number
func (log *FBFTLog) DeleteBlockByNumber(number uint64) {
for h, block := range log.blocks {
if block.NumberU64() == number {
delete(log.blocks, h)
delete(log.verifiedBlocks, h)
}
}
}
// DeleteMessagesLessThan deletes messages less than given block number
func (log *FBFTLog) deleteMessagesLessThan(number uint64) {
for h, msg := range log.messages {
if msg.BlockNum < number {
delete(log.messages, h)
}
}
}
// AddVerifiedMessage adds a signature verified pbft message into the log
func (log *FBFTLog) AddVerifiedMessage(msg *FBFTMessage) {
msg.Verified = true
log.messages[msg.id()] = msg
}
// AddNotVerifiedMessage adds a not signature verified pbft message into the log
func (log *FBFTLog) AddNotVerifiedMessage(msg *FBFTMessage) {
msg.Verified = false
log.messages[msg.id()] = msg
}
// GetNotVerifiedCommittedMessages returns not verified committed pbft messages with matching blockNum, viewID and blockHash
func (log *FBFTLog) GetNotVerifiedCommittedMessages(blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == msg_pb.MessageType_COMMITTED && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash && !msg.Verified {
found = append(found, msg)
}
}
return found
}
// GetMessagesByTypeSeqViewHash returns pbft messages with matching type, blockNum, viewID and blockHash
func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash && msg.Verified {
found = append(found, msg)
}
}
return found
}
// GetMessagesByTypeSeq returns pbft messages with matching type, blockNum
func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage {
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.Verified {
found = append(found, msg)
}
}
return found
}
// GetMessagesByTypeSeqHash returns pbft messages with matching type, blockNum
func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage {
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.BlockHash == blockHash && msg.Verified {
found = append(found, msg)
}
}
return found
}
// HasMatchingAnnounce returns whether the log contains announce type message with given blockNum, blockHash
func (log *FBFTLog) HasMatchingAnnounce(blockNum uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqHash(msg_pb.MessageType_ANNOUNCE, blockNum, blockHash)
return len(found) >= 1
}
// HasMatchingViewAnnounce returns whether the log contains announce type message with given blockNum, viewID and blockHash
func (log *FBFTLog) HasMatchingViewAnnounce(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, blockNum, viewID, blockHash)
return len(found) >= 1
}
// HasMatchingPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash
func (log *FBFTLog) HasMatchingPrepared(blockNum uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, blockNum, blockHash)
return len(found) >= 1
}
// HasMatchingViewPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash
func (log *FBFTLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_PREPARED, blockNum, viewID, blockHash)
return len(found) >= 1
}
// GetMessagesByTypeSeqView returns pbft messages with matching type, blockNum and viewID
func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum uint64, viewID uint64) []*FBFTMessage {
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID && msg.Verified {
continue
}
found = append(found, msg)
}
return found
}
// FindMessageByMaxViewID returns the message that has maximum ViewID
func (log *FBFTLog) FindMessageByMaxViewID(msgs []*FBFTMessage) *FBFTMessage {
if len(msgs) == 0 {
return nil
}
maxIdx := -1
maxViewID := uint64(0)
for k, v := range msgs {
if v.ViewID >= maxViewID {
maxIdx = k
maxViewID = v.ViewID
}
}
return msgs[maxIdx]
}
// ParseFBFTMessage parses FBFT message into FBFTMessage structure
func (consensus *Consensus) ParseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.parseFBFTMessage(msg)
}
func (consensus *Consensus) parseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
// TODO Have this do sanity checks on the message please
pbftMsg := FBFTMessage{}
pbftMsg.MessageType = msg.GetType()
consensusMsg := msg.GetConsensus()
pbftMsg.ViewID = consensusMsg.ViewId
pbftMsg.BlockNum = consensusMsg.BlockNum
copy(pbftMsg.BlockHash[:], consensusMsg.BlockHash[:])
pbftMsg.Payload = make([]byte, len(consensusMsg.Payload))
copy(pbftMsg.Payload[:], consensusMsg.Payload[:])
pbftMsg.Block = make([]byte, len(consensusMsg.Block))
copy(pbftMsg.Block[:], consensusMsg.Block[:])
pbftMsg.SenderPubkeyBitmap = make([]byte, len(consensusMsg.SenderPubkeyBitmap))
copy(pbftMsg.SenderPubkeyBitmap[:], consensusMsg.SenderPubkeyBitmap[:])
if len(consensusMsg.SenderPubkey) != 0 {
// If SenderPubKey is populated, treat it as a single key message
pubKey, err := bls_cosi.BytesToBLSPublicKey(consensusMsg.SenderPubkey)
if err != nil {
return nil, err
}
pbftMsg.SenderPubkeys = []*bls.PublicKeyWrapper{{Object: pubKey}}
copy(pbftMsg.SenderPubkeys[0].Bytes[:], consensusMsg.SenderPubkey[:])
} else {
// else, it should be a multi-key message where the bitmap is populated
pubKeys, err := consensus.multiSigBitmap.GetSignedPubKeysFromBitmap(pbftMsg.SenderPubkeyBitmap)
if err != nil {
return nil, err
}
pbftMsg.SenderPubkeys = pubKeys
}
return &pbftMsg, nil
}
var errFBFTLogNotFound = errors.New("FBFT log not found")
// GetCommittedBlockAndMsgsFromNumber get committed block and message starting from block number bn.
func (log *FBFTLog) GetCommittedBlockAndMsgsFromNumber(bn uint64, logger *zerolog.Logger) (*types.Block, *FBFTMessage, error) {
msgs := log.GetMessagesByTypeSeq(
msg_pb.MessageType_COMMITTED, bn,
)
if len(msgs) == 0 {
return nil, nil, errFBFTLogNotFound
}
if len(msgs) > 1 {
logger.Error().Int("numMsgs", len(msgs)).Err(errors.New("DANGER!!! multiple COMMITTED message in PBFT log observed"))
}
for i := range msgs {
block := log.GetBlockByHash(msgs[i].BlockHash)
if block == nil {
logger.Debug().
Uint64("blockNum", msgs[i].BlockNum).
Uint64("viewID", msgs[i].ViewID).
Str("blockHash", msgs[i].BlockHash.Hex()).
Err(errors.New("failed finding a matching block for committed message"))
continue
}
return block, msgs[i], nil
}
return nil, nil, errFBFTLogNotFound
}
// PruneCacheBeforeBlock prune all blocks before bn
func (log *FBFTLog) PruneCacheBeforeBlock(bn uint64) {
log.deleteBlocksLessThan(bn - 1)
log.deleteMessagesLessThan(bn - 1)
}
type threadsafeFBFTLog struct {
log *FBFTLog
mu *sync.RWMutex
}
func (t threadsafeFBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.GetMessagesByTypeSeq(typ, blockNum)
}
func (t threadsafeFBFTLog) IsBlockVerified(hash common.Hash) bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.IsBlockVerified(hash)
}
func (t threadsafeFBFTLog) DeleteBlockByNumber(number uint64) {
t.mu.Lock()
defer t.mu.Unlock()
t.log.DeleteBlockByNumber(number)
}
func (t threadsafeFBFTLog) GetBlockByHash(hash common.Hash) *types.Block {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.GetBlockByHash(hash)
}
func (t threadsafeFBFTLog) AddVerifiedMessage(msg *FBFTMessage) {
t.mu.Lock()
defer t.mu.Unlock()
t.log.AddVerifiedMessage(msg)
}
func (t threadsafeFBFTLog) AddBlock(block *types.Block) {
t.mu.Lock()
defer t.mu.Unlock()
t.log.AddBlock(block)
}
func (t threadsafeFBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.GetMessagesByTypeSeqHash(typ, blockNum, blockHash)
}