Add the cross-shard fixes from s3 into master (#2105)

* Add missing file

* Fix cx receipt merkle root computation

* Fix epoch check on cx receipt sanity check

* Fix go gen

* fix log mistake

* Return EmptyRootHash if there is no receipts

* Add back legacy merkle root logic and accept both for verification

* Add missing emptyroothash logic

* fix lint

* Fix lint

* Revert proto change
pull/2127/head
Rongjian Lan 5 years ago committed by GitHub
parent f62f3dce41
commit 55664be681
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      api/proto/node/node.go
  2. 14
      core/block_validator.go
  3. 29
      core/blockchain.go
  4. 1
      core/rawdb/accessors_chain.go
  5. 2
      core/types/block.go
  6. 37
      core/types/cx_receipt.go
  7. 7
      core/types/derive_sha.go
  8. 4
      core/types/gen_cx_receipt_json.go
  9. 4
      node/node.go
  10. 11
      node/node_cross_shard.go
  11. 4
      node/node_handler.go

@ -191,13 +191,11 @@ func ConstructCrossLinkMessage(bc engine.ChainReader, headers []*block.Header) [
// ConstructCXReceiptsProof constructs cross shard receipts and related proof including
// merkle proof, blockHeader and commitSignatures
func ConstructCXReceiptsProof(cxs types.CXReceipts, mkp *types.CXMerkleProof, header *block.Header, commitSig []byte, commitBitmap []byte) []byte {
msg := &types.CXReceiptsProof{Receipts: cxs, MerkleProof: mkp, Header: header, CommitSig: commitSig, CommitBitmap: commitBitmap}
func ConstructCXReceiptsProof(cxReceiptsProof *types.CXReceiptsProof) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Block))
byteBuffer.WriteByte(byte(Receipt))
by, err := rlp.EncodeToBytes(msg)
by, err := rlp.EncodeToBytes(cxReceiptsProof)
if err != nil {
utils.Logger().Error().Err(err).Msg("[ConstructCXReceiptsProof] Encode CXReceiptsProof Error")

@ -100,10 +100,13 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat
return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash(), receiptSha)
}
if v.config.HasCrossTxFields(block.Epoch()) {
cxsSha := types.DeriveMultipleShardsSha(cxReceipts)
if v.config.AcceptsCrossTx(block.Epoch()) {
cxsSha := cxReceipts.ComputeMerkleRoot()
if cxsSha != header.OutgoingReceiptHash() {
return fmt.Errorf("invalid cross shard receipt root hash (remote: %x local: %x)", header.OutgoingReceiptHash(), cxsSha)
legacySha := types.DeriveMultipleShardsSha(cxReceipts)
if legacySha != header.OutgoingReceiptHash() {
return fmt.Errorf("invalid cross shard receipt root hash (remote: %x local: %x, legacy: %x)", header.OutgoingReceiptHash(), cxsSha, legacySha)
}
}
}
@ -202,7 +205,7 @@ func (v *BlockValidator) ValidateCXReceiptsProof(cxp *types.CXReceiptsProof) err
}
if !foundMatchingShardID {
return ctxerror.New("[ValidateCXReceiptsProof] Didn't find matching shardID")
return ctxerror.New("[ValidateCXReceiptsProof] Didn't find matching toShardID (no receipts for my shard)")
}
sourceShardID := merkleProof.ShardID
@ -217,6 +220,9 @@ func (v *BlockValidator) ValidateCXReceiptsProof(cxp *types.CXReceiptsProof) err
// (2) verify the outgoingCXReceiptsHash match
outgoingHashFromSourceShard := crypto.Keccak256Hash(byteBuffer.Bytes())
if byteBuffer.Len() == 0 {
outgoingHashFromSourceShard = types.EmptyRootHash
}
if outgoingHashFromSourceShard != merkleProof.CXReceiptHash {
return ctxerror.New("[ValidateCXReceiptsProof] IncomingReceiptRootHash from source shard not match", "sourceShardID", sourceShardID, "sourceBlockNum", sourceBlockNum, "calculated", outgoingHashFromSourceShard, "got", merkleProof.CXReceiptHash)
}

