Simplified CrossLink structure;

Remove temp flag of Read/Write crosslink
pull/1877/head
chao 5 years ago committed by Chao Ma
parent 5ca5632dfc
commit 87dd2cd2cd
  1. 18
      api/proto/node/node.go
  2. 36
      core/blockchain.go
  3. 27
      core/rawdb/accessors_chain.go
  4. 12
      core/rawdb/schema.go
  5. 55
      core/types/crosslink.go
  6. 6
      node/node.go
  7. 236
      node/node_cross_shard.go
  8. 27
      node/node_handler.go
  9. 9
      node/node_newblock.go
  10. 1
      node/worker/worker.go

@ -97,8 +97,8 @@ type BlockMessageType int
const (
Sync BlockMessageType = iota
Header // used for crosslink from beacon chain to shard chain
Receipt // cross-shard transaction receipts
CrossLink // used for crosslink from beacon chain to shard chain
Receipt // cross-shard transaction receipts
)
// SerializeBlockchainSyncMessage serializes BlockchainSyncMessage.
@ -149,14 +149,18 @@ func ConstructBlocksSyncMessage(blocks []*types.Block) []byte {
return byteBuffer.Bytes()
}
// ConstructCrossLinkHeadersMessage constructs cross link header message to send to beacon chain
func ConstructCrossLinkHeadersMessage(headers []*block.Header) []byte {
// ConstructCrossLinkMessage constructs cross link message to send to beacon chain
func ConstructCrossLinkMessage(headers []*block.Header) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Block))
byteBuffer.WriteByte(byte(Header))
byteBuffer.WriteByte(byte(CrossLink))
headersData, _ := rlp.EncodeToBytes(headers)
byteBuffer.Write(headersData)
crosslinks := []types.CrossLink{}
for _, header := range headers {
crosslinks = append(crosslinks, types.NewCrossLink(header))
}
crosslinksData, _ := rlp.EncodeToBytes(crosslinks)
byteBuffer.Write(crosslinksData)
return byteBuffer.Bytes()
}

