Merge branch 'cross-shard' of github.com:harmony-one/harmony into cross-shard

pull/1357/head
chao 5 years ago
commit d90116d2cd
  1. 11
      api/proto/node/node.go
  2. 1
      cmd/harmony/main.go
  3. 1
      consensus/consensus_v2.go
  4. 65
      core/blockchain.go
  5. 23
      core/rawdb/accessors_chain.go
  6. 19
      core/rawdb/schema.go
  7. 3
      core/state_transition.go
  8. 2
      core/types/block.go
  9. 54
      core/types/crosslink.go
  10. 2
      internal/configs/sharding/localnet.go
  11. 12
      internal/genesis/localnodes.go
  12. 22
      node/node.go
  13. 154
      node/node_cross_shard.go
  14. 129
      node/node_handler.go
  15. 68
      node/node_newblock.go
  16. 10
      node/worker/worker.go
  17. 2
      test/configs/local.txt

@ -165,6 +165,17 @@ func ConstructBlocksSyncMessage(blocks []*types.Block) []byte {
return byteBuffer.Bytes()
}
// ConstructCrossLinkHeadersMessage constructs cross link header message to send to beacon chain
func ConstructCrossLinkHeadersMessage(headers []*types.Header) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Block))
byteBuffer.WriteByte(byte(Header))
headersData, _ := rlp.EncodeToBytes(headers)
byteBuffer.Write(headersData)
return byteBuffer.Bytes()
}
// ConstructEpochShardStateMessage contructs epoch shard state message
func ConstructEpochShardStateMessage(epochShardState types.EpochShardState) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})

@ -302,6 +302,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// TODO: refactor the creation of blockchain out of node.New()
currentConsensus.ChainReader = currentNode.Blockchain()
currentNode.NodeConfig.SetBeaconGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(0)))
if *isExplorer {
currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID)))

@ -571,6 +571,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
}
// Construct and send the commit message
// TODO: should only sign on block hash
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, consensus.blockNum)
commitPayload := append(blockNumBytes, consensus.blockHash[:]...)

@ -1119,10 +1119,24 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
err = bc.WriteShardStateBytes(epoch, header.ShardState)
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state")
return n, err
}
}
if len(header.CrossLinks) > 0 {
crossLinks := &types.CrossLinks{}
err = rlp.DecodeBytes(header.CrossLinks, crossLinks)
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain] cannot parse cross links")
return n, err
}
for _, crossLink := range *crossLinks {
bc.WriteCrossLinks(types.CrossLinks{crossLink}, false)
bc.WriteShardLastCrossLink(crossLink.ShardID(), crossLink)
}
}
}
}
return n, err
}
@ -1971,23 +1985,54 @@ func (bc *BlockChain) WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) e
}
// WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key
func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink) error {
// temp=true is to write the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink, temp bool) error {
var err error
for _, cl := range cls {
err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum(), cl.Bytes())
for i := 0; i < len(cls); i++ {
cl := cls[i]
err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum().Uint64(), cl.Serialize(), temp)
}
return err
}
// ReadCrossLinkHash retrieves crosslink hash given shardID and blockNum
func (bc *BlockChain) ReadCrossLinkHash(shardID uint32, blockNum uint64) (common.Hash, error) {
h, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum)
// ReadCrossLink retrieves crosslink given shardID and blockNum.
// temp=true is to retrieve the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) {
bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp)
if err != nil {
return common.Hash{}, err
return nil, err
}
hash := common.Hash{}
hash.SetBytes(h)
return hash, nil
crossLink, err := types.DeserializeCrossLink(bytes)
return crossLink, err
}
// WriteShardLastCrossLink saves the last crosslink of a shard
// temp=true is to write the just received cross link that's not committed into blockchain with consensus
func (bc *BlockChain) WriteShardLastCrossLink(shardID uint32, cl types.CrossLink) error {
return rawdb.WriteShardLastCrossLink(bc.db, cl.ShardID(), cl.Serialize())
}
// ReadShardLastCrossLink retrieves the last crosslink of a shard.
func (bc *BlockChain) ReadShardLastCrossLink(shardID uint32) (*types.CrossLink, error) {
bytes, err := rawdb.ReadShardLastCrossLink(bc.db, shardID)
if err != nil {
return nil, err
}
crossLink, err := types.DeserializeCrossLink(bytes)
return crossLink, err
}
// ReadLastShardCrossLink retrieves last crosslink of a shard.
func (bc *BlockChain) ReadLastShardCrossLink(shardID uint32, blockNum uint64, temp bool) (*types.CrossLink, error) {
bytes, err := rawdb.ReadCrossLinkShardBlock(bc.db, shardID, blockNum, temp)
if err != nil {
return nil, err
}
crossLink, err := types.DeserializeCrossLink(bytes)
return crossLink, err
}
// IsSameLeaderAsPreviousBlock retrieves a block from the database by number, caching it

