Add crosslink support on beacon chain

pull/1348/head
Rongjian Lan 5 years ago
parent ab8fad1905
commit 1edbcb0db7
  1. 11
      api/proto/node/node.go
  2. 1
      cmd/harmony/main.go
  3. 1
      consensus/consensus_v2.go
  4. 63
      core/blockchain.go
  5. 22
      core/rawdb/accessors_chain.go
  6. 19
      core/rawdb/schema.go
  7. 2
      core/state_processor.go
  8. 2
      core/types/block.go
  9. 54
      core/types/crosslink.go
  10. 6
      core/types/cx_receipt.go
  11. 2
      core/types/receipt.go
  12. 2
      core/types/transaction.go
  13. 22
      node/node.go
  14. 133
      node/node_cross_shard.go
  15. 123
      node/node_handler.go
  16. 68
      node/node_newblock.go
  17. 10
      node/worker/worker.go

@ -159,6 +159,17 @@ func ConstructBlocksSyncMessage(blocks []*types.Block) []byte {
return byteBuffer.Bytes()
}
// ConstructCrossLinkHeadersMessage constructs cross link header message to send blocks to beacon chain
func ConstructCrossLinkHeadersMessage(headers []*types.Header) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Block))
byteBuffer.WriteByte(byte(Header))
headersData, _ := rlp.EncodeToBytes(headers)
byteBuffer.Write(headersData)
return byteBuffer.Bytes()
}
// ConstructEpochShardStateMessage contructs epoch shard state message
func ConstructEpochShardStateMessage(epochShardState types.EpochShardState) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})

@ -302,6 +302,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// TODO: refactor the creation of blockchain out of node.New()
currentConsensus.ChainReader = currentNode.Blockchain()
currentNode.NodeConfig.SetBeaconGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(0)))
if *isExplorer {
currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID)))

@ -571,6 +571,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
}
// Construct and send the commit message
// TODO: should only sign on block hash
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, consensus.blockNum)
commitPayload := append(blockNumBytes, consensus.blockHash[:]...)

@ -1110,8 +1110,20 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state")
}
}
if len(header.CrossLinks) > 0 {
crossLinks := &types.CrossLinks{}
err := rlp.DecodeBytes(header.CrossLinks, crossLinks)
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain] cannot parse cross links")
}
for _, crossLink := range *crossLinks {
bc.WriteCrossLinks(types.CrossLinks{crossLink}, false)
bc.WriteShardLastCrossLink(crossLink.ShardID(), crossLink)
}
}
}
}
return n, err
}
@ -1960,23 +1972,54 @@ func (bc *BlockChain) WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) e
}
// WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key
func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink) error {
// temp=true is to write the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink, temp bool) error {
var err error
for _, cl := range cls {
err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum(), cl.Bytes())
for i := 0; i < len(cls); i++ {
cl := cls[i]
err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum().Uint64(), cl.Serialize(), temp)
}
return err
}
// ReadCrossLinkHash retrieves crosslink hash given shardID and blockNum
func (bc *BlockChain) ReadCrossLinkHash(shardID uint32, blockNum uint64) (common.Hash, error) {
h, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum)
// ReadCrossLink retrieves crosslink given shardID and blockNum.
// temp=true is to retrieve the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) {
bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp)
if err != nil {
return common.Hash{}, err
return nil, err
}
hash := common.Hash{}
hash.SetBytes(h)
return hash, nil
crossLink, err := types.DeserializeCrossLink(bytes)
return crossLink, err
}
// WriteShardLastCrossLink saves the last crosslink of a shard
// temp=true is to write the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) WriteShardLastCrossLink(shardID uint32, cl types.CrossLink) error {
return rawdb.WriteShardLastCrossLink(bc.db, cl.ShardID(), cl.Serialize())
}
// ReadShardLastCrossLink retrieves the last crosslink of a shard.
func (bc *BlockChain) ReadShardLastCrossLink(shardID uint32) (*types.CrossLink, error) {
bytes, err := rawdb.ReadShardLastCrossLink(bc.db, shardID)
if err != nil {
return nil, err
}
crossLink, err := types.DeserializeCrossLink(bytes)
return crossLink, err
}
// ReadLastShardCrossLink retrieves last crosslink of a shard.
func (bc *BlockChain) ReadLastShardCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) {
bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp)
if err != nil {
return nil, err
}
crossLink, err := types.DeserializeCrossLink(bytes)
return crossLink, err
}
// IsSameLeaderAsPreviousBlock retrieves a block from the database by number, caching it

