From 87dd2cd2cddfebfac140ade53a186fb445c64977 Mon Sep 17 00:00:00 2001 From: chao Date: Sat, 23 Nov 2019 19:30:57 -0800 Subject: [PATCH] Simplified CrossLink structure; Remove temp flag of Read/Write crosslink --- api/proto/node/node.go | 18 ++- core/blockchain.go | 36 +++--- core/rawdb/accessors_chain.go | 27 ++-- core/rawdb/schema.go | 12 +- core/types/crosslink.go | 55 ++++---- node/node.go | 6 +- node/node_cross_shard.go | 236 +++++++++------------------------- node/node_handler.go | 27 ++-- node/node_newblock.go | 9 +- node/worker/worker.go | 1 + 10 files changed, 154 insertions(+), 273 deletions(-) diff --git a/api/proto/node/node.go b/api/proto/node/node.go index d8c4b5024..1d4f5421b 100644 --- a/api/proto/node/node.go +++ b/api/proto/node/node.go @@ -97,8 +97,8 @@ type BlockMessageType int const ( Sync BlockMessageType = iota - Header // used for crosslink from beacon chain to shard chain - Receipt // cross-shard transaction receipts + CrossLink // used for crosslink from beacon chain to shard chain + Receipt // cross-shard transaction receipts ) // SerializeBlockchainSyncMessage serializes BlockchainSyncMessage. @@ -149,14 +149,18 @@ func ConstructBlocksSyncMessage(blocks []*types.Block) []byte { return byteBuffer.Bytes() } -// ConstructCrossLinkHeadersMessage constructs cross link header message to send to beacon chain -func ConstructCrossLinkHeadersMessage(headers []*block.Header) []byte { +// ConstructCrossLinkMessage constructs cross link message to send to beacon chain +func ConstructCrossLinkMessage(headers []*block.Header) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(Block)) - byteBuffer.WriteByte(byte(Header)) + byteBuffer.WriteByte(byte(CrossLink)) - headersData, _ := rlp.EncodeToBytes(headers) - byteBuffer.Write(headersData) + crosslinks := []types.CrossLink{} + for _, header := range headers { + crosslinks = append(crosslinks, types.NewCrossLink(header)) + } + crosslinksData, _ := rlp.EncodeToBytes(crosslinks) + byteBuffer.Write(crosslinksData) return byteBuffer.Bytes() } diff --git a/core/blockchain.go b/core/blockchain.go index 4abbfd283..5694ce310 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1094,7 +1094,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. continue } shardReceipts := GetToShardReceipts(cxReceipts, uint32(i)) - err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts, false) + err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts) if err != nil { utils.Logger().Debug().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database") return NonStatTy, err @@ -1202,10 +1202,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. return NonStatTy, errors.New("proposed cross links are not sorted") } for _, crossLink := range *crossLinks { - if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}, false); err == nil { - utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum().Uint64()).Uint32("shardID", crossLink.ShardID()).Msg("[InsertChain] Cross Link Added to Beaconchain") + if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}); err == nil { + utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[InsertChain] Cross Link Added to Beaconchain") } - bc.DeleteCrossLinks(types.CrossLinks{crossLink}, true) bc.WriteShardLastCrossLink(crossLink.ShardID(), crossLink) } } @@ -2093,31 +2092,28 @@ func (bc *BlockChain) WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) e } // WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key -// temp=true is to write the just received cross link that's not committed into blockchain with consensus -func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink, temp bool) error { +func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink) error { var err error for i := 0; i < len(cls); i++ { cl := cls[i] - err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum().Uint64(), cl.Serialize(), temp) + err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum(), cl.Serialize()) } return err } // DeleteCrossLinks removes the hashes of crosslinks by shardID and blockNum combination key -// temp=true is to write the just received cross link that's not committed into blockchain with consensus -func (bc *BlockChain) DeleteCrossLinks(cls []types.CrossLink, temp bool) error { +func (bc *BlockChain) DeleteCrossLinks(cls []types.CrossLink) error { var err error for i := 0; i < len(cls); i++ { cl := cls[i] - err = rawdb.DeleteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum().Uint64(), temp) + err = rawdb.DeleteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum()) } return err } // ReadCrossLink retrieves crosslink given shardID and blockNum. -// temp=true is to retrieve the just received cross link that's not committed into blockchain with consensus -func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) { - bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp) +func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64) (*types.CrossLink, error) { + bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum) if err != nil { return nil, err } @@ -2177,9 +2173,8 @@ func GetToShardReceipts(cxReceipts types.CXReceipts, shardID uint32) types.CXRec } // ReadCXReceipts retrieves the cross shard transaction receipts of a given shard -// temp=true is to retrieve the just received receipts that's not committed into blockchain with consensus -func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash, temp bool) (types.CXReceipts, error) { - cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, blockNum, blockHash, temp) +func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash) (types.CXReceipts, error) { + cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, blockNum, blockHash) if err != nil || len(cxs) == 0 { return nil, err } @@ -2187,15 +2182,14 @@ func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash } // WriteCXReceipts saves the cross shard transaction receipts of a given shard -// temp=true is to store the just received receipts that's not committed into blockchain with consensus -func (bc *BlockChain) WriteCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash, receipts types.CXReceipts, temp bool) error { - return rawdb.WriteCXReceipts(bc.db, shardID, blockNum, blockHash, receipts, temp) +func (bc *BlockChain) WriteCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash, receipts types.CXReceipts) error { + return rawdb.WriteCXReceipts(bc.db, shardID, blockNum, blockHash, receipts) } // CXMerkleProof calculates the cross shard transaction merkle proof of a given destination shard func (bc *BlockChain) CXMerkleProof(shardID uint32, block *types.Block) (*types.CXMerkleProof, error) { proof := &types.CXMerkleProof{BlockNum: block.Number(), BlockHash: block.Hash(), ShardID: block.ShardID(), CXReceiptHash: block.Header().OutgoingReceiptHash(), CXShardHashes: []common.Hash{}, ShardIDs: []uint32{}} - cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, block.NumberU64(), block.Hash(), false) + cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, block.NumberU64(), block.Hash()) if err != nil || cxs == nil { return nil, err @@ -2206,7 +2200,7 @@ func (bc *BlockChain) CXMerkleProof(shardID uint32, block *types.Block) (*types. shardNum := int(shardingConfig.NumShards()) for i := 0; i < shardNum; i++ { - receipts, err := bc.ReadCXReceipts(uint32(i), block.NumberU64(), block.Hash(), false) + receipts, err := bc.ReadCXReceipts(uint32(i), block.NumberU64(), block.Hash()) if err != nil || len(receipts) == 0 { continue } else { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 35492dd0d..1a26fcbfc 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -515,18 +515,18 @@ func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error } // ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum -func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64, temp bool) ([]byte, error) { - return db.Get(crosslinkKey(shardID, blockNum, temp)) +func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) { + return db.Get(crosslinkKey(shardID, blockNum)) } // WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum -func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte, temp bool) error { - return db.Put(crosslinkKey(shardID, blockNum, temp), data) +func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error { + return db.Put(crosslinkKey(shardID, blockNum), data) } // DeleteCrossLinkShardBlock deletes the blockHash given shardID and blockNum -func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64, temp bool) error { - return db.Delete(crosslinkKey(shardID, blockNum, temp)) +func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64) error { + return db.Delete(crosslinkKey(shardID, blockNum)) } // ReadShardLastCrossLink read the last cross link of a shard @@ -540,8 +540,8 @@ func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) err } // ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash -func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash, temp bool) (types.CXReceipts, error) { - data, err := db.Get(cxReceiptKey(shardID, number, hash, temp)) +func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) { + data, err := db.Get(cxReceiptKey(shardID, number, hash)) if err != nil || len(data) == 0 { utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts") return nil, err @@ -555,25 +555,18 @@ func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash commo } // WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash -func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts, temp bool) error { +func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) error { bytes, err := rlp.EncodeToBytes(receipts) if err != nil { utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts") } // Store the receipt slice - if err := db.Put(cxReceiptKey(shardID, number, hash, temp), bytes); err != nil { + if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil { utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts") } return err } -// DeleteCXReceipts removes all receipt data associated with a block hash. -func DeleteCXReceipts(db DatabaseDeleter, shardID uint32, number uint64, hash common.Hash, temp bool) { - if err := db.Delete(cxReceiptKey(shardID, number, hash, temp)); err != nil { - utils.Logger().Error().Msg("Failed to delete cross shard tx receipts") - } -} - // ReadCXReceiptsProofSpent check whether a CXReceiptsProof is unspent func ReadCXReceiptsProofSpent(db DatabaseReader, shardID uint32, number uint64) (byte, error) { data, err := db.Get(cxReceiptSpentKey(shardID, number)) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 4c127751a..4b0e466ad 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -63,13 +63,11 @@ var ( shardLastCrosslinkPrefix = []byte("lcl") // prefix for shard last crosslink crosslinkPrefix = []byte("cl") // prefix for crosslink - tempCrosslinkPrefix = []byte("tcl") // prefix for tempCrosslink delegatorValidatorListPrefix = []byte("dvl") // prefix for delegator's validator list // TODO: shorten the key prefix so we don't waste db space cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt - tempCxReceiptPrefix = []byte("tempCxReceipt") // prefix for temporary cross shard transaction receipt cxReceiptHashPrefix = []byte("cxReceiptHash") // prefix for cross shard transaction receipt hash cxReceiptSpentPrefix = []byte("cxReceiptSpent") // prefix for indicator of unspent of cxReceiptsProof cxReceiptUnspentCheckpointPrefix = []byte("cxReceiptUnspentCheckpoint") // prefix for cxReceiptsProof unspent checkpoint @@ -195,11 +193,8 @@ func shardLastCrosslinkKey(shardID uint32) []byte { return key } -func crosslinkKey(shardID uint32, blockNum uint64, temp bool) []byte { +func crosslinkKey(shardID uint32, blockNum uint64) []byte { prefix := crosslinkPrefix - if temp { - prefix = tempCrosslinkPrefix - } sbKey := make([]byte, 12) binary.BigEndian.PutUint32(sbKey, shardID) binary.BigEndian.PutUint64(sbKey[4:], blockNum) @@ -212,11 +207,8 @@ func delegatorValidatorListKey(delegator common.Address) []byte { } // cxReceiptKey = cxReceiptsPrefix + shardID + num (uint64 big endian) + hash -func cxReceiptKey(shardID uint32, number uint64, hash common.Hash, temp bool) []byte { +func cxReceiptKey(shardID uint32, number uint64, hash common.Hash) []byte { prefix := cxReceiptPrefix - if temp { - prefix = tempCxReceiptPrefix - } sKey := make([]byte, 4) binary.BigEndian.PutUint32(sKey, shardID) tmp := append(prefix, sKey...) diff --git a/core/types/crosslink.go b/core/types/crosslink.go index b6436b2f6..9b1915a72 100644 --- a/core/types/crosslink.go +++ b/core/types/crosslink.go @@ -4,51 +4,62 @@ import ( "math/big" "sort" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/common" - + "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/block" ) // CrossLink is only used on beacon chain to store the hash links from other shards +// signature and bitmap correspond to |blockNumber|parentHash| byte array +// Captial to enable rlp encoding type CrossLink struct { - ChainHeader *block.Header + ParentHashF common.Hash + BlockNumberF *big.Int + SignatureF [96]byte //aggregated signature + BitmapF []byte //corresponding bitmap mask for agg signature + ShardIDF uint32 //need first verify signature on |blockNumber|blockHash| is correct + EpochF *big.Int //need first verify signature on |blockNumber|blockHash| is correct } // NewCrossLink returns a new cross link object func NewCrossLink(header *block.Header) CrossLink { - return CrossLink{header} + parentBlockNum := header.Number().Sub(header.Number(), big.NewInt(1)) + return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID(), header.Epoch()} } -// Header returns header -func (cl CrossLink) Header() *block.Header { - return cl.ChainHeader +// ShardID returns shardID +func (cl CrossLink) ShardID() uint32 { + return cl.ShardIDF } // ShardID returns shardID -func (cl CrossLink) ShardID() uint32 { - return cl.ChainHeader.ShardID() +func (cl CrossLink) Epoch() *big.Int { + return cl.EpochF +} + +// Number returns blockNum with big.Int format +func (cl CrossLink) Number() *big.Int { + return cl.BlockNumberF } // BlockNum returns blockNum -func (cl CrossLink) BlockNum() *big.Int { - return cl.ChainHeader.Number() +func (cl CrossLink) BlockNum() uint64 { + return cl.BlockNumberF.Uint64() } // Hash returns hash -func (cl CrossLink) Hash() common.Hash { - return cl.ChainHeader.Hash() +func (cl CrossLink) ParentHash() common.Hash { + return cl.ParentHashF } -// StateRoot returns hash of state root -func (cl CrossLink) StateRoot() common.Hash { - return cl.ChainHeader.Root() +// Bitmap returns bitmap +func (cl CrossLink) Bitmap() []byte { + return cl.BitmapF } -// OutgoingReceiptsRoot returns hash of cross shard receipts -func (cl CrossLink) OutgoingReceiptsRoot() common.Hash { - return cl.ChainHeader.OutgoingReceiptHash() +// Signature returns aggregated signature +func (cl CrossLink) Signature() [96]byte { + return cl.SignatureF } // Serialize returns bytes of cross link rlp-encoded content @@ -73,13 +84,13 @@ type CrossLinks []CrossLink // Sort crosslinks by shardID and then by blockNum func (cls CrossLinks) Sort() { sort.Slice(cls, func(i, j int) bool { - return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0) + return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0) }) } // IsSorted checks whether the cross links are sorted func (cls CrossLinks) IsSorted() bool { return sort.SliceIsSorted(cls, func(i, j int) bool { - return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0) + return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0) }) } diff --git a/node/node.go b/node/node.go index 7401e3daf..a25695bde 100644 --- a/node/node.go +++ b/node/node.go @@ -15,7 +15,6 @@ import ( "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/syncing" "github.com/harmony-one/harmony/api/service/syncing/downloader" - "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus/reward" "github.com/harmony-one/harmony/core" @@ -108,12 +107,13 @@ type Node struct { ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes DRand *drand.DRand // The instance for distributed randomness protocol - pendingCrossLinks []*block.Header - pendingClMutex sync.Mutex pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus pendingCXMutex sync.Mutex + pendingCrossLinks []types.CrossLink + pendingCLMutex sync.Mutex + // Shard databases shardChains shardchain.Collection diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 0e97e877d..61e92636a 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -1,14 +1,13 @@ package node import ( + "bytes" "encoding/binary" - "errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" proto_node "github.com/harmony-one/harmony/api/proto/node" - "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" @@ -54,7 +53,7 @@ func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig [ myShardID := node.Consensus.ShardID utils.Logger().Info().Uint32("toShardID", toShardID).Uint32("myShardID", myShardID).Uint64("blockNum", block.NumberU64()).Msg("[BroadcastCXReceiptsWithShardID]") - cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash(), false) + cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash()) if err != nil || len(cxReceipts) == 0 { utils.Logger().Info().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceiptsWithShardID] No ReadCXReceipts found") return @@ -120,124 +119,75 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { ) } - firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch) - - for i, crossLink := range *crossLinks { - lastLink := &types.CrossLink{} - if i == 0 { - if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 { - lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID()) - if err != nil { - return ctxerror.New("[CrossLinkVerification] no last cross link found 1", - "blockHash", block.Hash(), - "crossLink", lastLink, - ).WithCause(err) - } - } - } else { - if (*crossLinks)[i-1].Header().ShardID() != crossLink.Header().ShardID() { - if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 { - lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID()) - if err != nil { - return ctxerror.New("[CrossLinkVerification] no last cross link found 2", - "blockHash", block.Hash(), - "crossLink", lastLink, - ).WithCause(err) - } - } - } else { - lastLink = &(*crossLinks)[i-1] - } - } - - if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 { // TODO: verify genesis block - err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header()) - if err != nil { - return ctxerror.New("cannot ValidateNewBlock", + for _, crossLink := range *crossLinks { + cl, err := node.Blockchain().ReadCrossLink(crossLink.ShardID(), crossLink.BlockNum()) + if err == nil && cl != nil { + if !bytes.Equal(cl.Serialize(), crossLink.Serialize()) { + return ctxerror.New("[CrossLinkVerification] Double signed crossLink", "blockHash", block.Hash(), - "numTx", len(block.Transactions()), - ).WithCause(err) + "Previous committed crossLink", cl, + "crossLink", crossLink, + ) } + continue + } + if err = node.VerifyCrossLink(crossLink); err != nil { + return ctxerror.New("cannot VerifyBlockCrossLinks", + "blockHash", block.Hash(), + "blockNum", block.Number(), + "crossLinkShard", crossLink.ShardID(), + "crossLinkBlock", crossLink.BlockNum(), + "numTx", len(block.Transactions()), + ).WithCause(err) } } return nil } -// ProcessHeaderMessage verify and process Node/Header message into crosslink when it's valid -func (node *Node) ProcessHeaderMessage(msgPayload []byte) { +// ProcessCrossLinkMessage verify and process Node/CrossLink message into crosslink when it's valid +func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { if node.NodeConfig.ShardID == 0 { - - var headers []*block.Header - err := rlp.DecodeBytes(msgPayload, &headers) + var crosslinks []types.CrossLink + err := rlp.DecodeBytes(msgPayload, &crosslinks) if err != nil { utils.Logger().Error(). Err(err). - Msg("[ProcessingHeader] Crosslink Headers Broadcast Unable to Decode") + Msg("[ProcessingCrossLink] Crosslink Message Broadcast Unable to Decode") return } - - // Try to reprocess all the pending cross links - node.pendingClMutex.Lock() - crossLinkHeadersToProcess := node.pendingCrossLinks - node.pendingCrossLinks = []*block.Header{} - node.pendingClMutex.Unlock() - firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch) - for _, header := range headers { - if header.Number().Cmp(firstCrossLinkBlock) >= 0 { - // Only process cross link starting from FirstCrossLinkBlock - utils.Logger().Debug().Msgf("[ProcessHeaderMessage] Add Pending CrossLink, shardID %d, blockNum %d", header.ShardID(), header.Number()) - crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, header) - } - } - utils.Logger().Debug(). - Msgf("[ProcessingHeader] number of crosslink headers to propose %d, firstCrossLinkBlock %d", len(crossLinkHeadersToProcess), firstCrossLinkBlock) - headersToQuque := []*block.Header{} + candidates := []types.CrossLink{} + utils.Logger().Debug(). + Msgf("[ProcessingCrossLink] Crosslink going to propose: %d", len(crosslinks)) - for _, header := range crossLinkHeadersToProcess { - if len(headersToQuque) > crossLinkBatchSize { - break + for i, cl := range crosslinks { + if cl.Number() == nil || cl.Number().Cmp(firstCrossLinkBlock) < 0 { + utils.Logger().Debug(). + Msgf("[ProcessingCrossLink] Crosslink %d skipped: %v", i, cl) + continue } - exist, err := node.Blockchain().ReadCrossLink(header.ShardID(), header.Number().Uint64(), false) + exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64()) if err == nil && exist != nil { utils.Logger().Debug(). - Msgf("[ProcessingHeader] Cross Link already exists, pass. Block num: %d, shardID %d", header.Number(), header.ShardID()) + Msgf("[ProcessingCrossLink] Cross Link already exists, pass. Block num: %d, shardID %d", cl.Number(), cl.ShardID()) continue } - if header.Number().Cmp(firstCrossLinkBlock) > 0 { // Directly trust the first cross-link - // Sanity check on the previous link with the new link - previousLink, err := node.Blockchain().ReadCrossLink(header.ShardID(), header.Number().Uint64()-1, false) - if err != nil { - previousLink, err = node.Blockchain().ReadCrossLink(header.ShardID(), header.Number().Uint64()-1, true) - if err != nil { - headersToQuque = append(headersToQuque, header) - utils.Logger().Error().Err(err). - Msgf("[ProcessingHeader] ReadCrossLink cannot read previousLink with number %d, shardID %d", header.Number().Uint64()-1, header.ShardID()) - continue - } - } - - err = node.VerifyCrosslinkHeader(previousLink.Header(), header) - if err != nil { - utils.Logger().Error(). - Err(err). - Msgf("[ProcessingHeader] Failed to verify new cross link header for shardID %d, blockNum %d", header.ShardID(), header.Number()) - continue - } + err = node.VerifyCrossLink(cl) + if err != nil { + utils.Logger().Error(). + Err(err). + Msgf("[ProcessingCrossLink] Failed to verify new cross link for shardID %d, blockNum %d", cl.ShardID(), cl.Number()) + continue } - - crossLink := types.NewCrossLink(header) + candidates = append(candidates, cl) utils.Logger().Debug(). - Msgf("[ProcessingHeader] committing for shardID %d, blockNum %d", header.ShardID(), header.Number().Uint64()) - node.Blockchain().WriteCrossLinks(types.CrossLinks{crossLink}, true) + Msgf("[ProcessingCrossLink] committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64()) } - - // Queue up the cross links that's in the future - node.pendingClMutex.Lock() - node.pendingCrossLinks = append(node.pendingCrossLinks, headersToQuque...) - node.pendingClMutex.Unlock() + node.pendingCLMutex.Lock() + node.pendingCrossLinks = append(node.pendingCrossLinks, candidates...) + node.pendingCLMutex.Unlock() } } @@ -278,22 +228,18 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error { return nil } -// VerifyCrosslinkHeader verifies the header is valid against the prevHeader. -func (node *Node) VerifyCrosslinkHeader(prevHeader, header *block.Header) error { +// VerifyCrossLink verifies the header is valid against the prevHeader. +func (node *Node) VerifyCrossLink(cl types.CrossLink) error { // TODO: add fork choice rule - parentHash := header.ParentHash() - if prevHeader.Hash() != parentHash { - return ctxerror.New("[CrossLink] Invalid cross link header - parent hash mismatch", "shardID", header.ShardID(), "blockNum", header.Number()) - } // Verify signature of the new cross link header // TODO: check whether to recalculate shard state - shardState, err := node.Blockchain().ReadShardState(prevHeader.Epoch()) - committee := shardState.FindCommitteeByID(prevHeader.ShardID()) + shardState, err := node.Blockchain().ReadShardState(cl.Epoch()) + committee := shardState.FindCommitteeByID(cl.ShardID()) if err != nil || committee == nil { - return ctxerror.New("[CrossLink] Failed to read shard state for cross link header", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err) + return ctxerror.New("[CrossLink] Failed to read shard state for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } var committerKeys []*bls.PublicKey @@ -308,98 +254,36 @@ func (node *Node) VerifyCrosslinkHeader(prevHeader, header *block.Header) error committerKeys = append(committerKeys, committerKey) } if !parseKeysSuccess { - return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err) + return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } - if header.Number().Uint64() > 1 { // First block doesn't have last sig + if cl.BlockNum() > 1 { // First block doesn't have last sig mask, err := bls_cosi.NewMask(committerKeys, nil) if err != nil { - return ctxerror.New("cannot create group sig mask", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err) + return ctxerror.New("cannot create group sig mask", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } - if err := mask.SetMask(header.LastCommitBitmap()); err != nil { - return ctxerror.New("cannot set group sig mask bits", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err) + if err := mask.SetMask(cl.Bitmap()); err != nil { + return ctxerror.New("cannot set group sig mask bits", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } aggSig := bls.Sign{} - sig := header.LastCommitSignature() + sig := cl.Signature() err = aggSig.Deserialize(sig[:]) if err != nil { return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err) } + parentHash := cl.ParentHash() blockNumBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(blockNumBytes, header.Number().Uint64()-1) + binary.LittleEndian.PutUint64(blockNumBytes, cl.BlockNum()) commitPayload := append(blockNumBytes, parentHash[:]...) if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { - return ctxerror.New("Failed to verify the signature for cross link header ", "shardID", header.ShardID(), "blockNum", header.Number()) + return ctxerror.New("Failed to verify the signature for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()) } } return nil } -// ProposeCrossLinkDataForBeaconchain propose cross links for beacon chain new block -func (node *Node) ProposeCrossLinkDataForBeaconchain() (types.CrossLinks, error) { - utils.Logger().Info(). - Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1). - Msg("Proposing cross links ...") - curBlock := node.Blockchain().CurrentBlock() - numShards := shard.Schedule.InstanceForEpoch(curBlock.Header().Epoch()).NumShards() - - shardCrossLinks := make([]types.CrossLinks, numShards) - - firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch) - - for i := 0; i < int(numShards); i++ { - curShardID := uint32(i) - lastLink, err := node.Blockchain().ReadShardLastCrossLink(curShardID) - - lastLinkblockNum := firstCrossLinkBlock - blockNumoffset := 0 - if err == nil && lastLink != nil { - blockNumoffset = 1 - lastLinkblockNum = lastLink.BlockNum() - } - - for true { - link, err := node.Blockchain().ReadCrossLink(curShardID, lastLinkblockNum.Uint64()+uint64(blockNumoffset), true) - if err != nil || link == nil { - break - } - - if link.BlockNum().Cmp(firstCrossLinkBlock) > 0 { - if lastLink == nil { - utils.Logger().Error(). - Err(err). - Msgf("[CrossLink] Haven't received the first cross link %d", link.BlockNum().Uint64()) - break - } else { - err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header()) - if err != nil { - utils.Logger().Error(). - Err(err). - Msgf("[CrossLink] Failed verifying temp cross link %d", link.BlockNum().Uint64()) - break - } - } - } - shardCrossLinks[i] = append(shardCrossLinks[i], *link) - lastLink = link - blockNumoffset++ - } - } - - crossLinksToPropose := types.CrossLinks{} - for _, crossLinks := range shardCrossLinks { - crossLinksToPropose = append(crossLinksToPropose, crossLinks...) - } - if len(crossLinksToPropose) != 0 { - crossLinksToPropose.Sort() - - return crossLinksToPropose, nil - } - return types.CrossLinks{}, errors.New("No cross link to propose") -} - // ProcessReceiptMessage store the receipts and merkle proof in local data store func (node *Node) ProcessReceiptMessage(msgPayload []byte) { cxp := types.CXReceiptsProof{} diff --git a/node/node_handler.go b/node/node_handler.go index 861367021..fa8ef4a3d 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -144,13 +144,13 @@ func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) { } } - case proto_node.Header: + case proto_node.CrossLink: // only beacon chain will accept the header from other shards - utils.Logger().Debug().Uint32("shardID", node.NodeConfig.ShardID).Msg("NET: received message: Node/Header") + utils.Logger().Debug().Uint32("shardID", node.NodeConfig.ShardID).Msg("NET: received message: Node/CrossLink") if node.NodeConfig.ShardID != 0 { return } - node.ProcessHeaderMessage(msgPayload[1:]) // skip first byte which is blockMsgType + node.ProcessCrossLinkMessage(msgPayload[1:]) // skip first byte which is blockMsgType case proto_node.Receipt: utils.Logger().Debug().Msg("NET: received message: Node/Receipt") @@ -211,16 +211,17 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) { } } -// BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain. -func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { - utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", nodeconfig.NewGroupIDByShardID(0)) +// BroadcastCrossLink is called by consensus leader to send the new header as cross link to beacon chain. +func (node *Node) BroadcastCrossLink(newBlock *types.Block) { + utils.Logger().Info().Msgf("Construct and Broadcasting new crosslink to beacon chain groupID %s", nodeconfig.NewGroupIDByShardID(0)) headers := []*block.Header{} lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID()) var latestBlockNum uint64 - // if cannot find latest crosslink header, broadcast latest 3 block headers + // TODO chao: record the missing crosslink in local database instead of using latest crosslink + // if cannot find latest crosslink, broadcast latest 3 block headers if err != nil { - utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLinkHeader] ReadShardLastCrossLink Failed") + utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed") header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2) if header != nil { headers = append(headers, header) @@ -231,7 +232,7 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { } headers = append(headers, newBlock.Header()) } else { - latestBlockNum = lastLink.BlockNum().Uint64() + latestBlockNum = lastLink.BlockNum() for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ { if blockNum > latestBlockNum+crossLinkBatchSize { break @@ -243,11 +244,11 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { } } - utils.Logger().Info().Msgf("[BroadcastCrossLinkHeader] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers)) + utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers)) for _, header := range headers { - utils.Logger().Debug().Msgf("[BroadcastCrossLinkHeader] Broadcasting %d", header.Number().Uint64()) + utils.Logger().Debug().Msgf("[BroadcastCrossLink] Broadcasting %d", header.Number().Uint64()) } - node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers))) + node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkMessage(headers))) } // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on @@ -344,7 +345,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit node.BroadcastNewBlock(newBlock) } if node.NodeConfig.ShardID != shard.BeaconChainShardID && newBlock.Epoch().Cmp(node.Blockchain().Config().CrossLinkEpoch) >= 0 { - node.BroadcastCrossLinkHeader(newBlock) + node.BroadcastCrossLink(newBlock) } node.BroadcastCXReceipts(newBlock, commitSigAndBitmap) } else { diff --git a/node/node_newblock.go b/node/node_newblock.go index b8dec7191..ceaf8d391 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -112,10 +112,11 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { // Prepare cross links var crossLinks types.CrossLinks if node.NodeConfig.ShardID == 0 { - crossLinksToPropose, localErr := node.ProposeCrossLinkDataForBeaconchain() - if localErr == nil { - crossLinks = crossLinksToPropose - } + crossLinks = node.pendingCrossLinks + node.pendingCLMutex.Lock() + node.pendingCrossLinks = []types.CrossLink{} + node.pendingCLMutex.Unlock() + utils.Logger().Debug().Msgf("Number of crosslinks to propose: %d", len(crossLinks)) } // Prepare shard state diff --git a/node/worker/worker.go b/node/worker/worker.go index 20f43c8f1..f7101612c 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -326,6 +326,7 @@ func (w *Worker) FinalizeNewBlock(sig []byte, signers []byte, viewID uint64, coi // Cross Links if crossLinks != nil && len(crossLinks) != 0 { + crossLinks.Sort() crossLinkData, err := rlp.EncodeToBytes(crossLinks) if err == nil { utils.Logger().Debug().