@ -464,7 +464,6 @@ func WriteLastCommits(
if err = db.Put(lastCommitsKey, data); err != nil {
return ctxerror.New("cannot write last commits").WithCause(err)
}
utils.GetLogger().Info("wrote last commits", "numShards", len(data))
return nil
}
@ -504,15 +503,33 @@ func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error
}
// ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) {
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64, temp bool) ([]byte, error) {
if temp {
// cross link received but haven't committed into blockchain with consensus
return db.Get(tempCrosslinkKey(shardID, blockNum))
}
return db.Get(crosslinkKey(shardID, blockNum))
}
// WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error {
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte, temp bool) error {
if temp {
// cross link received but haven't committed into blockchain with consensus
return db.Put(tempCrosslinkKey(shardID, blockNum), data)
}
return db.Put(crosslinkKey(shardID, blockNum), data)
}
// ReadShardLastCrossLink read the last cross link of a shard
func ReadShardLastCrossLink(db DatabaseReader, shardID uint32) ([]byte, error) {
return db.Get(shardLastCrosslinkKey(shardID))
}
// WriteShardLastCrossLink stores the last cross link of a shard
func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) error {
return db.Put(shardLastCrosslinkKey(shardID), data)
}
// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) {
data, err := db.Get(cxReceiptKey(shardID, number, hash))