@ -1094,7 +1094,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
continue
}
shardReceipts := GetToShardReceipts(cxReceipts, uint32(i))
err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts, false)
err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts)
if err != nil {
utils.Logger().Debug().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database")
return NonStatTy, err
@ -1202,10 +1202,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, errors.New("proposed cross links are not sorted")
}
for _, crossLink := range *crossLinks {
if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}, false); err == nil {
utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum().Uint64()).Uint32("shardID", crossLink.ShardID()).Msg("[InsertChain] Cross Link Added to Beaconchain")
if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}); err == nil {
utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[InsertChain] Cross Link Added to Beaconchain")
}
bc.DeleteCrossLinks(types.CrossLinks{crossLink}, true)
bc.WriteShardLastCrossLink(crossLink.ShardID(), crossLink)
}
}
@ -2093,31 +2092,28 @@ func (bc *BlockChain) WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) e
}
// WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key
// temp=true is to write the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink, temp bool) error {
func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink) error {
var err error
for i := 0; i < len(cls); i++ {
cl := cls[i]
err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum().Uint64(), cl.Serialize(), temp)
err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum(), cl.Serialize())
}
return err
}
// DeleteCrossLinks removes the hashes of crosslinks by shardID and blockNum combination key
// temp=true is to write the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) DeleteCrossLinks(cls []types.CrossLink, temp bool) error {
func (bc *BlockChain) DeleteCrossLinks(cls []types.CrossLink) error {
var err error
for i := 0; i < len(cls); i++ {
cl := cls[i]
err = rawdb.DeleteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum().Uint64(), temp)
err = rawdb.DeleteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum())
}
return err
}
// ReadCrossLink retrieves crosslink given shardID and blockNum.
// temp=true is to retrieve the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) {
bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp)
func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64) (*types.CrossLink, error) {
bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum)
if err != nil {
return nil, err
}
@ -2177,9 +2173,8 @@ func GetToShardReceipts(cxReceipts types.CXReceipts, shardID uint32) types.CXRec
}
// ReadCXReceipts retrieves the cross shard transaction receipts of a given shard
// temp=true is to retrieve the just received receipts that's not committed into blockchain with consensus
func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash, temp bool) (types.CXReceipts, error) {
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, blockNum, blockHash, temp)
func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash) (types.CXReceipts, error) {
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, blockNum, blockHash)
if err != nil || len(cxs) == 0 {
return nil, err
}
@ -2187,15 +2182,14 @@ func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash
}
// WriteCXReceipts saves the cross shard transaction receipts of a given shard
// temp=true is to store the just received receipts that's not committed into blockchain with consensus
func (bc *BlockChain) WriteCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash, receipts types.CXReceipts, temp bool) error {
return rawdb.WriteCXReceipts(bc.db, shardID, blockNum, blockHash, receipts, temp)
func (bc *BlockChain) WriteCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash, receipts types.CXReceipts) error {
return rawdb.WriteCXReceipts(bc.db, shardID, blockNum, blockHash, receipts)
}
// CXMerkleProof calculates the cross shard transaction merkle proof of a given destination shard
func (bc *BlockChain) CXMerkleProof(shardID uint32, block *types.Block) (*types.CXMerkleProof, error) {
proof := &types.CXMerkleProof{BlockNum: block.Number(), BlockHash: block.Hash(), ShardID: block.ShardID(), CXReceiptHash: block.Header().OutgoingReceiptHash(), CXShardHashes: []common.Hash{}, ShardIDs: []uint32{}}
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, block.NumberU64(), block.Hash(), false)
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, block.NumberU64(), block.Hash())
if err != nil || cxs == nil {
return nil, err
@ -2206,7 +2200,7 @@ func (bc *BlockChain) CXMerkleProof(shardID uint32, block *types.Block) (*types.
shardNum := int(shardingConfig.NumShards())
for i := 0; i < shardNum; i++ {
receipts, err := bc.ReadCXReceipts(uint32(i), block.NumberU64(), block.Hash(), false)
receipts, err := bc.ReadCXReceipts(uint32(i), block.NumberU64(), block.Hash())
if err != nil || len(receipts) == 0 {
continue
} else {

@ -515,18 +515,18 @@ func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error
}
// ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64, temp bool) ([]byte, error) {
return db.Get(crosslinkKey(shardID, blockNum, temp))
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) {
return db.Get(crosslinkKey(shardID, blockNum))
}
// WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte, temp bool) error {
return db.Put(crosslinkKey(shardID, blockNum, temp), data)
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error {
return db.Put(crosslinkKey(shardID, blockNum), data)
}
// DeleteCrossLinkShardBlock deletes the blockHash given shardID and blockNum
func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64, temp bool) error {
return db.Delete(crosslinkKey(shardID, blockNum, temp))
func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64) error {
return db.Delete(crosslinkKey(shardID, blockNum))
}
// ReadShardLastCrossLink read the last cross link of a shard
@ -540,8 +540,8 @@ func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) err
}
// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash, temp bool) (types.CXReceipts, error) {
data, err := db.Get(cxReceiptKey(shardID, number, hash, temp))
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) {
data, err := db.Get(cxReceiptKey(shardID, number, hash))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts")
return nil, err
@ -555,25 +555,18 @@ func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash commo
}
// WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash
func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts, temp bool) error {
func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) error {
bytes, err := rlp.EncodeToBytes(receipts)
if err != nil {
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts")
}
// Store the receipt slice
if err := db.Put(cxReceiptKey(shardID, number, hash, temp), bytes); err != nil {
if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil {
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts")
}
return err
}
// DeleteCXReceipts removes all receipt data associated with a block hash.
func DeleteCXReceipts(db DatabaseDeleter, shardID uint32, number uint64, hash common.Hash, temp bool) {
if err := db.Delete(cxReceiptKey(shardID, number, hash, temp)); err != nil {
utils.Logger().Error().Msg("Failed to delete cross shard tx receipts")
}
}
// ReadCXReceiptsProofSpent check whether a CXReceiptsProof is unspent
func ReadCXReceiptsProofSpent(db DatabaseReader, shardID uint32, number uint64) (byte, error) {
data, err := db.Get(cxReceiptSpentKey(shardID, number))

@ -63,13 +63,11 @@ var (
shardLastCrosslinkPrefix = []byte("lcl") // prefix for shard last crosslink
crosslinkPrefix = []byte("cl") // prefix for crosslink
tempCrosslinkPrefix = []byte("tcl") // prefix for tempCrosslink
delegatorValidatorListPrefix = []byte("dvl") // prefix for delegator's validator list
// TODO: shorten the key prefix so we don't waste db space
cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt
tempCxReceiptPrefix = []byte("tempCxReceipt") // prefix for temporary cross shard transaction receipt
cxReceiptHashPrefix = []byte("cxReceiptHash") // prefix for cross shard transaction receipt hash
cxReceiptSpentPrefix = []byte("cxReceiptSpent") // prefix for indicator of unspent of cxReceiptsProof
cxReceiptUnspentCheckpointPrefix = []byte("cxReceiptUnspentCheckpoint") // prefix for cxReceiptsProof unspent checkpoint
@ -195,11 +193,8 @@ func shardLastCrosslinkKey(shardID uint32) []byte {
return key
}
func crosslinkKey(shardID uint32, blockNum uint64, temp bool) []byte {
func crosslinkKey(shardID uint32, blockNum uint64) []byte {
prefix := crosslinkPrefix
if temp {
prefix = tempCrosslinkPrefix
}
sbKey := make([]byte, 12)
binary.BigEndian.PutUint32(sbKey, shardID)
binary.BigEndian.PutUint64(sbKey[4:], blockNum)
@ -212,11 +207,8 @@ func delegatorValidatorListKey(delegator common.Address) []byte {
}
// cxReceiptKey = cxReceiptsPrefix + shardID + num (uint64 big endian) + hash
func cxReceiptKey(shardID uint32, number uint64, hash common.Hash, temp bool) []byte {
func cxReceiptKey(shardID uint32, number uint64, hash common.Hash) []byte {
prefix := cxReceiptPrefix
if temp {
prefix = tempCxReceiptPrefix
}
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, shardID)
tmp := append(prefix, sKey...)

@ -4,51 +4,62 @@ import (
"math/big"
"sort"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
)
// CrossLink is only used on beacon chain to store the hash links from other shards
// signature and bitmap correspond to |blockNumber|parentHash| byte array
// Captial to enable rlp encoding
type CrossLink struct {
ChainHeader *block.Header
ParentHashF common.Hash
BlockNumberF *big.Int
SignatureF [96]byte //aggregated signature
BitmapF []byte //corresponding bitmap mask for agg signature
ShardIDF uint32 //need first verify signature on |blockNumber|blockHash| is correct
EpochF *big.Int //need first verify signature on |blockNumber|blockHash| is correct
}
// NewCrossLink returns a new cross link object
func NewCrossLink(header *block.Header) CrossLink {
return CrossLink{header}
parentBlockNum := header.Number().Sub(header.Number(), big.NewInt(1))
return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID(), header.Epoch()}
}
// Header returns header
func (cl CrossLink) Header() *block.Header {
return cl.ChainHeader
// ShardID returns shardID
func (cl CrossLink) ShardID() uint32 {
return cl.ShardIDF
}
// ShardID returns shardID
func (cl CrossLink) ShardID() uint32 {
return cl.ChainHeader.ShardID()
func (cl CrossLink) Epoch() *big.Int {
return cl.EpochF
}
// Number returns blockNum with big.Int format
func (cl CrossLink) Number() *big.Int {
return cl.BlockNumberF
}
// BlockNum returns blockNum
func (cl CrossLink) BlockNum() *big.Int {
return cl.ChainHeader.Number()
func (cl CrossLink) BlockNum() uint64 {
return cl.BlockNumberF.Uint64()
}
// Hash returns hash
func (cl CrossLink) Hash() common.Hash {
return cl.ChainHeader.Hash()
func (cl CrossLink) ParentHash() common.Hash {
return cl.ParentHashF
}
// StateRoot returns hash of state root
func (cl CrossLink) StateRoot() common.Hash {
return cl.ChainHeader.Root()
// Bitmap returns bitmap
func (cl CrossLink) Bitmap() []byte {
return cl.BitmapF
}
// OutgoingReceiptsRoot returns hash of cross shard receipts
func (cl CrossLink) OutgoingReceiptsRoot() common.Hash {
return cl.ChainHeader.OutgoingReceiptHash()
// Signature returns aggregated signature
func (cl CrossLink) Signature() [96]byte {
return cl.SignatureF
}
// Serialize returns bytes of cross link rlp-encoded content
@ -73,13 +84,13 @@ type CrossLinks []CrossLink
// Sort crosslinks by shardID and then by blockNum
func (cls CrossLinks) Sort() {
sort.Slice(cls, func(i, j int) bool {
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0)
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0)
})
}
// IsSorted checks whether the cross links are sorted
func (cls CrossLinks) IsSorted() bool {
return sort.SliceIsSorted(cls, func(i, j int) bool {
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0)
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0)
})
}

@ -15,7 +15,6 @@ import (
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/core"
@ -108,12 +107,13 @@ type Node struct {
ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks
BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes
DRand *drand.DRand // The instance for distributed randomness protocol
pendingCrossLinks []*block.Header
pendingClMutex sync.Mutex
pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
pendingCXMutex sync.Mutex
pendingCrossLinks []types.CrossLink
pendingCLMutex sync.Mutex
// Shard databases
shardChains shardchain.Collection

@ -1,14 +1,13 @@
package node
import (
"bytes"
"encoding/binary"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
@ -54,7 +53,7 @@ func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig [
myShardID := node.Consensus.ShardID
utils.Logger().Info().Uint32("toShardID", toShardID).Uint32("myShardID", myShardID).Uint64("blockNum", block.NumberU64()).Msg("[BroadcastCXReceiptsWithShardID]")
cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash(), false)
cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash())
if err != nil || len(cxReceipts) == 0 {
utils.Logger().Info().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceiptsWithShardID] No ReadCXReceipts found")
return
@ -120,124 +119,75 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
)
}
firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch)
for i, crossLink := range *crossLinks {
lastLink := &types.CrossLink{}
if i == 0 {
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 1",
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
}
} else {
if (*crossLinks)[i-1].Header().ShardID() != crossLink.Header().ShardID() {
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 2",
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
}
} else {
lastLink = &(*crossLinks)[i-1]
}
}
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 { // TODO: verify genesis block
err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header())
if err != nil {
return ctxerror.New("cannot ValidateNewBlock",
for _, crossLink := range *crossLinks {
cl, err := node.Blockchain().ReadCrossLink(crossLink.ShardID(), crossLink.BlockNum())
if err == nil && cl != nil {
if !bytes.Equal(cl.Serialize(), crossLink.Serialize()) {
return ctxerror.New("[CrossLinkVerification] Double signed crossLink",
"blockHash", block.Hash(),
"numTx", len(block.Transactions()),
).WithCause(err)
"Previous committed crossLink", cl,
"crossLink", crossLink,
)
}
continue
}
if err = node.VerifyCrossLink(crossLink); err != nil {
return ctxerror.New("cannot VerifyBlockCrossLinks",
"blockHash", block.Hash(),
"blockNum", block.Number(),
"crossLinkShard", crossLink.ShardID(),
"crossLinkBlock", crossLink.BlockNum(),
"numTx", len(block.Transactions()),
).WithCause(err)
}
}
return nil
}
// ProcessHeaderMessage verify and process Node/Header message into crosslink when it's valid
func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
// ProcessCrossLinkMessage verify and process Node/CrossLink message into crosslink when it's valid
func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
if node.NodeConfig.ShardID == 0 {
var headers []*block.Header
err := rlp.DecodeBytes(msgPayload, &headers)
var crosslinks []types.CrossLink
err := rlp.DecodeBytes(msgPayload, &crosslinks)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("[ProcessingHeader] Crosslink Headers Broadcast Unable to Decode")
Msg("[ProcessingCrossLink] Crosslink Message Broadcast Unable to Decode")
return
}
// Try to reprocess all the pending cross links
node.pendingClMutex.Lock()
crossLinkHeadersToProcess := node.pendingCrossLinks
node.pendingCrossLinks = []*block.Header{}
node.pendingClMutex.Unlock()
firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch)
for _, header := range headers {
if header.Number().Cmp(firstCrossLinkBlock) >= 0 {
// Only process cross link starting from FirstCrossLinkBlock
utils.Logger().Debug().Msgf("[ProcessHeaderMessage] Add Pending CrossLink, shardID %d, blockNum %d", header.ShardID(), header.Number())
crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, header)
}
}
utils.Logger().Debug().
Msgf("[ProcessingHeader] number of crosslink headers to propose %d, firstCrossLinkBlock %d", len(crossLinkHeadersToProcess), firstCrossLinkBlock)
headersToQuque := []*block.Header{}
candidates := []types.CrossLink{}
utils.Logger().Debug().
Msgf("[ProcessingCrossLink] Crosslink going to propose: %d", len(crosslinks))
for _, header := range crossLinkHeadersToProcess {
if len(headersToQuque) > crossLinkBatchSize {
break
for i, cl := range crosslinks {
if cl.Number() == nil || cl.Number().Cmp(firstCrossLinkBlock) < 0 {
utils.Logger().Debug().
Msgf("[ProcessingCrossLink] Crosslink %d skipped: %v", i, cl)
continue
}
exist, err := node.Blockchain().ReadCrossLink(header.ShardID(), header.Number().Uint64(), false)
exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64())
if err == nil && exist != nil {
utils.Logger().Debug().
Msgf("[ProcessingHeader] Cross Link already exists, pass. Block num: %d, shardID %d", header.Number(), header.ShardID())
Msgf("[ProcessingCrossLink] Cross Link already exists, pass. Block num: %d, shardID %d", cl.Number(), cl.ShardID())
continue
}
if header.Number().Cmp(firstCrossLinkBlock) > 0 { // Directly trust the first cross-link
// Sanity check on the previous link with the new link
previousLink, err := node.Blockchain().ReadCrossLink(header.ShardID(), header.Number().Uint64()-1, false)
if err != nil {
previousLink, err = node.Blockchain().ReadCrossLink(header.ShardID(), header.Number().Uint64()-1, true)
if err != nil {
headersToQuque = append(headersToQuque, header)
utils.Logger().Error().Err(err).
Msgf("[ProcessingHeader] ReadCrossLink cannot read previousLink with number %d, shardID %d", header.Number().Uint64()-1, header.ShardID())
continue
}
}
err = node.VerifyCrosslinkHeader(previousLink.Header(), header)
if err != nil {
utils.Logger().Error().
Err(err).
Msgf("[ProcessingHeader] Failed to verify new cross link header for shardID %d, blockNum %d", header.ShardID(), header.Number())
continue
}
err = node.VerifyCrossLink(cl)
if err != nil {
utils.Logger().Error().
Err(err).
Msgf("[ProcessingCrossLink] Failed to verify new cross link for shardID %d, blockNum %d", cl.ShardID(), cl.Number())
continue
}
crossLink := types.NewCrossLink(header)
candidates = append(candidates, cl)
utils.Logger().Debug().
Msgf("[ProcessingHeader] committing for shardID %d, blockNum %d", header.ShardID(), header.Number().Uint64())
node.Blockchain().WriteCrossLinks(types.CrossLinks{crossLink}, true)
Msgf("[ProcessingCrossLink] committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64())
}
// Queue up the cross links that's in the future
node.pendingClMutex.Lock()
node.pendingCrossLinks = append(node.pendingCrossLinks, headersToQuque...)
node.pendingClMutex.Unlock()
node.pendingCLMutex.Lock()
node.pendingCrossLinks = append(node.pendingCrossLinks, candidates...)
node.pendingCLMutex.Unlock()
}
}
@ -278,22 +228,18 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error {
return nil
}
// VerifyCrosslinkHeader verifies the header is valid against the prevHeader.
func (node *Node) VerifyCrosslinkHeader(prevHeader, header *block.Header) error {
// VerifyCrossLink verifies the header is valid against the prevHeader.
func (node *Node) VerifyCrossLink(cl types.CrossLink) error {
// TODO: add fork choice rule
parentHash := header.ParentHash()
if prevHeader.Hash() != parentHash {
return ctxerror.New("[CrossLink] Invalid cross link header - parent hash mismatch", "shardID", header.ShardID(), "blockNum", header.Number())
}
// Verify signature of the new cross link header
// TODO: check whether to recalculate shard state
shardState, err := node.Blockchain().ReadShardState(prevHeader.Epoch())
committee := shardState.FindCommitteeByID(prevHeader.ShardID())
shardState, err := node.Blockchain().ReadShardState(cl.Epoch())
committee := shardState.FindCommitteeByID(cl.ShardID())
if err != nil || committee == nil {
return ctxerror.New("[CrossLink] Failed to read shard state for cross link header", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
return ctxerror.New("[CrossLink] Failed to read shard state for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
var committerKeys []*bls.PublicKey
@ -308,98 +254,36 @@ func (node *Node) VerifyCrosslinkHeader(prevHeader, header *block.Header) error
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
if header.Number().Uint64() > 1 { // First block doesn't have last sig
if cl.BlockNum() > 1 { // First block doesn't have last sig
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("cannot create group sig mask", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
return ctxerror.New("cannot create group sig mask", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap()); err != nil {
return ctxerror.New("cannot set group sig mask bits", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
if err := mask.SetMask(cl.Bitmap()); err != nil {
return ctxerror.New("cannot set group sig mask bits", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
aggSig := bls.Sign{}
sig := header.LastCommitSignature()
sig := cl.Signature()
err = aggSig.Deserialize(sig[:])
if err != nil {
return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err)
}
parentHash := cl.ParentHash()
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number().Uint64()-1)
binary.LittleEndian.PutUint64(blockNumBytes, cl.BlockNum())
commitPayload := append(blockNumBytes, parentHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("Failed to verify the signature for cross link header ", "shardID", header.ShardID(), "blockNum", header.Number())
return ctxerror.New("Failed to verify the signature for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum())
}
}
return nil
}
// ProposeCrossLinkDataForBeaconchain propose cross links for beacon chain new block
func (node *Node) ProposeCrossLinkDataForBeaconchain() (types.CrossLinks, error) {
utils.Logger().Info().
Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1).
Msg("Proposing cross links ...")
curBlock := node.Blockchain().CurrentBlock()
numShards := shard.Schedule.InstanceForEpoch(curBlock.Header().Epoch()).NumShards()
shardCrossLinks := make([]types.CrossLinks, numShards)
firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch)
for i := 0; i < int(numShards); i++ {
curShardID := uint32(i)
lastLink, err := node.Blockchain().ReadShardLastCrossLink(curShardID)
lastLinkblockNum := firstCrossLinkBlock
blockNumoffset := 0
if err == nil && lastLink != nil {
blockNumoffset = 1
lastLinkblockNum = lastLink.BlockNum()
}
for true {
link, err := node.Blockchain().ReadCrossLink(curShardID, lastLinkblockNum.Uint64()+uint64(blockNumoffset), true)
if err != nil || link == nil {
break
}
if link.BlockNum().Cmp(firstCrossLinkBlock) > 0 {
if lastLink == nil {
utils.Logger().Error().
Err(err).
Msgf("[CrossLink] Haven't received the first cross link %d", link.BlockNum().Uint64())
break
} else {
err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header())
if err != nil {
utils.Logger().Error().
Err(err).
Msgf("[CrossLink] Failed verifying temp cross link %d", link.BlockNum().Uint64())
break
}
}
}
shardCrossLinks[i] = append(shardCrossLinks[i], *link)
lastLink = link
blockNumoffset++
}
}
crossLinksToPropose := types.CrossLinks{}
for _, crossLinks := range shardCrossLinks {
crossLinksToPropose = append(crossLinksToPropose, crossLinks...)
}
if len(crossLinksToPropose) != 0 {
crossLinksToPropose.Sort()
return crossLinksToPropose, nil
}
return types.CrossLinks{}, errors.New("No cross link to propose")
}
// ProcessReceiptMessage store the receipts and merkle proof in local data store
func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
cxp := types.CXReceiptsProof{}

@ -144,13 +144,13 @@ func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) {
}
}
case proto_node.Header:
case proto_node.CrossLink:
// only beacon chain will accept the header from other shards
utils.Logger().Debug().Uint32("shardID", node.NodeConfig.ShardID).Msg("NET: received message: Node/Header")
utils.Logger().Debug().Uint32("shardID", node.NodeConfig.ShardID).Msg("NET: received message: Node/CrossLink")
if node.NodeConfig.ShardID != 0 {
return
}
node.ProcessHeaderMessage(msgPayload[1:]) // skip first byte which is blockMsgType
node.ProcessCrossLinkMessage(msgPayload[1:]) // skip first byte which is blockMsgType
case proto_node.Receipt:
utils.Logger().Debug().Msg("NET: received message: Node/Receipt")
@ -211,16 +211,17 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
}
}
// BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", nodeconfig.NewGroupIDByShardID(0))
// BroadcastCrossLink is called by consensus leader to send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLink(newBlock *types.Block) {
utils.Logger().Info().Msgf("Construct and Broadcasting new crosslink to beacon chain groupID %s", nodeconfig.NewGroupIDByShardID(0))
headers := []*block.Header{}
lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID())
var latestBlockNum uint64
// if cannot find latest crosslink header, broadcast latest 3 block headers
// TODO chao: record the missing crosslink in local database instead of using latest crosslink
// if cannot find latest crosslink, broadcast latest 3 block headers
if err != nil {
utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLinkHeader] ReadShardLastCrossLink Failed")
utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed")
header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2)
if header != nil {
headers = append(headers, header)
@ -231,7 +232,7 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
}
headers = append(headers, newBlock.Header())
} else {
latestBlockNum = lastLink.BlockNum().Uint64()
latestBlockNum = lastLink.BlockNum()
for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ {
if blockNum > latestBlockNum+crossLinkBatchSize {
break
@ -243,11 +244,11 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
}
}
utils.Logger().Info().Msgf("[BroadcastCrossLinkHeader] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers))
utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers))
for _, header := range headers {
utils.Logger().Debug().Msgf("[BroadcastCrossLinkHeader] Broadcasting %d", header.Number().Uint64())
utils.Logger().Debug().Msgf("[BroadcastCrossLink] Broadcasting %d", header.Number().Uint64())
}
node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers)))
node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkMessage(headers)))
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
@ -344,7 +345,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
node.BroadcastNewBlock(newBlock)
}
if node.NodeConfig.ShardID != shard.BeaconChainShardID && newBlock.Epoch().Cmp(node.Blockchain().Config().CrossLinkEpoch) >= 0 {
node.BroadcastCrossLinkHeader(newBlock)
node.BroadcastCrossLink(newBlock)
}
node.BroadcastCXReceipts(newBlock, commitSigAndBitmap)
} else {

@ -112,10 +112,11 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
// Prepare cross links
var crossLinks types.CrossLinks
if node.NodeConfig.ShardID == 0 {
crossLinksToPropose, localErr := node.ProposeCrossLinkDataForBeaconchain()
if localErr == nil {
crossLinks = crossLinksToPropose
}
crossLinks = node.pendingCrossLinks
node.pendingCLMutex.Lock()
node.pendingCrossLinks = []types.CrossLink{}
node.pendingCLMutex.Unlock()
utils.Logger().Debug().Msgf("Number of crosslinks to propose: %d", len(crossLinks))
}
// Prepare shard state

@ -326,6 +326,7 @@ func (w *Worker) FinalizeNewBlock(sig []byte, signers []byte, viewID uint64, coi
// Cross Links
if crossLinks != nil && len(crossLinks) != 0 {
crossLinks.Sort()
crossLinkData, err := rlp.EncodeToBytes(crossLinks)
if err == nil {
utils.Logger().Debug().

Loading…
Cancel
Save