@ -504,15 +504,33 @@ func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error
}
// ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) {
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64, temp bool) ([]byte, error) {
if temp {
// cross link received but haven't committed into blockchain with consensus
return db.Get(tempCrosslinkKey(shardID, blockNum))
}
return db.Get(crosslinkKey(shardID, blockNum))
}
// WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error {
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte, temp bool) error {
if temp {
// cross link received but haven't committed into blockchain with consensus
return db.Put(tempCrosslinkKey(shardID, blockNum), data)
}
return db.Put(crosslinkKey(shardID, blockNum), data)
}
// ReadShardLastCrossLink read the last cross link of a shard
func ReadShardLastCrossLink(db DatabaseReader, shardID uint32) ([]byte, error) {
return db.Get(shardLastCrosslinkKey(shardID))
}
// WriteShardLastCrossLink stores the last cross link of a shard
func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) error {
return db.Put(shardLastCrosslinkKey(shardID), data)
}
// ReadCXReceipts retrieves all the transaction receipts belonging to a block.
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) types.CXReceipts {
// Retrieve the flattened receipt slice

@ -60,7 +60,9 @@ var (
preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db
crosslinkPrefix = []byte("crosslink") // prefix for crosslink
shardLastCrosslinkPrefix = []byte("shard-last-cross-link") // prefix for shard last crosslink
crosslinkPrefix = []byte("crosslink") // prefix for crosslink
tempCrosslinkPrefix = []byte("tempCrosslink") // prefix for tempCrosslink
cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt
@ -167,6 +169,13 @@ func epochVdfBlockNumberKey(epoch *big.Int) []byte {
return append(epochVdfBlockNumberPrefix, epoch.Bytes()...)
}
func shardLastCrosslinkKey(shardID uint32) []byte {
sbKey := make([]byte, 4)
binary.BigEndian.PutUint32(sbKey, shardID)
key := append(crosslinkPrefix, sbKey...)
return key
}
func crosslinkKey(shardID uint32, blockNum uint64) []byte {
sbKey := make([]byte, 12)
binary.BigEndian.PutUint32(sbKey, shardID)
@ -175,6 +184,14 @@ func crosslinkKey(shardID uint32, blockNum uint64) []byte {
return key
}
func tempCrosslinkKey(shardID uint32, blockNum uint64) []byte {
sbKey := make([]byte, 12)
binary.BigEndian.PutUint32(sbKey, shardID)
binary.BigEndian.PutUint64(sbKey[4:], blockNum)
key := append(tempCrosslinkPrefix, sbKey...)
return key
}
// cxReceiptKey = cxReceiptsPrefix + shardID + num (uint64 big endian) + hash
func cxReceiptKey(shardID uint32, number uint64, hash common.Hash) []byte {
sKey := make([]byte, 4)

@ -124,7 +124,7 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo
//receipt.Logs = statedb.GetLogs(tx.Hash())
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
cxReceipt := &types.CXReceipt{tx.Hash(), msg.Nonce(), msg.From(), *msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()}
cxReceipt := &types.CXReceipt{tx.Hash(), msg.Nonce(), msg.From(), msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()}
return receipt, cxReceipt, gas, err
}

@ -94,7 +94,7 @@ type Header struct {
Vrf []byte `json:"vrf"`
Vdf []byte `json:"vdf"`
ShardState []byte `json:"shardState"`
CrossLink []byte `json:"crossLink"`
CrossLinks []byte `json:"crossLink"`
}
// field type overrides for gencodec

@ -1,36 +1,68 @@
package types
import (
"math/big"
"sort"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common"
)
// CrossLink is only used on beacon chain to store the hash links from other shards
type CrossLink struct {
shardID uint32
blockNum uint64
hash common.Hash
ChainHeader *Header
}
// NewCrossLink returns a new cross link object
func NewCrossLink(header *Header) CrossLink {
return CrossLink{header}
}
// Header returns header
func (cl CrossLink) Header() *Header {
return cl.ChainHeader
}
// ShardID returns shardID
func (cl CrossLink) ShardID() uint32 {
return cl.shardID
return cl.ChainHeader.ShardID
}
// BlockNum returns blockNum
func (cl CrossLink) BlockNum() uint64 {
return cl.blockNum
func (cl CrossLink) BlockNum() *big.Int {
return cl.ChainHeader.Number
}
// Hash returns hash
func (cl CrossLink) Hash() common.Hash {
return cl.hash
return cl.ChainHeader.Hash()
}
// StateRoot returns hash of state root
func (cl CrossLink) StateRoot() common.Hash {
return cl.ChainHeader.Root
}
// CxReceiptsRoot returns hash of cross shard receipts
func (cl CrossLink) CxReceiptsRoot() common.Hash {
return cl.ChainHeader.CXReceiptHash
}
// Serialize returns bytes of cross link rlp-encoded content
func (cl CrossLink) Serialize() []byte {
bytes, _ := rlp.EncodeToBytes(cl)
return bytes
}
// Bytes returns bytes of the hash
func (cl CrossLink) Bytes() []byte {
return cl.hash[:]
// DeserializeCrossLink rlp-decode the bytes into cross link object.
func DeserializeCrossLink(bytes []byte) (*CrossLink, error) {
cl := &CrossLink{}
err := rlp.DecodeBytes(bytes, cl)
if err != nil {
return nil, err
}
return cl, err
}
// CrossLinks is a collection of cross links
@ -39,6 +71,6 @@ type CrossLinks []CrossLink
// Sort crosslinks by shardID and then by blockNum
func (cls CrossLinks) Sort() {
sort.Slice(cls, func(i, j int) bool {
return cls[i].shardID < cls[j].shardID || (cls[i].shardID == cls[j].shardID && cls[i].blockNum < cls[j].blockNum)
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0)
})
}

@ -12,7 +12,7 @@ type CXReceipt struct {
TxHash common.Hash // hash of the cross shard transaction in source shard
Nonce uint64
From common.Address
To common.Address
To *common.Address
ShardID uint32
ToShardID uint32
Amount *big.Int
@ -33,12 +33,12 @@ func (cs CXReceipts) GetRlp(i int) []byte {
return enc
}
// ShardID returns the destination shardID of the cxReceipt
// ToShardID returns the destination shardID of the cxReceipt
func (cs CXReceipts) ToShardID(i int) uint32 {
return cs[i].ToShardID
}
// NewCrossShardReceipt creates a cross shard receipt
func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt {
func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt {
return &CXReceipt{TxHash: txHash, Nonce: nonce, From: from, To: to, ShardID: shardID, ToShardID: toShardID, Amount: amount}
}

@ -208,7 +208,7 @@ func (r Receipts) GetRlp(i int) []byte {
return bytes
}
// ShardID returns 0, arbitrary value
// ToShardID returns 0, arbitrary value
// This function is NOT used, just to compatible with DerivableList interface
func (r Receipts) ToShardID(i int) uint32 {
_ = r[i]

@ -378,7 +378,7 @@ func (s Transactions) GetRlp(i int) []byte {
return enc
}
// ShardID returns the destination shardID of given transaction
// ToShardID returns the destination shardID of given transaction
func (s Transactions) ToShardID(i int) uint32 {
return s[i].data.ToShardID
}

@ -89,6 +89,8 @@ type Node struct {
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
DRand *drand.DRand // The instance for distributed randomness protocol
pendingCrossLinks []*types.Header
pendingClMutex sync.Mutex
// Shard databases
shardChains shardchain.Collection
@ -347,16 +349,16 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.AddContractKeyAndAddress(scFaucet)
}
if node.Consensus.ShardID == 0 {
// Contracts only exist in beacon chain
if node.isFirstTime {
// Setup one time smart contracts
node.CurrentStakes = make(map[common.Address]*structs.StakeInfo)
node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked
} else {
node.AddContractKeyAndAddress(scStaking)
}
}
//if node.Consensus.ShardID == 0 {
// // Contracts only exist in beacon chain
// if node.isFirstTime {
// // Setup one time smart contracts
// node.CurrentStakes = make(map[common.Address]*structs.StakeInfo)
// node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked
// } else {
// node.AddContractKeyAndAddress(scStaking)
// }
//}
node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), node.Blockchain().Config())

@ -1,23 +1,138 @@
package node
import (
"github.com/ethereum/go-ethereum/rlp"
"encoding/binary"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
)
// ProcessHeaderMessage verify and process Node/Header message into crosslink when it's valid
func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
var headers []*types.Header
err := rlp.DecodeBytes(msgPayload, &headers)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Crosslink Headers Broadcast Unable to Decode")
return
if node.NodeConfig.ShardID == 0 {
var headers []*types.Header
err := rlp.DecodeBytes(msgPayload, &headers)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Crosslink Headers Broadcast Unable to Decode")
return
}
utils.Logger().Debug().
Msgf("[ProcessingHeader NUM] %d", len(headers))
// Try to reprocess all the pending cross links
node.pendingClMutex.Lock()
crossLinkHeadersToProcess := node.pendingCrossLinks
node.pendingCrossLinks = []*types.Header{}
node.pendingClMutex.Unlock()
crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, headers...)
headersToQuque := []*types.Header{}
for _, header := range crossLinkHeadersToProcess {
utils.Logger().Debug().
Msgf("[ProcessingHeader] 1 shardID %d, blockNum %d", header.ShardID, header.Number.Uint64())
exist, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64(), false)
if err == nil && exist != nil {
// Cross link already exists, skip
continue
}
if header.Number.Uint64() > 0 { // Blindly trust the first cross-link
// Sanity check on the previous link with the new link
previousLink, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64()-1, false)
if err != nil {
previousLink, err = node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64()-1, true)
if err != nil {
headersToQuque = append(headersToQuque, header)
continue
}
}
err = node.VerifyCrosslinkHeader(previousLink.Header(), header)
if err != nil {
utils.Logger().Warn().
Err(err).
Msgf("Failed to verify new cross link header for shardID %d, blockNum %d", header.ShardID, header.Number)
continue
}
}
crossLink := types.NewCrossLink(header)
utils.Logger().Debug().
Msgf("[ProcessingHeader] committing shardID %d, blockNum %d", header.ShardID, header.Number.Uint64())
node.Blockchain().WriteCrossLinks(types.CrossLinks{crossLink}, true)
}
// Queue up the cross links that's in the future
node.pendingClMutex.Lock()
node.pendingCrossLinks = append(node.pendingCrossLinks, headersToQuque...)
node.pendingClMutex.Unlock()
}
}
// VerifyCrosslinkHeader verifies the header is valid against the prevHeader.
func (node *Node) VerifyCrosslinkHeader(prevHeader, header *types.Header) error {
// TODO: add fork choice rule
if prevHeader.Hash() != header.ParentHash {
return ctxerror.New("[CrossLink] Invalid cross link header - parent hash mismatch", "shardID", header.ShardID, "blockNum", header.Number)
}
// Verify signature of the new cross link header
shardState, err := node.Blockchain().ReadShardState(header.Epoch)
committee := shardState.FindCommitteeByID(header.ShardID)
if err != nil || committee == nil {
return ctxerror.New("[CrossLink] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
var committerKeys []*bls.PublicKey
parseKeysSuccess := true
for _, member := range committee.NodeList {
committerKey := new(bls.PublicKey)
err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
parseKeysSuccess = false
break
}
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if header.Number.Uint64() > 1 { // First block doesn't have last sig
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap); err != nil {
return ctxerror.New("cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
aggSig := bls.Sign{}
err = aggSig.Deserialize(header.LastCommitSignature[:])
if err != nil {
return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err)
}
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1)
commitPayload := append(blockNumBytes, header.ParentHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("Failed to verify the signature for cross link header ", "shardID", header.ShardID, "blockNum", header.Number)
}
}
// TODO: add actual logic
return nil
}
// ProcessReceiptMessage store the receipts and merkle proof in local data store

@ -3,6 +3,7 @@ package node
import (
"bytes"
"context"
"encoding/binary"
"errors"
"math"
"math/big"
@ -29,6 +30,7 @@ import (
"github.com/harmony-one/harmony/contracts/structs"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
@ -190,10 +192,10 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) {
case proto_node.Header:
// only beacon chain will accept the header from other shards
if node.Consensus.ShardID != 0 {
utils.Logger().Debug().Msg("NET: received message: Node/Header")
if node.NodeConfig.ShardID != 0 {
return
}
utils.Logger().Debug().Msg("NET: received message: Node/Header")
node.ProcessHeaderMessage(msgPayload[1:]) // skip first byte which is blockMsgType
case proto_node.Receipt:
@ -285,10 +287,76 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
}
}
// BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", node.NodeConfig)
lastThreeHeaders := []*types.Header{}
block := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 2)
if block != nil {
lastThreeHeaders = append(lastThreeHeaders, block.Header())
}
block = node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1)
if block != nil {
lastThreeHeaders = append(lastThreeHeaders, block.Header())
}
lastThreeHeaders = append(lastThreeHeaders, newBlock.Header())
node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(lastThreeHeaders)))
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// TODO ek – where do we verify parent-child invariants,
// e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"?
// Verify lastCommitSig
if newBlock.NumberU64() > 1 {
header := newBlock.Header()
shardState, err := node.Blockchain().ReadShardState(header.Epoch)
committee := shardState.FindCommitteeByID(header.ShardID)
if err != nil || committee == nil {
return ctxerror.New("[VerifyNewBlock] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
var committerKeys []*bls.PublicKey
parseKeysSuccess := true
for _, member := range committee.NodeList {
committerKey := new(bls.PublicKey)
err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
parseKeysSuccess = false
break
}
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[VerifyNewBlock] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
mask, err := bls_cosi.NewMask(node.Consensus.PublicKeys, nil)
if err != nil {
return ctxerror.New("[VerifyNewBlock] cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap); err != nil {
return ctxerror.New("[VerifyNewBlock] cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
aggSig := bls.Sign{}
err = aggSig.Deserialize(header.LastCommitSignature[:])
if err != nil {
return ctxerror.New("[VerifyNewBlock] unable to deserialize multi-signature from payload").WithCause(err)
}
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1)
commitPayload := append(blockNumBytes, header.ParentHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("[VerifyNewBlock] Failed to verify the signature for last commit sig", "shardID", header.ShardID, "blockNum", header.Number, "keys", committerKeys, "consensusPubKeys", node.Consensus.PublicKeys)
}
}
// End Verify lastCommitSig
if newBlock.ShardID() != node.Blockchain().ShardID() {
return ctxerror.New("wrong shard ID",
"my shard ID", node.Blockchain().ShardID(),
@ -302,6 +370,56 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
).WithCause(err)
}
// Verify cross links
if node.NodeConfig.ShardID == 0 && len(newBlock.Header().CrossLinks) > 0 {
crossLinks := &types.CrossLinks{}
err := rlp.DecodeBytes(newBlock.Header().CrossLinks, crossLinks)
if err != nil {
return ctxerror.New("[CrossLinkVerification] failed to decode cross links",
"blockHash", newBlock.Hash(),
"crossLinks", len(newBlock.Header().CrossLinks),
).WithCause(err)
}
for i, crossLink := range *crossLinks {
lastLink := &types.CrossLink{}
if i == 0 {
if crossLink.BlockNum().Uint64() > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 1",
"blockHash", newBlock.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
} else {
lastLink = &crossLink
}
} else {
if (*crossLinks)[i-1].Header().ShardID != crossLink.Header().ShardID {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 2",
"blockHash", newBlock.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
} else {
lastLink = &(*crossLinks)[i-1]
}
}
if crossLink.BlockNum().Uint64() != 0 { // TODO: verify genesis block
err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header())
if err != nil {
return ctxerror.New("cannot ValidateNewBlock",
"blockHash", newBlock.Hash(),
"numTx", len(newBlock.Transactions()),
).WithCause(err)
}
}
}
}
// TODO: verify the vrf randomness
// _ = newBlock.Header().Vrf
@ -426,6 +544,7 @@ func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[commo
func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) {
node.BroadcastNewBlock(newBlock)
node.BroadcastCrossLinkHeader(newBlock)
} else {
utils.Logger().Info().
Uint64("ViewID", node.Consensus.GetViewID()).

@ -4,6 +4,8 @@ import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core"
@ -87,7 +89,71 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
}
}
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
if node.NodeConfig.ShardID == 0 {
curBlock := node.Blockchain().CurrentBlock()
numShards := core.ShardingSchedule.InstanceForEpoch(curBlock.Header().Epoch).NumShards()
shardCrossLinks := make([]types.CrossLinks, numShards)
for i := 0; i < int(numShards); i++ {
curShardID := uint32(i)
lastLink, err := node.Blockchain().ReadShardLastCrossLink(curShardID)
blockNum := big.NewInt(0)
blockNumoffset := 0
if err == nil && lastLink != nil {
blockNumoffset = 1
blockNum = lastLink.BlockNum()
}
for true {
link, err := node.Blockchain().ReadCrossLink(curShardID, blockNum.Uint64()+uint64(blockNumoffset), true)
if err != nil || link == nil {
break
}
if link.BlockNum().Uint64() > 1 {
err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header())
if err != nil {
utils.Logger().Debug().
Err(err).
Msgf("[CrossLink] Failed verifying temp cross link %d", link.BlockNum().Uint64())
break
}
lastLink = link
}
shardCrossLinks[i] = append(shardCrossLinks[i], *link)
blockNumoffset++
}
}
crossLinksToPropose := types.CrossLinks{}
for _, crossLinks := range shardCrossLinks {
crossLinksToPropose = append(crossLinksToPropose, crossLinks...)
}
if len(crossLinksToPropose) != 0 {
crossLinksToPropose.Sort()
data, err := rlp.EncodeToBytes(crossLinksToPropose)
if err != nil {
utils.Logger().Debug().
Err(err).
Msg("Failed encoding cross links")
continue
}
newBlock, err = node.Worker.CommitWithCrossLinks(sig, mask, viewID, coinbase, data)
utils.Logger().Debug().
Uint64("blockNum", newBlock.NumberU64()).
Int("numCrossLinks", len(crossLinksToPropose)).
Msg("Successfully added cross links into new block")
} else {
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
}
} else {
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
}
if err != nil {
ctxerror.Log15(utils.GetLogger().Error,

@ -158,8 +158,8 @@ func (w *Worker) GetCurrentCXReceipts() []*types.CXReceipt {
return w.current.cxs
}
// Commit generate a new block for the new txs.
func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase common.Address) (*types.Block, error) {
// CommitWithCrossLinks generate a new block with cross links for the new txs.
func (w *Worker) CommitWithCrossLinks(sig []byte, signers []byte, viewID uint64, coinbase common.Address, crossLinks []byte) (*types.Block, error) {
if len(sig) > 0 && len(signers) > 0 {
copy(w.current.header.LastCommitSignature[:], sig[:])
w.current.header.LastCommitBitmap = append(signers[:0:0], signers...)
@ -167,6 +167,7 @@ func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase comm
w.current.header.Coinbase = coinbase
w.current.header.ViewID = new(big.Int)
w.current.header.ViewID.SetUint64(viewID)
w.current.header.CrossLinks = crossLinks
s := w.current.state.Copy()
@ -178,6 +179,11 @@ func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase comm
return block, nil
}
// Commit generate a new block for the new txs.
func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase common.Address) (*types.Block, error) {
return w.CommitWithCrossLinks(sig, signers, viewID, coinbase, []byte{})
}
// New create a new worker object.
func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus_engine.Engine, shardID uint32) *Worker {
worker := &Worker{

Loading…
Cancel
Save