diff --git a/api/proto/node/node.go b/api/proto/node/node.go index 756842f24..2e6c34d73 100644 --- a/api/proto/node/node.go +++ b/api/proto/node/node.go @@ -159,6 +159,17 @@ func ConstructBlocksSyncMessage(blocks []*types.Block) []byte { return byteBuffer.Bytes() } +// ConstructCrossLinkHeadersMessage constructs cross link header message to send blocks to beacon chain +func ConstructCrossLinkHeadersMessage(headers []*types.Header) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) + byteBuffer.WriteByte(byte(Block)) + byteBuffer.WriteByte(byte(Header)) + + headersData, _ := rlp.EncodeToBytes(headers) + byteBuffer.Write(headersData) + return byteBuffer.Bytes() +} + // ConstructEpochShardStateMessage contructs epoch shard state message func ConstructEpochShardStateMessage(epochShardState types.EpochShardState) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 1670579f4..49d011ec1 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -302,6 +302,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // TODO: refactor the creation of blockchain out of node.New() currentConsensus.ChainReader = currentNode.Blockchain() + currentNode.NodeConfig.SetBeaconGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(0))) if *isExplorer { currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode) currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID))) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index e455f5d47..b85a151b4 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -571,6 +571,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { } // Construct and send the commit message + // TODO: should only sign on block hash blockNumBytes := make([]byte, 8) binary.LittleEndian.PutUint64(blockNumBytes, consensus.blockNum) commitPayload := append(blockNumBytes, consensus.blockHash[:]...) diff --git a/core/blockchain.go b/core/blockchain.go index 69246c4c2..a6d279819 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1110,8 +1110,20 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state") } } + if len(header.CrossLinks) > 0 { + crossLinks := &types.CrossLinks{} + err := rlp.DecodeBytes(header.CrossLinks, crossLinks) + if err != nil { + header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain] cannot parse cross links") + } + for _, crossLink := range *crossLinks { + bc.WriteCrossLinks(types.CrossLinks{crossLink}, false) + bc.WriteShardLastCrossLink(crossLink.ShardID(), crossLink) + } + } } } + return n, err } @@ -1960,23 +1972,54 @@ func (bc *BlockChain) WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) e } // WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key -func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink) error { +// temp=true is to write the just received cross link that's not committed into blockchain with consensus +func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink, temp bool) error { var err error - for _, cl := range cls { - err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum(), cl.Bytes()) + for i := 0; i < len(cls); i++ { + cl := cls[i] + err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum().Uint64(), cl.Serialize(), temp) } return err } -// ReadCrossLinkHash retrieves crosslink hash given shardID and blockNum -func (bc *BlockChain) ReadCrossLinkHash(shardID uint32, blockNum uint64) (common.Hash, error) { - h, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum) +// ReadCrossLink retrieves crosslink given shardID and blockNum. +// temp=true is to retrieve the just received cross link that's not committed into blockchain with consensus +func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) { + bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp) if err != nil { - return common.Hash{}, err + return nil, err } - hash := common.Hash{} - hash.SetBytes(h) - return hash, nil + crossLink, err := types.DeserializeCrossLink(bytes) + + return crossLink, err +} + +// WriteShardLastCrossLink saves the last crosslink of a shard +// temp=true is to write the just received cross link that's not committed into blockchain with consensus +func (bc *BlockChain) WriteShardLastCrossLink(shardID uint32, cl types.CrossLink) error { + return rawdb.WriteShardLastCrossLink(bc.db, cl.ShardID(), cl.Serialize()) +} + +// ReadShardLastCrossLink retrieves the last crosslink of a shard. +func (bc *BlockChain) ReadShardLastCrossLink(shardID uint32) (*types.CrossLink, error) { + bytes, err := rawdb.ReadShardLastCrossLink(bc.db, shardID) + if err != nil { + return nil, err + } + crossLink, err := types.DeserializeCrossLink(bytes) + + return crossLink, err +} + +// ReadLastShardCrossLink retrieves last crosslink of a shard. +func (bc *BlockChain) ReadLastShardCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) { + bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp) + if err != nil { + return nil, err + } + crossLink, err := types.DeserializeCrossLink(bytes) + + return crossLink, err } // IsSameLeaderAsPreviousBlock retrieves a block from the database by number, caching it diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 7c23a91f4..3d5f5729f 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -504,15 +504,33 @@ func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error } // ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum -func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) { +func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64, temp bool) ([]byte, error) { + if temp { + // cross link received but haven't committed into blockchain with consensus + return db.Get(tempCrosslinkKey(shardID, blockNum)) + } return db.Get(crosslinkKey(shardID, blockNum)) } // WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum -func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error { +func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte, temp bool) error { + if temp { + // cross link received but haven't committed into blockchain with consensus + return db.Put(tempCrosslinkKey(shardID, blockNum), data) + } return db.Put(crosslinkKey(shardID, blockNum), data) } +// ReadShardLastCrossLink read the last cross link of a shard +func ReadShardLastCrossLink(db DatabaseReader, shardID uint32) ([]byte, error) { + return db.Get(shardLastCrosslinkKey(shardID)) +} + +// WriteShardLastCrossLink stores the last cross link of a shard +func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) error { + return db.Put(shardLastCrosslinkKey(shardID), data) +} + // ReadCXReceipts retrieves all the transaction receipts belonging to a block. func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) types.CXReceipts { // Retrieve the flattened receipt slice diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 7ac2a233c..817a4a0bb 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -60,7 +60,9 @@ var ( preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db - crosslinkPrefix = []byte("crosslink") // prefix for crosslink + shardLastCrosslinkPrefix = []byte("shard-last-cross-link") // prefix for shard last crosslink + crosslinkPrefix = []byte("crosslink") // prefix for crosslink + tempCrosslinkPrefix = []byte("tempCrosslink") // prefix for tempCrosslink cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt @@ -167,6 +169,13 @@ func epochVdfBlockNumberKey(epoch *big.Int) []byte { return append(epochVdfBlockNumberPrefix, epoch.Bytes()...) } +func shardLastCrosslinkKey(shardID uint32) []byte { + sbKey := make([]byte, 4) + binary.BigEndian.PutUint32(sbKey, shardID) + key := append(crosslinkPrefix, sbKey...) + return key +} + func crosslinkKey(shardID uint32, blockNum uint64) []byte { sbKey := make([]byte, 12) binary.BigEndian.PutUint32(sbKey, shardID) @@ -175,6 +184,14 @@ func crosslinkKey(shardID uint32, blockNum uint64) []byte { return key } +func tempCrosslinkKey(shardID uint32, blockNum uint64) []byte { + sbKey := make([]byte, 12) + binary.BigEndian.PutUint32(sbKey, shardID) + binary.BigEndian.PutUint64(sbKey[4:], blockNum) + key := append(tempCrosslinkPrefix, sbKey...) + return key +} + // cxReceiptKey = cxReceiptsPrefix + shardID + num (uint64 big endian) + hash func cxReceiptKey(shardID uint32, number uint64, hash common.Hash) []byte { sKey := make([]byte, 4) diff --git a/core/state_processor.go b/core/state_processor.go index 965e54f24..09bc9a165 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -124,7 +124,7 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo //receipt.Logs = statedb.GetLogs(tx.Hash()) receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) - cxReceipt := &types.CXReceipt{tx.Hash(), msg.Nonce(), msg.From(), *msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()} + cxReceipt := &types.CXReceipt{tx.Hash(), msg.Nonce(), msg.From(), msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()} return receipt, cxReceipt, gas, err } diff --git a/core/types/block.go b/core/types/block.go index 7abb71ee3..5048b2ba5 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -94,7 +94,7 @@ type Header struct { Vrf []byte `json:"vrf"` Vdf []byte `json:"vdf"` ShardState []byte `json:"shardState"` - CrossLink []byte `json:"crossLink"` + CrossLinks []byte `json:"crossLink"` } // field type overrides for gencodec diff --git a/core/types/crosslink.go b/core/types/crosslink.go index bb0363c26..ba5412039 100644 --- a/core/types/crosslink.go +++ b/core/types/crosslink.go @@ -1,36 +1,68 @@ package types import ( + "math/big" "sort" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" ) // CrossLink is only used on beacon chain to store the hash links from other shards type CrossLink struct { - shardID uint32 - blockNum uint64 - hash common.Hash + ChainHeader *Header +} + +// NewCrossLink returns a new cross link object +func NewCrossLink(header *Header) CrossLink { + return CrossLink{header} +} + +// Header returns header +func (cl CrossLink) Header() *Header { + return cl.ChainHeader } // ShardID returns shardID func (cl CrossLink) ShardID() uint32 { - return cl.shardID + return cl.ChainHeader.ShardID } // BlockNum returns blockNum -func (cl CrossLink) BlockNum() uint64 { - return cl.blockNum +func (cl CrossLink) BlockNum() *big.Int { + return cl.ChainHeader.Number } // Hash returns hash func (cl CrossLink) Hash() common.Hash { - return cl.hash + return cl.ChainHeader.Hash() +} + +// StateRoot returns hash of state root +func (cl CrossLink) StateRoot() common.Hash { + return cl.ChainHeader.Root +} + +// CxReceiptsRoot returns hash of cross shard receipts +func (cl CrossLink) CxReceiptsRoot() common.Hash { + return cl.ChainHeader.CXReceiptHash +} + +// Serialize returns bytes of cross link rlp-encoded content +func (cl CrossLink) Serialize() []byte { + bytes, _ := rlp.EncodeToBytes(cl) + return bytes } -// Bytes returns bytes of the hash -func (cl CrossLink) Bytes() []byte { - return cl.hash[:] +// DeserializeCrossLink rlp-decode the bytes into cross link object. +func DeserializeCrossLink(bytes []byte) (*CrossLink, error) { + cl := &CrossLink{} + err := rlp.DecodeBytes(bytes, cl) + if err != nil { + return nil, err + } + return cl, err } // CrossLinks is a collection of cross links @@ -39,6 +71,6 @@ type CrossLinks []CrossLink // Sort crosslinks by shardID and then by blockNum func (cls CrossLinks) Sort() { sort.Slice(cls, func(i, j int) bool { - return cls[i].shardID < cls[j].shardID || (cls[i].shardID == cls[j].shardID && cls[i].blockNum < cls[j].blockNum) + return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0) }) } diff --git a/core/types/cx_receipt.go b/core/types/cx_receipt.go index 0f3ca7d41..8ab2ce81f 100644 --- a/core/types/cx_receipt.go +++ b/core/types/cx_receipt.go @@ -12,7 +12,7 @@ type CXReceipt struct { TxHash common.Hash // hash of the cross shard transaction in source shard Nonce uint64 From common.Address - To common.Address + To *common.Address ShardID uint32 ToShardID uint32 Amount *big.Int @@ -33,12 +33,12 @@ func (cs CXReceipts) GetRlp(i int) []byte { return enc } -// ShardID returns the destination shardID of the cxReceipt +// ToShardID returns the destination shardID of the cxReceipt func (cs CXReceipts) ToShardID(i int) uint32 { return cs[i].ToShardID } // NewCrossShardReceipt creates a cross shard receipt -func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt { +func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt { return &CXReceipt{TxHash: txHash, Nonce: nonce, From: from, To: to, ShardID: shardID, ToShardID: toShardID, Amount: amount} } diff --git a/core/types/receipt.go b/core/types/receipt.go index d1633b60f..5d0541d3b 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -208,7 +208,7 @@ func (r Receipts) GetRlp(i int) []byte { return bytes } -// ShardID returns 0, arbitrary value +// ToShardID returns 0, arbitrary value // This function is NOT used, just to compatible with DerivableList interface func (r Receipts) ToShardID(i int) uint32 { _ = r[i] diff --git a/core/types/transaction.go b/core/types/transaction.go index c15cef2f7..1015837e7 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -378,7 +378,7 @@ func (s Transactions) GetRlp(i int) []byte { return enc } -// ShardID returns the destination shardID of given transaction +// ToShardID returns the destination shardID of given transaction func (s Transactions) ToShardID(i int) uint32 { return s[i].data.ToShardID } diff --git a/node/node.go b/node/node.go index 779eb7b7b..926217e26 100644 --- a/node/node.go +++ b/node/node.go @@ -89,6 +89,8 @@ type Node struct { pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus pendingTxMutex sync.Mutex DRand *drand.DRand // The instance for distributed randomness protocol + pendingCrossLinks []*types.Header + pendingClMutex sync.Mutex // Shard databases shardChains shardchain.Collection @@ -347,16 +349,16 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc node.AddContractKeyAndAddress(scFaucet) } - if node.Consensus.ShardID == 0 { - // Contracts only exist in beacon chain - if node.isFirstTime { - // Setup one time smart contracts - node.CurrentStakes = make(map[common.Address]*structs.StakeInfo) - node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked - } else { - node.AddContractKeyAndAddress(scStaking) - } - } + //if node.Consensus.ShardID == 0 { + // // Contracts only exist in beacon chain + // if node.isFirstTime { + // // Setup one time smart contracts + // node.CurrentStakes = make(map[common.Address]*structs.StakeInfo) + // node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked + // } else { + // node.AddContractKeyAndAddress(scStaking) + // } + //} node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), node.Blockchain().Config()) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 364af0ebd..e34fcf95c 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -1,23 +1,138 @@ package node import ( - "github.com/ethereum/go-ethereum/rlp" + "encoding/binary" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/core/types" + bls_cosi "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" ) // ProcessHeaderMessage verify and process Node/Header message into crosslink when it's valid func (node *Node) ProcessHeaderMessage(msgPayload []byte) { - var headers []*types.Header - err := rlp.DecodeBytes(msgPayload, &headers) - if err != nil { - utils.Logger().Error(). - Err(err). - Msg("Crosslink Headers Broadcast Unable to Decode") - return + if node.NodeConfig.ShardID == 0 { + + var headers []*types.Header + err := rlp.DecodeBytes(msgPayload, &headers) + if err != nil { + utils.Logger().Error(). + Err(err). + Msg("Crosslink Headers Broadcast Unable to Decode") + return + } + + utils.Logger().Debug(). + Msgf("[ProcessingHeader NUM] %d", len(headers)) + // Try to reprocess all the pending cross links + node.pendingClMutex.Lock() + crossLinkHeadersToProcess := node.pendingCrossLinks + node.pendingCrossLinks = []*types.Header{} + node.pendingClMutex.Unlock() + + crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, headers...) + + headersToQuque := []*types.Header{} + + for _, header := range crossLinkHeadersToProcess { + + utils.Logger().Debug(). + Msgf("[ProcessingHeader] 1 shardID %d, blockNum %d", header.ShardID, header.Number.Uint64()) + exist, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64(), false) + if err == nil && exist != nil { + // Cross link already exists, skip + continue + } + + if header.Number.Uint64() > 0 { // Blindly trust the first cross-link + // Sanity check on the previous link with the new link + previousLink, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64()-1, false) + if err != nil { + previousLink, err = node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64()-1, true) + if err != nil { + headersToQuque = append(headersToQuque, header) + continue + } + } + + err = node.VerifyCrosslinkHeader(previousLink.Header(), header) + if err != nil { + utils.Logger().Warn(). + Err(err). + Msgf("Failed to verify new cross link header for shardID %d, blockNum %d", header.ShardID, header.Number) + continue + } + } + + crossLink := types.NewCrossLink(header) + utils.Logger().Debug(). + Msgf("[ProcessingHeader] committing shardID %d, blockNum %d", header.ShardID, header.Number.Uint64()) + node.Blockchain().WriteCrossLinks(types.CrossLinks{crossLink}, true) + } + + // Queue up the cross links that's in the future + node.pendingClMutex.Lock() + node.pendingCrossLinks = append(node.pendingCrossLinks, headersToQuque...) + node.pendingClMutex.Unlock() + } +} + +// VerifyCrosslinkHeader verifies the header is valid against the prevHeader. +func (node *Node) VerifyCrosslinkHeader(prevHeader, header *types.Header) error { + + // TODO: add fork choice rule + if prevHeader.Hash() != header.ParentHash { + return ctxerror.New("[CrossLink] Invalid cross link header - parent hash mismatch", "shardID", header.ShardID, "blockNum", header.Number) + } + + // Verify signature of the new cross link header + shardState, err := node.Blockchain().ReadShardState(header.Epoch) + committee := shardState.FindCommitteeByID(header.ShardID) + + if err != nil || committee == nil { + return ctxerror.New("[CrossLink] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + var committerKeys []*bls.PublicKey + + parseKeysSuccess := true + for _, member := range committee.NodeList { + committerKey := new(bls.PublicKey) + err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey) + if err != nil { + parseKeysSuccess = false + break + } + committerKeys = append(committerKeys, committerKey) + } + if !parseKeysSuccess { + return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + + if header.Number.Uint64() > 1 { // First block doesn't have last sig + mask, err := bls_cosi.NewMask(committerKeys, nil) + if err != nil { + return ctxerror.New("cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + if err := mask.SetMask(header.LastCommitBitmap); err != nil { + return ctxerror.New("cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + + aggSig := bls.Sign{} + err = aggSig.Deserialize(header.LastCommitSignature[:]) + if err != nil { + return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err) + } + + blockNumBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1) + commitPayload := append(blockNumBytes, header.ParentHash[:]...) + if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { + return ctxerror.New("Failed to verify the signature for cross link header ", "shardID", header.ShardID, "blockNum", header.Number) + } } - // TODO: add actual logic + return nil } // ProcessReceiptMessage store the receipts and merkle proof in local data store diff --git a/node/node_handler.go b/node/node_handler.go index c47546d34..0f48f4f9f 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -3,6 +3,7 @@ package node import ( "bytes" "context" + "encoding/binary" "errors" "math" "math/big" @@ -29,6 +30,7 @@ import ( "github.com/harmony-one/harmony/contracts/structs" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + bls_cosi "github.com/harmony-one/harmony/crypto/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" @@ -190,10 +192,10 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) { case proto_node.Header: // only beacon chain will accept the header from other shards - if node.Consensus.ShardID != 0 { + utils.Logger().Debug().Msg("NET: received message: Node/Header") + if node.NodeConfig.ShardID != 0 { return } - utils.Logger().Debug().Msg("NET: received message: Node/Header") node.ProcessHeaderMessage(msgPayload[1:]) // skip first byte which is blockMsgType case proto_node.Receipt: @@ -285,10 +287,76 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) { } } +// BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain. +func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { + utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", node.NodeConfig) + lastThreeHeaders := []*types.Header{} + + block := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 2) + if block != nil { + lastThreeHeaders = append(lastThreeHeaders, block.Header()) + } + block = node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1) + if block != nil { + lastThreeHeaders = append(lastThreeHeaders, block.Header()) + } + lastThreeHeaders = append(lastThreeHeaders, newBlock.Header()) + + node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(lastThreeHeaders))) +} + // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on func (node *Node) VerifyNewBlock(newBlock *types.Block) error { // TODO ek – where do we verify parent-child invariants, // e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"? + // Verify lastCommitSig + if newBlock.NumberU64() > 1 { + header := newBlock.Header() + shardState, err := node.Blockchain().ReadShardState(header.Epoch) + committee := shardState.FindCommitteeByID(header.ShardID) + + if err != nil || committee == nil { + return ctxerror.New("[VerifyNewBlock] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + var committerKeys []*bls.PublicKey + + parseKeysSuccess := true + for _, member := range committee.NodeList { + committerKey := new(bls.PublicKey) + err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey) + if err != nil { + parseKeysSuccess = false + break + } + committerKeys = append(committerKeys, committerKey) + } + if !parseKeysSuccess { + return ctxerror.New("[VerifyNewBlock] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + + mask, err := bls_cosi.NewMask(node.Consensus.PublicKeys, nil) + if err != nil { + return ctxerror.New("[VerifyNewBlock] cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + if err := mask.SetMask(header.LastCommitBitmap); err != nil { + return ctxerror.New("[VerifyNewBlock] cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err) + } + + aggSig := bls.Sign{} + err = aggSig.Deserialize(header.LastCommitSignature[:]) + if err != nil { + return ctxerror.New("[VerifyNewBlock] unable to deserialize multi-signature from payload").WithCause(err) + } + + blockNumBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1) + commitPayload := append(blockNumBytes, header.ParentHash[:]...) + if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { + return ctxerror.New("[VerifyNewBlock] Failed to verify the signature for last commit sig", "shardID", header.ShardID, "blockNum", header.Number, "keys", committerKeys, "consensusPubKeys", node.Consensus.PublicKeys) + } + } + // End Verify lastCommitSig + if newBlock.ShardID() != node.Blockchain().ShardID() { return ctxerror.New("wrong shard ID", "my shard ID", node.Blockchain().ShardID(), @@ -302,6 +370,56 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error { ).WithCause(err) } + // Verify cross links + if node.NodeConfig.ShardID == 0 && len(newBlock.Header().CrossLinks) > 0 { + crossLinks := &types.CrossLinks{} + err := rlp.DecodeBytes(newBlock.Header().CrossLinks, crossLinks) + if err != nil { + return ctxerror.New("[CrossLinkVerification] failed to decode cross links", + "blockHash", newBlock.Hash(), + "crossLinks", len(newBlock.Header().CrossLinks), + ).WithCause(err) + } + for i, crossLink := range *crossLinks { + lastLink := &types.CrossLink{} + if i == 0 { + if crossLink.BlockNum().Uint64() > 0 { + lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID()) + if err != nil { + return ctxerror.New("[CrossLinkVerification] no last cross link found 1", + "blockHash", newBlock.Hash(), + "crossLink", lastLink, + ).WithCause(err) + } + } else { + lastLink = &crossLink + } + } else { + if (*crossLinks)[i-1].Header().ShardID != crossLink.Header().ShardID { + lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID()) + if err != nil { + return ctxerror.New("[CrossLinkVerification] no last cross link found 2", + "blockHash", newBlock.Hash(), + "crossLink", lastLink, + ).WithCause(err) + } + } else { + lastLink = &(*crossLinks)[i-1] + } + } + + if crossLink.BlockNum().Uint64() != 0 { // TODO: verify genesis block + err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header()) + if err != nil { + return ctxerror.New("cannot ValidateNewBlock", + "blockHash", newBlock.Hash(), + "numTx", len(newBlock.Transactions()), + ).WithCause(err) + } + } + } + } + // TODO: verify the vrf randomness // _ = newBlock.Header().Vrf @@ -426,6 +544,7 @@ func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[commo func (node *Node) PostConsensusProcessing(newBlock *types.Block) { if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) { node.BroadcastNewBlock(newBlock) + node.BroadcastCrossLinkHeader(newBlock) } else { utils.Logger().Info(). Uint64("ViewID", node.Consensus.GetViewID()). diff --git a/node/node_newblock.go b/node/node_newblock.go index 74135dfff..b423804c7 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -4,6 +4,8 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core" @@ -87,7 +89,71 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch } } - newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase) + if node.NodeConfig.ShardID == 0 { + curBlock := node.Blockchain().CurrentBlock() + numShards := core.ShardingSchedule.InstanceForEpoch(curBlock.Header().Epoch).NumShards() + + shardCrossLinks := make([]types.CrossLinks, numShards) + + for i := 0; i < int(numShards); i++ { + curShardID := uint32(i) + lastLink, err := node.Blockchain().ReadShardLastCrossLink(curShardID) + + blockNum := big.NewInt(0) + blockNumoffset := 0 + if err == nil && lastLink != nil { + blockNumoffset = 1 + blockNum = lastLink.BlockNum() + } + + for true { + link, err := node.Blockchain().ReadCrossLink(curShardID, blockNum.Uint64()+uint64(blockNumoffset), true) + if err != nil || link == nil { + break + } + + if link.BlockNum().Uint64() > 1 { + err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header()) + if err != nil { + utils.Logger().Debug(). + Err(err). + Msgf("[CrossLink] Failed verifying temp cross link %d", link.BlockNum().Uint64()) + break + } + lastLink = link + } + shardCrossLinks[i] = append(shardCrossLinks[i], *link) + + blockNumoffset++ + } + + } + + crossLinksToPropose := types.CrossLinks{} + for _, crossLinks := range shardCrossLinks { + crossLinksToPropose = append(crossLinksToPropose, crossLinks...) + } + if len(crossLinksToPropose) != 0 { + crossLinksToPropose.Sort() + + data, err := rlp.EncodeToBytes(crossLinksToPropose) + if err != nil { + utils.Logger().Debug(). + Err(err). + Msg("Failed encoding cross links") + continue + } + newBlock, err = node.Worker.CommitWithCrossLinks(sig, mask, viewID, coinbase, data) + utils.Logger().Debug(). + Uint64("blockNum", newBlock.NumberU64()). + Int("numCrossLinks", len(crossLinksToPropose)). + Msg("Successfully added cross links into new block") + } else { + newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase) + } + } else { + newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase) + } if err != nil { ctxerror.Log15(utils.GetLogger().Error, diff --git a/node/worker/worker.go b/node/worker/worker.go index 449d7ed9b..3fe3cd727 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -158,8 +158,8 @@ func (w *Worker) GetCurrentCXReceipts() []*types.CXReceipt { return w.current.cxs } -// Commit generate a new block for the new txs. -func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase common.Address) (*types.Block, error) { +// CommitWithCrossLinks generate a new block with cross links for the new txs. +func (w *Worker) CommitWithCrossLinks(sig []byte, signers []byte, viewID uint64, coinbase common.Address, crossLinks []byte) (*types.Block, error) { if len(sig) > 0 && len(signers) > 0 { copy(w.current.header.LastCommitSignature[:], sig[:]) w.current.header.LastCommitBitmap = append(signers[:0:0], signers...) @@ -167,6 +167,7 @@ func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase comm w.current.header.Coinbase = coinbase w.current.header.ViewID = new(big.Int) w.current.header.ViewID.SetUint64(viewID) + w.current.header.CrossLinks = crossLinks s := w.current.state.Copy() @@ -178,6 +179,11 @@ func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase comm return block, nil } +// Commit generate a new block for the new txs. +func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase common.Address) (*types.Block, error) { + return w.CommitWithCrossLinks(sig, signers, viewID, coinbase, []byte{}) +} + // New create a new worker object. func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus_engine.Engine, shardID uint32) *Worker { worker := &Worker{