@ -60,7 +60,9 @@ var (
preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db
crosslinkPrefix = []byte("crosslink") // prefix for crosslink
shardLastCrosslinkPrefix = []byte("shard-last-cross-link") // prefix for shard last crosslink
crosslinkPrefix = []byte("crosslink") // prefix for crosslink
tempCrosslinkPrefix = []byte("tempCrosslink") // prefix for tempCrosslink
cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt
cxReceiptHashPrefix = []byte("cxReceiptHash") // prefix for cross shard transaction receipt hash
@ -168,6 +170,13 @@ func epochVdfBlockNumberKey(epoch *big.Int) []byte {
return append(epochVdfBlockNumberPrefix, epoch.Bytes()...)
}
func shardLastCrosslinkKey(shardID uint32) []byte {
sbKey := make([]byte, 4)
binary.BigEndian.PutUint32(sbKey, shardID)
key := append(crosslinkPrefix, sbKey...)
return key
}
func crosslinkKey(shardID uint32, blockNum uint64) []byte {
sbKey := make([]byte, 12)
binary.BigEndian.PutUint32(sbKey, shardID)
@ -176,6 +185,14 @@ func crosslinkKey(shardID uint32, blockNum uint64) []byte {
return key
}
func tempCrosslinkKey(shardID uint32, blockNum uint64) []byte {
sbKey := make([]byte, 12)
binary.BigEndian.PutUint32(sbKey, shardID)
binary.BigEndian.PutUint64(sbKey[4:], blockNum)
key := append(tempCrosslinkPrefix, sbKey...)
return key
}
// cxReceiptKey = cxReceiptsPrefix + shardID + num (uint64 big endian) + hash
func cxReceiptKey(shardID uint32, number uint64, hash common.Hash) []byte {
sKey := make([]byte, 4)

@ -229,9 +229,8 @@ func (st *StateTransition) TransitionDb() (ret []byte, usedGas uint64, failed bo
if vmerr == vm.ErrInsufficientBalance {
if st.txType != types.AdditionOnly {
return nil, 0, false, vmerr
} else {
vmerr = nil
}
vmerr = nil
}
}
st.refundGas()

@ -95,7 +95,7 @@ type Header struct {
Vrf []byte `json:"vrf"`
Vdf []byte `json:"vdf"`
ShardState []byte `json:"shardState"`
CrossLink []byte `json:"crossLink"`
CrossLinks []byte `json:"crossLink"`
}
// field type overrides for gencodec

@ -1,36 +1,68 @@
package types
import (
"math/big"
"sort"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common"
)
// CrossLink is only used on beacon chain to store the hash links from other shards
type CrossLink struct {
shardID uint32
blockNum uint64
hash common.Hash
ChainHeader *Header
}
// NewCrossLink returns a new cross link object
func NewCrossLink(header *Header) CrossLink {
return CrossLink{header}
}
// Header returns header
func (cl CrossLink) Header() *Header {
return cl.ChainHeader
}
// ShardID returns shardID
func (cl CrossLink) ShardID() uint32 {
return cl.shardID
return cl.ChainHeader.ShardID
}
// BlockNum returns blockNum
func (cl CrossLink) BlockNum() uint64 {
return cl.blockNum
func (cl CrossLink) BlockNum() *big.Int {
return cl.ChainHeader.Number
}
// Hash returns hash
func (cl CrossLink) Hash() common.Hash {
return cl.hash
return cl.ChainHeader.Hash()
}
// StateRoot returns hash of state root
func (cl CrossLink) StateRoot() common.Hash {
return cl.ChainHeader.Root
}
// CxReceiptsRoot returns hash of cross shard receipts
func (cl CrossLink) CxReceiptsRoot() common.Hash {
return cl.ChainHeader.CXReceiptHash
}
// Serialize returns bytes of cross link rlp-encoded content
func (cl CrossLink) Serialize() []byte {
bytes, _ := rlp.EncodeToBytes(cl)
return bytes
}
// Bytes returns bytes of the hash
func (cl CrossLink) Bytes() []byte {
return cl.hash[:]
// DeserializeCrossLink rlp-decode the bytes into cross link object.
func DeserializeCrossLink(bytes []byte) (*CrossLink, error) {
cl := &CrossLink{}
err := rlp.DecodeBytes(bytes, cl)
if err != nil {
return nil, err
}
return cl, err
}
// CrossLinks is a collection of cross links
@ -39,6 +71,6 @@ type CrossLinks []CrossLink
// Sort crosslinks by shardID and then by blockNum
func (cls CrossLinks) Sort() {
sort.Slice(cls, func(i, j int) bool {
return cls[i].shardID < cls[j].shardID || (cls[i].shardID == cls[j].shardID && cls[i].blockNum < cls[j].blockNum)
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0)
})
}

@ -73,4 +73,4 @@ var localnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(localnetV1Epo
var localnetV0 = MustNewInstance(2, 7, 5, genesis.LocalHarmonyAccounts, genesis.LocalFnAccounts, localnetReshardingEpoch)
var localnetV1 = MustNewInstance(2, 7, 5, genesis.LocalHarmonyAccountsV1, genesis.LocalFnAccountsV1, localnetReshardingEpoch)
var localnetV2 = MustNewInstance(2, 10, 4, genesis.LocalHarmonyAccountsV2, genesis.LocalFnAccountsV2, localnetReshardingEpoch)
var localnetV2 = MustNewInstance(2, 9, 6, genesis.LocalHarmonyAccountsV2, genesis.LocalFnAccountsV2, localnetReshardingEpoch)

@ -38,7 +38,7 @@ var LocalHarmonyAccountsV1 = []DeployAccount{
// LocalFnAccountsV1 are the accounts for the initial FN used for local test.
var LocalFnAccountsV1 = []DeployAccount{
{Index: " 0 ", Address: "one1a50tun737ulcvwy0yvve0pvu5skq0kjargvhwe", BlsPublicKey: "4235d4ae2219093632c61db4f71ff0c32bdb56463845f8477c2086af1fe643194d3709575707148cad4f835f2fc4ea05"},
{Index: " 0 ", Address: "one1a50tun737ulcvwy0yvve0pvu5skq0kjargvhwe", BlsPublicKey: "52ecce5f64db21cbe374c9268188f5d2cdd5bec1a3112276a350349860e35fb81f8cfe447a311e0550d961cf25cb988d"},
{Index: " 1 ", Address: "one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg", BlsPublicKey: "a547a9bf6fdde4f4934cde21473748861a3cc0fe8bbb5e57225a29f483b05b72531f002f8187675743d819c955a86100"},
{Index: " 2 ", Address: "one103q7qe5t2505lypvltkqtddaef5tzfxwsse4z7", BlsPublicKey: "678ec9670899bf6af85b877058bea4fc1301a5a3a376987e826e3ca150b80e3eaadffedad0fedfa111576fa76ded980c"},
{Index: " 3 ", Address: "one129r9pj3sk0re76f7zs3qz92rggmdgjhtwge62k", BlsPublicKey: "63f479f249c59f0486fda8caa2ffb247209489dae009dfde6144ff38c370230963d360dffd318cfb26c213320e89a512"},
@ -54,13 +54,17 @@ var LocalHarmonyAccountsV2 = []DeployAccount{
{Index: " 5 ", Address: "one1est2gxcvavmtnzc7mhd73gzadm3xxcv5zczdtw", BlsPublicKey: "776f3b8704f4e1092a302a60e84f81e476c212d6f458092b696df420ea19ff84a6179e8e23d090b9297dc041600bc100"},
{Index: " 6 ", Address: "one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj", BlsPublicKey: "2d61379e44a772e5757e27ee2b3874254f56073e6bd226eb8b160371cc3c18b8c4977bd3dcb71fd57dc62bf0e143fd08"},
{Index: " 7 ", Address: "one1a0x3d6xpmr6f8wsyaxd9v36pytvp48zckswvv9", BlsPublicKey: "c4e4708b6cf2a2ceeb59981677e9821eebafc5cf483fb5364a28fa604cc0ce69beeed40f3f03815c9e196fdaec5f1097"},
{Index: " 8 ", Address: "one1d2rngmem4x2c6zxsjjz29dlah0jzkr0k2n88wc", BlsPublicKey: "86dc2fdc2ceec18f6923b99fd86a68405c132e1005cf1df72dca75db0adfaeb53d201d66af37916d61f079f34f21fb96"},
{Index: " 9 ", Address: "one1658znfwf40epvy7e46cqrmzyy54h4n0qa73nep", BlsPublicKey: "49d15743b36334399f9985feb0753430a2b287b2d68b84495bbb15381854cbf01bca9d1d9f4c9c8f18509b2bfa6bd40f"},
{Index: " 10 ", Address: "one1z05g55zamqzfw9qs432n33gycdmyvs38xjemyl", BlsPublicKey: "95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818"},
{Index: " 11 ", Address: "one1ljznytjyn269azvszjlcqvpcj6hjm822yrcp2e", BlsPublicKey: "68ae289d73332872ec8d04ac256ca0f5453c88ad392730c5741b6055bc3ec3d086ab03637713a29f459177aaa8340615"},
}
// LocalFnAccountsV2 are the accounts for the initial FN used for local test.
var LocalFnAccountsV2 = []DeployAccount{
{Index: " 0 ", Address: "one1a50tun737ulcvwy0yvve0pvu5skq0kjargvhwe", BlsPublicKey: "52ecce5f64db21cbe374c9268188f5d2cdd5bec1a3112276a350349860e35fb81f8cfe447a311e0550d961cf25cb988d"},
{Index: " 1 ", Address: "one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg", BlsPublicKey: "1c1fb28d2de96e82c3d9b4917eb54412517e2763112a3164862a6ed627ac62e87ce274bb4ea36e6a61fb66a15c263a06"},
{Index: " 2 ", Address: "one103q7qe5t2505lypvltkqtddaef5tzfxwsse4z7", BlsPublicKey: "b179c4fdc0bee7bd0b6698b792837dd13404d3f985b59d4a9b1cd0641a76651e271518b61abbb6fbebd4acf963358604"},
{Index: " 1 ", Address: "one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg", BlsPublicKey: "a547a9bf6fdde4f4934cde21473748861a3cc0fe8bbb5e57225a29f483b05b72531f002f8187675743d819c955a86100"},
{Index: " 2 ", Address: "one103q7qe5t2505lypvltkqtddaef5tzfxwsse4z7", BlsPublicKey: "678ec9670899bf6af85b877058bea4fc1301a5a3a376987e826e3ca150b80e3eaadffedad0fedfa111576fa76ded980c"},
{Index: " 3 ", Address: "one129r9pj3sk0re76f7zs3qz92rggmdgjhtwge62k", BlsPublicKey: "63f479f249c59f0486fda8caa2ffb247209489dae009dfde6144ff38c370230963d360dffd318cfb26c213320e89a512"},
{Index: " 4 ", Address: "one1d2rngmem4x2c6zxsjjz29dlah0jzkr0k2n88wc", BlsPublicKey: "16513c487a6bb76f37219f3c2927a4f281f9dd3fd6ed2e3a64e500de6545cf391dd973cc228d24f9bd01efe94912e714"},
{Index: " 5 ", Address: "one1658znfwf40epvy7e46cqrmzyy54h4n0qa73nep", BlsPublicKey: "576d3c48294e00d6be4a22b07b66a870ddee03052fe48a5abbd180222e5d5a1f8946a78d55b025de21635fd743bbad90"},
@ -68,6 +72,4 @@ var LocalFnAccountsV2 = []DeployAccount{
{Index: " 7 ", Address: "one1d7jfnr6yraxnrycgaemyktkmhmajhp8kl0yahv", BlsPublicKey: "f47238daef97d60deedbde5302d05dea5de67608f11f406576e363661f7dcbc4a1385948549b31a6c70f6fde8a391486"},
{Index: " 8 ", Address: "one1r4zyyjqrulf935a479sgqlpa78kz7zlcg2jfen", BlsPublicKey: "fc4b9c535ee91f015efff3f32fbb9d32cdd9bfc8a837bb3eee89b8fff653c7af2050a4e147ebe5c7233dc2d5df06ee0a"},
{Index: " 9 ", Address: "one1p7ht2d4kl8ve7a8jxw746yfnx4wnfxtp8jqxwe", BlsPublicKey: "ca86e551ee42adaaa6477322d7db869d3e203c00d7b86c82ebee629ad79cb6d57b8f3db28336778ec2180e56a8e07296"},
{Index: " 10 ", Address: "one1z05g55zamqzfw9qs432n33gycdmyvs38xjemyl", BlsPublicKey: "95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818"},
{Index: " 11 ", Address: "one1ljznytjyn269azvszjlcqvpcj6hjm822yrcp2e", BlsPublicKey: "68ae289d73332872ec8d04ac256ca0f5453c88ad392730c5741b6055bc3ec3d086ab03637713a29f459177aaa8340615"},
}

@ -89,6 +89,8 @@ type Node struct {
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
DRand *drand.DRand // The instance for distributed randomness protocol
pendingCrossLinks []*types.Header
pendingClMutex sync.Mutex
// Shard databases
shardChains shardchain.Collection
@ -347,16 +349,16 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.AddContractKeyAndAddress(scFaucet)
}
if node.Consensus.ShardID == 0 {
// Contracts only exist in beacon chain
if node.isFirstTime {
// Setup one time smart contracts
node.CurrentStakes = make(map[common.Address]*structs.StakeInfo)
node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked
} else {
node.AddContractKeyAndAddress(scStaking)
}
}
//if node.Consensus.ShardID == 0 {
// // Contracts only exist in beacon chain
// if node.isFirstTime {
// // Setup one time smart contracts
// node.CurrentStakes = make(map[common.Address]*structs.StakeInfo)
// node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked
// } else {
// node.AddContractKeyAndAddress(scStaking)
// }
//}
node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), node.Blockchain().Config())

@ -1,9 +1,12 @@
package node
import (
"encoding/binary"
"bytes"
"encoding/base64"
"encoding/binary"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
@ -12,20 +15,133 @@ import (
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
)
// ProcessHeaderMessage verify and process Node/Header message into crosslink when it's valid
func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
var headers []*types.Header
err := rlp.DecodeBytes(msgPayload, &headers)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Crosslink Headers Broadcast Unable to Decode")
return
if node.NodeConfig.ShardID == 0 {
var headers []*types.Header
err := rlp.DecodeBytes(msgPayload, &headers)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Crosslink Headers Broadcast Unable to Decode")
return
}
utils.Logger().Debug().
Msgf("[ProcessingHeader NUM] %d", len(headers))
// Try to reprocess all the pending cross links
node.pendingClMutex.Lock()
crossLinkHeadersToProcess := node.pendingCrossLinks
node.pendingCrossLinks = []*types.Header{}
node.pendingClMutex.Unlock()
crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, headers...)
headersToQuque := []*types.Header{}
for _, header := range crossLinkHeadersToProcess {
utils.Logger().Debug().
Msgf("[ProcessingHeader] 1 shardID %d, blockNum %d", header.ShardID, header.Number.Uint64())
exist, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64(), false)
if err == nil && exist != nil {
// Cross link already exists, skip
continue
}
if header.Number.Uint64() > 0 { // Blindly trust the first cross-link
// Sanity check on the previous link with the new link
previousLink, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64()-1, false)
if err != nil {
previousLink, err = node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64()-1, true)
if err != nil {
headersToQuque = append(headersToQuque, header)
continue
}
}
err = node.VerifyCrosslinkHeader(previousLink.Header(), header)
if err != nil {
utils.Logger().Warn().
Err(err).
Msgf("Failed to verify new cross link header for shardID %d, blockNum %d", header.ShardID, header.Number)
continue
}
}
crossLink := types.NewCrossLink(header)
utils.Logger().Debug().
Msgf("[ProcessingHeader] committing shardID %d, blockNum %d", header.ShardID, header.Number.Uint64())
node.Blockchain().WriteCrossLinks(types.CrossLinks{crossLink}, true)
}
// Queue up the cross links that's in the future
node.pendingClMutex.Lock()
node.pendingCrossLinks = append(node.pendingCrossLinks, headersToQuque...)
node.pendingClMutex.Unlock()
}
}
// VerifyCrosslinkHeader verifies the header is valid against the prevHeader.
func (node *Node) VerifyCrosslinkHeader(prevHeader, header *types.Header) error {
// TODO: add fork choice rule
if prevHeader.Hash() != header.ParentHash {
return ctxerror.New("[CrossLink] Invalid cross link header - parent hash mismatch", "shardID", header.ShardID, "blockNum", header.Number)
}
// Verify signature of the new cross link header
shardState, err := node.Blockchain().ReadShardState(prevHeader.Epoch)
committee := shardState.FindCommitteeByID(prevHeader.ShardID)
if err != nil || committee == nil {
return ctxerror.New("[CrossLink] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
// TODO: add actual logic
var committerKeys []*bls.PublicKey
parseKeysSuccess := true
for _, member := range committee.NodeList {
committerKey := new(bls.PublicKey)
err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
parseKeysSuccess = false
break
}
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if header.Number.Uint64() > 1 { // First block doesn't have last sig
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap); err != nil {
return ctxerror.New("cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
aggSig := bls.Sign{}
err = aggSig.Deserialize(header.LastCommitSignature[:])
if err != nil {
return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err)
}
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1)
commitPayload := append(blockNumBytes, header.ParentHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("Failed to verify the signature for cross link header ", "shardID", header.ShardID, "blockNum", header.Number)
}
}
return nil
}
// ProcessReceiptMessage store the receipts and merkle proof in local data store
@ -43,16 +159,16 @@ func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
if len(merkleProof.ShardID) == 0 {
utils.Logger().Warn().Msg("[ProcessReceiptMessage] There is No non-empty destination shards")
return
} else {
for j := 0; j < len(merkleProof.ShardID); j++ {
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, merkleProof.ShardID[j])
byteBuffer.Write(sKey)
byteBuffer.Write(merkleProof.CXShardHash[j][:])
if merkleProof.ShardID[j] == node.Consensus.ShardID {
foundMyShard = true
myShardRoot = merkleProof.CXShardHash[j]
}
}
for j := 0; j < len(merkleProof.ShardID); j++ {
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, merkleProof.ShardID[j])
byteBuffer.Write(sKey)
byteBuffer.Write(merkleProof.CXShardHash[j][:])
if merkleProof.ShardID[j] == node.Consensus.ShardID {
foundMyShard = true
myShardRoot = merkleProof.CXShardHash[j]
}
}

@ -3,6 +3,7 @@ package node
import (
"bytes"
"context"
"encoding/binary"
"errors"
"math"
"math/big"
@ -29,6 +30,7 @@ import (
"github.com/harmony-one/harmony/contracts/structs"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
@ -190,10 +192,10 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) {
case proto_node.Header:
// only beacon chain will accept the header from other shards
if node.Consensus.ShardID != 0 {
utils.Logger().Debug().Msg("NET: received message: Node/Header")
if node.NodeConfig.ShardID != 0 {
return
}
utils.Logger().Debug().Msg("NET: received message: Node/Header")
node.ProcessHeaderMessage(msgPayload[1:]) // skip first byte which is blockMsgType
case proto_node.Receipt:
@ -285,6 +287,24 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
}
}
// BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", node.NodeConfig)
lastThreeHeaders := []*types.Header{}
block := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 2)
if block != nil {
lastThreeHeaders = append(lastThreeHeaders, block.Header())
}
block = node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1)
if block != nil {
lastThreeHeaders = append(lastThreeHeaders, block.Header())
}
lastThreeHeaders = append(lastThreeHeaders, newBlock.Header())
node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(lastThreeHeaders)))
}
// BroadcastCXReceipts broadcasts cross shard receipts to correspoding
// destination shards
func (node *Node) BroadcastCXReceipts(newBlock *types.Block) {
@ -313,13 +333,65 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block) {
groupID := p2p.ShardID(i)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsMessage(cxReceipts, merkleProof)))
}
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// TODO ek – where do we verify parent-child invariants,
// e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"?
// Verify lastCommitSig
if newBlock.NumberU64() > 1 {
header := newBlock.Header()
parentBlock := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1)
if parentBlock == nil {
return ctxerror.New("[VerifyNewBlock] Failed to get parent block", "shardID", header.ShardID, "blockNum", header.Number)
}
parentHeader := parentBlock.Header()
shardState, err := node.Blockchain().ReadShardState(parentHeader.Epoch)
committee := shardState.FindCommitteeByID(parentHeader.ShardID)
if err != nil || committee == nil {
return ctxerror.New("[VerifyNewBlock] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
var committerKeys []*bls.PublicKey
parseKeysSuccess := true
for _, member := range committee.NodeList {
committerKey := new(bls.PublicKey)
err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
parseKeysSuccess = false
break
}
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[VerifyNewBlock] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("[VerifyNewBlock] cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap); err != nil {
return ctxerror.New("[VerifyNewBlock] cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
aggSig := bls.Sign{}
err = aggSig.Deserialize(header.LastCommitSignature[:])
if err != nil {
return ctxerror.New("[VerifyNewBlock] unable to deserialize multi-signature from payload").WithCause(err)
}
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1)
commitPayload := append(blockNumBytes, header.ParentHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("[VerifyNewBlock] Failed to verify the signature for last commit sig", "shardID", header.ShardID, "blockNum", header.Number)
}
}
// End Verify lastCommitSig
if newBlock.ShardID() != node.Blockchain().ShardID() {
return ctxerror.New("wrong shard ID",
"my shard ID", node.Blockchain().ShardID(),
@ -333,6 +405,56 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
).WithCause(err)
}
// Verify cross links
if node.NodeConfig.ShardID == 0 && len(newBlock.Header().CrossLinks) > 0 {
crossLinks := &types.CrossLinks{}
err := rlp.DecodeBytes(newBlock.Header().CrossLinks, crossLinks)
if err != nil {
return ctxerror.New("[CrossLinkVerification] failed to decode cross links",
"blockHash", newBlock.Hash(),
"crossLinks", len(newBlock.Header().CrossLinks),
).WithCause(err)
}
for i, crossLink := range *crossLinks {
lastLink := &types.CrossLink{}
if i == 0 {
if crossLink.BlockNum().Uint64() > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 1",
"blockHash", newBlock.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
} else {
lastLink = &crossLink
}
} else {
if (*crossLinks)[i-1].Header().ShardID != crossLink.Header().ShardID {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 2",
"blockHash", newBlock.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
} else {
lastLink = &(*crossLinks)[i-1]
}
}
if crossLink.BlockNum().Uint64() != 0 { // TODO: verify genesis block
err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header())
if err != nil {
return ctxerror.New("cannot ValidateNewBlock",
"blockHash", newBlock.Hash(),
"numTx", len(newBlock.Transactions()),
).WithCause(err)
}
}
}
}
// TODO: verify the vrf randomness
// _ = newBlock.Header().Vrf
@ -465,6 +587,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) {
node.BroadcastNewBlock(newBlock)
node.BroadcastCrossLinkHeader(newBlock)
node.BroadcastCXReceipts(newBlock)
} else {
utils.Logger().Info().

@ -4,6 +4,8 @@ import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core"
@ -87,7 +89,71 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
}
}
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
if node.NodeConfig.ShardID == 0 {
curBlock := node.Blockchain().CurrentBlock()
numShards := core.ShardingSchedule.InstanceForEpoch(curBlock.Header().Epoch).NumShards()
shardCrossLinks := make([]types.CrossLinks, numShards)
for i := 0; i < int(numShards); i++ {
curShardID := uint32(i)
lastLink, err := node.Blockchain().ReadShardLastCrossLink(curShardID)
blockNum := big.NewInt(0)
blockNumoffset := 0
if err == nil && lastLink != nil {
blockNumoffset = 1
blockNum = lastLink.BlockNum()
}
for true {
link, err := node.Blockchain().ReadCrossLink(curShardID, blockNum.Uint64()+uint64(blockNumoffset), true)
if err != nil || link == nil {
break
}
if link.BlockNum().Uint64() > 1 {
err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header())
if err != nil {
utils.Logger().Debug().
Err(err).
Msgf("[CrossLink] Failed verifying temp cross link %d", link.BlockNum().Uint64())
break
}
lastLink = link
}
shardCrossLinks[i] = append(shardCrossLinks[i], *link)
blockNumoffset++
}
}
crossLinksToPropose := types.CrossLinks{}
for _, crossLinks := range shardCrossLinks {
crossLinksToPropose = append(crossLinksToPropose, crossLinks...)
}
if len(crossLinksToPropose) != 0 {
crossLinksToPropose.Sort()
data, err := rlp.EncodeToBytes(crossLinksToPropose)
if err != nil {
utils.Logger().Debug().
Err(err).
Msg("Failed encoding cross links")
continue
}
newBlock, err = node.Worker.CommitWithCrossLinks(sig, mask, viewID, coinbase, data)
utils.Logger().Debug().
Uint64("blockNum", newBlock.NumberU64()).
Int("numCrossLinks", len(crossLinksToPropose)).
Msg("Successfully added cross links into new block")
} else {
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
}
} else {
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
}
if err != nil {
ctxerror.Log15(utils.GetLogger().Error,

@ -166,8 +166,8 @@ func (w *Worker) GetCurrentCXReceipts() []*types.CXReceipt {
return w.current.cxs
}
// Commit generate a new block for the new txs.
func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase common.Address) (*types.Block, error) {
// CommitWithCrossLinks generate a new block with cross links for the new txs.
func (w *Worker) CommitWithCrossLinks(sig []byte, signers []byte, viewID uint64, coinbase common.Address, crossLinks []byte) (*types.Block, error) {
if len(sig) > 0 && len(signers) > 0 {
copy(w.current.header.LastCommitSignature[:], sig[:])
w.current.header.LastCommitBitmap = append(signers[:0:0], signers...)
@ -175,6 +175,7 @@ func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase comm
w.current.header.Coinbase = coinbase
w.current.header.ViewID = new(big.Int)
w.current.header.ViewID.SetUint64(viewID)
w.current.header.CrossLinks = crossLinks
s := w.current.state.Copy()
@ -186,6 +187,11 @@ func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase comm
return block, nil
}
// Commit generate a new block for the new txs.
func (w *Worker) Commit(sig []byte, signers []byte, viewID uint64, coinbase common.Address) (*types.Block, error) {
return w.CommitWithCrossLinks(sig, signers, viewID, coinbase, []byte{})
}
// New create a new worker object.
func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus_engine.Engine, shardID uint32) *Worker {
worker := &Worker{

@ -6,7 +6,7 @@
127.0.0.1 9005 validator one1est2gxcvavmtnzc7mhd73gzadm3xxcv5zczdtw 776f3b8704f4e1092a302a60e84f81e476c212d6f458092b696df420ea19ff84a6179e8e23d090b9297dc041600bc100
127.0.0.1 9006 validator one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj 2d61379e44a772e5757e27ee2b3874254f56073e6bd226eb8b160371cc3c18b8c4977bd3dcb71fd57dc62bf0e143fd08
127.0.0.1 9007 validator one1a0x3d6xpmr6f8wsyaxd9v36pytvp48zckswvv9 c4e4708b6cf2a2ceeb59981677e9821eebafc5cf483fb5364a28fa604cc0ce69beeed40f3f03815c9e196fdaec5f1097
127.0.0.1 9008 validator one1d2rngmem4x2c6zxsjjz29dlah0jzkr0k2n88wc 6dc2fdc2ceec18f6923b99fd86a68405c132e1005cf1df72dca75db0adfaeb53d201d66af37916d61f079f34f21fb96
127.0.0.1 9008 validator one1d2rngmem4x2c6zxsjjz29dlah0jzkr0k2n88wc 86dc2fdc2ceec18f6923b99fd86a68405c132e1005cf1df72dca75db0adfaeb53d201d66af37916d61f079f34f21fb96
127.0.0.1 9009 validator one1658znfwf40epvy7e46cqrmzyy54h4n0qa73nep 49d15743b36334399f9985feb0753430a2b287b2d68b84495bbb15381854cbf01bca9d1d9f4c9c8f18509b2bfa6bd40f
127.0.0.1 9010 validator one1a50tun737ulcvwy0yvve0pvu5skq0kjargvhwe 52ecce5f64db21cbe374c9268188f5d2cdd5bec1a3112276a350349860e35fb81f8cfe447a311e0550d961cf25cb988d
127.0.0.1 9011 validator one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg a547a9bf6fdde4f4934cde21473748861a3cc0fe8bbb5e57225a29f483b05b72531f002f8187675743d819c955a86100

Loading…
Cancel
Save