@ -1109,10 +1109,11 @@ func (bc *BlockChain) WriteBlockWithState(
if i == int(block.ShardID()) {
continue
}
shardReceipts := GetToShardReceipts(cxReceipts, uint32(i))
shardReceipts := types.CXReceipts(cxReceipts).GetToShardReceipts(uint32(i))
err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts)
if err != nil {
utils.Logger().Debug().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database")
utils.Logger().Error().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database")
return NonStatTy, err
}
}
@ -2377,18 +2378,6 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}
// GetToShardReceipts filters the cross shard receipts with given destination shardID
func GetToShardReceipts(cxReceipts types.CXReceipts, shardID uint32) types.CXReceipts {
cxs := types.CXReceipts{}
for i := range cxReceipts {
cx := cxReceipts[i]
if cx.ToShardID == shardID {
cxs = append(cxs, cx)
}
}
return cxs
}
// ReadCXReceipts retrieves the cross shard transaction receipts of a given shard
func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash) (types.CXReceipts, error) {
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, blockNum, blockHash)
@ -2398,19 +2387,9 @@ func (bc *BlockChain) ReadCXReceipts(shardID uint32, blockNum uint64, blockHash
return cxs, nil
}
// WriteCXReceipts saves the cross shard transaction receipts of a given shard
func (bc *BlockChain) WriteCXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash, receipts types.CXReceipts) error {
return rawdb.WriteCXReceipts(bc.db, shardID, blockNum, blockHash, receipts)
}
// CXMerkleProof calculates the cross shard transaction merkle proof of a given destination shard
func (bc *BlockChain) CXMerkleProof(shardID uint32, block *types.Block) (*types.CXMerkleProof, error) {
func (bc *BlockChain) CXMerkleProof(toShardID uint32, block *types.Block) (*types.CXMerkleProof, error) {
proof := &types.CXMerkleProof{BlockNum: block.Number(), BlockHash: block.Hash(), ShardID: block.ShardID(), CXReceiptHash: block.Header().OutgoingReceiptHash(), CXShardHashes: []common.Hash{}, ShardIDs: []uint32{}}
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, block.NumberU64(), block.Hash())
if err != nil || cxs == nil {
return nil, err
}
epoch := block.Header().Epoch()
shardingConfig := shard.Schedule.InstanceForEpoch(epoch)

@ -546,6 +546,7 @@ func DeletePendingCrossLinks(db DatabaseDeleter) error {
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) {
data, err := db.Get(cxReceiptKey(shardID, number, hash))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts")
return nil, err
}
cxReceipts := types.CXReceipts{}

@ -310,7 +310,7 @@ func NewBlock(
b.header.SetBloom(CreateBloom(receipts))
}
b.header.SetOutgoingReceiptHash(DeriveMultipleShardsSha(CXReceipts(outcxs)))
b.header.SetOutgoingReceiptHash(CXReceipts(outcxs).ComputeMerkleRoot())
if len(incxs) == 0 {
b.header.SetIncomingReceiptHash(EmptyRootHash)

@ -1,8 +1,12 @@
package types
import (
"bytes"
"encoding/binary"
"math/big"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
@ -82,6 +86,39 @@ func (cs CXReceipts) MaxToShardID() uint32 {
return maxShardID
}
// ComputeMerkleRoot computes the merkle root of this list of receipts
// The root is the hash of [shardID, receiptsRoot, shardID, receiptsRoot...]
// The receiptRoot is the merkle root hash of the receipts for a specific shards.
func (cs CXReceipts) ComputeMerkleRoot() common.Hash {
byteBuffer := bytes.NewBuffer([]byte{})
for i := 0; i <= int(cs.MaxToShardID()); i++ {
shardReceipts := cs.GetToShardReceipts(uint32(i))
if len(shardReceipts) != 0 {
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, uint32(i))
byteBuffer.Write(sKey)
hash := DeriveSha(shardReceipts)
byteBuffer.Write(hash[:])
}
}
if byteBuffer.Len() == 0 {
return EmptyRootHash
}
return crypto.Keccak256Hash(byteBuffer.Bytes())
}
// GetToShardReceipts filters the cross shard receipts with given destination shardID
func (cs CXReceipts) GetToShardReceipts(shardID uint32) CXReceipts {
cxs := CXReceipts{}
for i := range cs {
cx := cs[i]
if cx.ToShardID == shardID {
cxs = append(cxs, cx)
}
}
return cxs
}
// CXMerkleProof represents the merkle proof of a collection of ordered cross shard transactions
type CXMerkleProof struct {
BlockNum *big.Int // blockNumber of source shard

@ -20,8 +20,9 @@ import (
"bytes"
"encoding/binary"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
@ -56,6 +57,8 @@ func DeriveSha(list ...DerivableBase) common.Hash {
return trie.Hash()
}
//// Legacy forked logic. Keep as is, but do not use it anymore ->
// DeriveOneShardSha calculates the hash of the trie of
// cross shard transactions with the given destination shard
func DeriveOneShardSha(list DerivableList, shardID uint32) common.Hash {
@ -95,3 +98,5 @@ func DeriveMultipleShardsSha(list DerivableList) common.Hash {
}
return crypto.Keccak256Hash(by)
}
//// <- Legacy forked logic. Keep as is, but do not use it anymore

@ -140,7 +140,7 @@ func (r *CXMerkleProof) UnmarshalJSON(input []byte) error {
func (r CXReceiptsProof) MarshalJSON() ([]byte, error) {
type CXReceiptsProof struct {
Receipts []*CXReceipt `json:"receipts" gencodec:"required"`
MerkleProof *CXMerkleProof `json:"merkleProof" gencodec:"required"`
MerkleProof *CXMerkleProof `json:"merkleProof" gencodec:"required"`
Header *block.Header `json:"header" gencoded:"required"`
CommitSig []byte `json:"commitSig"`
CommitBitmap []byte `json:"commitBitmap"`
@ -158,7 +158,7 @@ func (r CXReceiptsProof) MarshalJSON() ([]byte, error) {
func (r *CXReceiptsProof) UnmarshalJSON(input []byte) error {
type CXReceiptsProof struct {
Receipts []*CXReceipt `json:"receipts" gencodec:"required"`
MerkleProof *CXMerkleProof `json:"merkleProof" gencodec:"required"`
MerkleProof *CXMerkleProof `json:"merkleProof" gencodec:"required"`
Header *block.Header `json:"header" gencoded:"required"`
CommitSig []byte `json:"commitSig"`
CommitBitmap []byte `json:"commitBitmap"`

@ -360,7 +360,7 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
if err := node.Blockchain().Validator().ValidateCXReceiptsProof(receipts); err != nil {
if !strings.Contains(err.Error(), rawdb.MsgNoShardStateFromDB) {
utils.Logger().Error().Err(err).Msg("[proposeReceiptsProof] Invalid CXReceiptsProof")
utils.Logger().Error().Err(err).Msg("[AddPendingReceipts] Invalid CXReceiptsProof")
return
}
}
@ -375,7 +375,7 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
}
if e := receipts.Header.Epoch(); blockNum == 0 ||
!node.Blockchain().Config().IsCrossLink(e) {
!node.Blockchain().Config().AcceptsCrossTx(e) {
utils.Logger().Info().
Uint64("incoming-epoch", e.Uint64()).
Msg("Incoming receipt had meaningless epoch")

@ -39,7 +39,7 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte)
if i == int(myShardID) {
continue
}
go node.BroadcastCXReceiptsWithShardID(newBlock, commitSig, commitBitmap, uint32(i))
node.BroadcastCXReceiptsWithShardID(newBlock, commitSig, commitBitmap, uint32(i))
}
}
@ -50,19 +50,22 @@ func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig [
cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash())
if err != nil || len(cxReceipts) == 0 {
utils.Logger().Info().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceiptsWithShardID] No ReadCXReceipts found")
utils.Logger().Info().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[CXMerkleProof] No receipts found for the destination shard")
return
}
merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, block)
if err != nil {
utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof")
return
}
cxReceiptsProof := &types.CXReceiptsProof{Receipts: cxReceipts, MerkleProof: merkleProof, Header: block.Header(), CommitSig: commitSig, CommitBitmap: commitBitmap}
groupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(toShardID))
utils.Logger().Info().Uint32("ToShardID", toShardID).Str("GroupID", string(groupID)).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof Found")
utils.Logger().Info().Uint32("ToShardID", toShardID).Str("GroupID", string(groupID)).Interface("cxp", cxReceiptsProof).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof ready. Sending CX receipts...")
// TODO ek – limit concurrency
go node.host.SendMessageToGroups([]nodeconfig.GroupID{groupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap)))
go node.host.SendMessageToGroups([]nodeconfig.GroupID{groupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceiptsProof)))
}
// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request

@ -372,10 +372,10 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
Int("numTxns", len(newBlock.Transactions())).
Int("numStakingTxns", len(newBlock.StakingTransactions())).
Msg("BINGO !!! Reached Consensus")
// 15% of the validator also need to do broadcasting
// 1% of the validator also need to do broadcasting
rand.Seed(time.Now().UTC().UnixNano())
rnd := rand.Intn(100)
if rnd < 0 {
if rnd < 1 {
// Beacon validators also broadcast new blocks to make sure beacon sync is strong.
if node.NodeConfig.ShardID == 0 {
node.BroadcastNewBlock(newBlock)

Loading…
Cancel
Save