diff --git a/api/proto/node/node.go b/api/proto/node/node.go index 2e6c34d73..cca97ff2b 100644 --- a/api/proto/node/node.go +++ b/api/proto/node/node.go @@ -35,6 +35,12 @@ type BlockchainSyncMessage struct { BlockHashes []common.Hash } +// CXReceiptsMessage carrys the cross shard receipts and merkle proof +type CXReceiptsMessage struct { + CXS types.CXReceipts + MKP *types.CXMerkleProof +} + // BlockchainSyncMessageType represents BlockchainSyncMessageType type. type BlockchainSyncMessageType int @@ -199,3 +205,20 @@ func DeserializeEpochShardStateFromMessage(payload []byte) (*types.EpochShardSta return epochShardState, nil } + +// ConstructCXReceiptsMessage constructs cross shard receipts and merkle proof +func ConstructCXReceiptsMessage(cxs types.CXReceipts, mkp *types.CXMerkleProof) []byte { + msg := &CXReceiptsMessage{CXS: cxs, MKP: mkp} + + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) + byteBuffer.WriteByte(byte(Block)) + byteBuffer.WriteByte(byte(Receipt)) + by, err := rlp.EncodeToBytes(msg) + + if err != nil { + log.Fatal(err) + return []byte{} + } + byteBuffer.Write(by) + return byteBuffer.Bytes() +} diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index 5a3616c87..9943c55a7 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -719,7 +719,7 @@ func processTransferCommand() { gas, nil, inputData) } else { tx = types.NewCrossShardTransaction( - state.nonce, receiverAddress, fromShard, toShard, amountBigInt, + state.nonce, &receiverAddress, fromShard, toShard, amountBigInt, gas, nil, inputData, types.SubtractionOnly) } diff --git a/core/block_validator.go b/core/block_validator.go index 7aac486ec..74d57e486 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -91,7 +91,7 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha) } - cxsSha := types.DeriveSha(cxReceipts) + cxsSha := types.DeriveMultipleShardsSha(cxReceipts) if cxsSha != header.CXReceiptHash { return fmt.Errorf("invalid cross shard receipt root hash (remote: %x local: %x)", header.CXReceiptHash, cxsSha) } diff --git a/core/blockchain.go b/core/blockchain.go index a6d279819..38d68a862 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -980,7 +980,7 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e } // WriteBlockWithState writes the block and all associated state to the database. -func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.DB) (status WriteStatus, err error) { +func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, state *state.DB) (status WriteStatus, err error) { bc.wg.Add(1) defer bc.wg.Done() @@ -1053,6 +1053,17 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. batch := bc.db.NewBatch() rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) + epoch := block.Header().Epoch + shardingConfig := ShardingSchedule.InstanceForEpoch(epoch) + shardNum := int(shardingConfig.NumShards()) + for i := 0; i < shardNum; i++ { + if i == int(block.ShardID()) { + continue + } + shardReceipts := GetToShardReceipts(cxReceipts, uint32(i)) + rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts) + } + // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf @@ -1285,7 +1296,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty proctime := time.Since(bstart) // Write the block to the chain and get the status. - status, err := bc.WriteBlockWithState(block, receipts, state) + status, err := bc.WriteBlockWithState(block, receipts, cxReceipts, state) if err != nil { return i, events, coalescedLogs, err } @@ -2043,3 +2054,52 @@ func (bc *BlockChain) ChainDB() ethdb.Database { func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig } + +// GetToShardReceipts filters the cross shard receipts with given destination shardID +func GetToShardReceipts(cxReceipts types.CXReceipts, shardID uint32) types.CXReceipts { + cxs := types.CXReceipts{} + for i := range cxReceipts { + cx := cxReceipts[i] + if cx.ToShardID == shardID { + cxs = append(cxs, cx) + } + } + return cxs +} + +// CXReceipts retrieves the cross shard transaction receipts of a given shard +func (bc *BlockChain) CXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash) (types.CXReceipts, error) { + cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, blockNum, blockHash) + if err != nil || len(cxs) == 0 { + return nil, err + } + return cxs, nil +} + +// CXMerkleProof calculates the cross shard transaction merkle proof of a given destination shard +func (bc *BlockChain) CXMerkleProof(shardID uint32, block *types.Block) (*types.CXMerkleProof, error) { + proof := &types.CXMerkleProof{BlockHash: block.Hash(), CXReceiptHash: block.Header().CXReceiptHash, CXShardHash: []common.Hash{}, ShardID: []uint32{}} + cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, block.NumberU64(), block.Hash()) + if err != nil || cxs == nil { + return nil, err + } + + epoch := block.Header().Epoch + shardingConfig := ShardingSchedule.InstanceForEpoch(epoch) + shardNum := int(shardingConfig.NumShards()) + + for i := 0; i < shardNum; i++ { + receipts, err := bc.CXReceipts(uint32(i), block.NumberU64(), block.Hash()) + if err != nil || len(receipts) == 0 { + continue + } else { + hash := types.DeriveSha(receipts) + proof.CXShardHash = append(proof.CXShardHash, hash) + proof.ShardID = append(proof.ShardID, uint32(i)) + } + } + if len(proof.ShardID) == 0 { + return nil, nil + } + return proof, nil +} diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 3d5f5729f..5f0430a5d 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -531,31 +531,30 @@ func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) err return db.Put(shardLastCrosslinkKey(shardID), data) } -// ReadCXReceipts retrieves all the transaction receipts belonging to a block. -func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) types.CXReceipts { - // Retrieve the flattened receipt slice - data, _ := db.Get(cxReceiptKey(shardID, number, hash)) - if len(data) == 0 { - return nil +// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash +func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) { + data, err := db.Get(cxReceiptKey(shardID, number, hash)) + if len(data) == 0 || err != nil { + utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts") + return nil, err } - // Convert the cross shard tx receipts from their storage form to their internal representation cxReceipts := types.CXReceipts{} if err := rlp.DecodeBytes(data, &cxReceipts); err != nil { utils.Logger().Error().Err(err).Str("hash", hash.Hex()).Msg("Invalid cross-shard tx receipt array RLP") - return nil + return nil, err } - return cxReceipts + return cxReceipts, nil } -// WriteCXReceipts stores all the transaction receipts belonging to a block. +// WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) { bytes, err := rlp.EncodeToBytes(receipts) if err != nil { - utils.Logger().Error().Msg("Failed to encode cross shard tx receipts") + utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts") } // Store the receipt slice if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil { - utils.Logger().Error().Msg("Failed to store block receipts") + utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts") } } diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 817a4a0bb..d3714318e 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -64,7 +64,8 @@ var ( crosslinkPrefix = []byte("crosslink") // prefix for crosslink tempCrosslinkPrefix = []byte("tempCrosslink") // prefix for tempCrosslink - cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt + cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt + cxReceiptHashPrefix = []byte("cxReceiptHash") // prefix for cross shard transaction receipt hash // epochBlockNumberPrefix + epoch (big.Int.Bytes()) // -> epoch block number (big.Int.Bytes()) diff --git a/core/types/block.go b/core/types/block.go index 5048b2ba5..4ab4d05d6 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -245,11 +245,7 @@ func NewBlock(header *Header, txs []*Transaction, receipts []*Receipt, cxs []*CX b.header.Bloom = CreateBloom(receipts) } - if len(cxs) == 0 { - b.header.CXReceiptHash = EmptyRootHash - } else { - b.header.CXReceiptHash = DeriveSha(CXReceipts(cxs)) - } + b.header.CXReceiptHash = DeriveMultipleShardsSha(CXReceipts(cxs)) return b } diff --git a/core/types/cx_receipt.go b/core/types/cx_receipt.go index 8ab2ce81f..f02ede147 100644 --- a/core/types/cx_receipt.go +++ b/core/types/cx_receipt.go @@ -38,7 +38,26 @@ func (cs CXReceipts) ToShardID(i int) uint32 { return cs[i].ToShardID } +// MaxToShardID returns the maximum destination shardID of cxReceipts +func (cs CXReceipts) MaxToShardID() uint32 { + maxShardID := uint32(0) + for i := 0; i < len(cs); i++ { + if maxShardID < cs[i].ToShardID { + maxShardID = cs[i].ToShardID + } + } + return maxShardID +} + // NewCrossShardReceipt creates a cross shard receipt func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt { return &CXReceipt{TxHash: txHash, Nonce: nonce, From: from, To: to, ShardID: shardID, ToShardID: toShardID, Amount: amount} } + +// CXMerkleProof represents the merkle proof of a collection of ordered cross shard transactions +type CXMerkleProof struct { + BlockHash common.Hash // block header's hash + CXReceiptHash common.Hash // root hash of the cross shard receipts in a given block + ShardID []uint32 // order list, records destination shardID + CXShardHash []common.Hash // ordered hash list, each hash corresponds to one destination shard's receipts root hash +} diff --git a/core/types/derive_sha.go b/core/types/derive_sha.go index 84daa3e57..2a6616507 100644 --- a/core/types/derive_sha.go +++ b/core/types/derive_sha.go @@ -18,6 +18,7 @@ package types import ( "bytes" + "encoding/binary" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -30,6 +31,7 @@ type DerivableList interface { Len() int GetRlp(i int) []byte ToShardID(i int) uint32 + MaxToShardID() uint32 // return the maximum non-empty destination shardID } // DeriveSha calculates the hash of the trie generated by DerivableList. @@ -60,12 +62,23 @@ func DeriveOneShardSha(list DerivableList, shardID uint32) common.Hash { return trie.Hash() } -// DeriveMultipleShardsSha calcualtes the hash of tries generated by DerivableList of multiple shards -func DeriveMultipleShardsSha(list DerivableList, numShards int) common.Hash { +// DeriveMultipleShardsSha calcualtes the root hash of tries generated by DerivableList of multiple shards +// If the list is empty, then return EmptyRootHash +// else, return |shard0|trieHash0|shard1|trieHash1|...| for non-empty destination shards +func DeriveMultipleShardsSha(list DerivableList) common.Hash { by := []byte{} - for i := 0; i < numShards; i++ { + for i := 0; i <= int(list.MaxToShardID()); i++ { shardHash := DeriveOneShardSha(list, uint32(i)) + if shardHash == EmptyRootHash { + continue + } + sKey := make([]byte, 4) + binary.BigEndian.PutUint32(sKey, uint32(i)) + by = append(by, sKey...) by = append(by, shardHash[:]...) } + if len(by) == 0 { + return EmptyRootHash + } return crypto.Keccak256Hash(by) } diff --git a/core/types/receipt.go b/core/types/receipt.go index 5d0541d3b..065c689e9 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -211,6 +211,10 @@ func (r Receipts) GetRlp(i int) []byte { // ToShardID returns 0, arbitrary value // This function is NOT used, just to compatible with DerivableList interface func (r Receipts) ToShardID(i int) uint32 { - _ = r[i] + return 0 +} + +// MaxToShardID returns 0, arbitrary value, NOT used +func (r Receipts) MaxToShardID() uint32 { return 0 } diff --git a/core/types/transaction.go b/core/types/transaction.go index 1015837e7..e0ee01d35 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -106,8 +106,8 @@ func NewTransaction(nonce uint64, to common.Address, shardID uint32, amount *big } // NewCrossShardTransaction returns new cross shard transaction -func NewCrossShardTransaction(nonce uint64, to common.Address, shardID uint32, toShardID uint32, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte, txType TransactionType) *Transaction { - return newCrossShardTransaction(nonce, &to, shardID, toShardID, amount, gasLimit, gasPrice, data, txType) +func NewCrossShardTransaction(nonce uint64, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte, txType TransactionType) *Transaction { + return newCrossShardTransaction(nonce, to, shardID, toShardID, amount, gasLimit, gasPrice, data, txType) } // NewContractCreation returns same shard contract transaction. @@ -383,6 +383,11 @@ func (s Transactions) ToShardID(i int) uint32 { return s[i].data.ToShardID } +// MaxToShardID returns 0, arbitrary value, NOT use +func (s Transactions) MaxToShardID() uint32 { + return 0 +} + // TxDifference returns a new set which is the difference between a and b. func TxDifference(a, b Transactions) Transactions { keep := make(Transactions, 0, len(a)) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index e34fcf95c..ef89161a4 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -1,10 +1,22 @@ package node import ( +<<<<<<< HEAD "encoding/binary" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" +======= + "bytes" + "encoding/base64" + "encoding/binary" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" + + proto_node "github.com/harmony-one/harmony/api/proto/node" +>>>>>>> 0780a1c75a321b4e3d5890a814d695916030c6b5 "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/ctxerror" @@ -137,7 +149,56 @@ func (node *Node) VerifyCrosslinkHeader(prevHeader, header *types.Header) error // ProcessReceiptMessage store the receipts and merkle proof in local data store func (node *Node) ProcessReceiptMessage(msgPayload []byte) { - // TODO: add logic + cxmsg := proto_node.CXReceiptsMessage{} + if err := rlp.DecodeBytes(msgPayload, &cxmsg); err != nil { + utils.Logger().Error().Err(err).Msg("[ProcessReceiptMessage] Unable to Decode message Payload") + return + } + merkleProof := cxmsg.MKP + myShardRoot := common.Hash{} + + var foundMyShard bool + byteBuffer := bytes.NewBuffer([]byte{}) + if len(merkleProof.ShardID) == 0 { + utils.Logger().Warn().Msg("[ProcessReceiptMessage] There is No non-empty destination shards") + return + } else { + for j := 0; j < len(merkleProof.ShardID); j++ { + sKey := make([]byte, 4) + binary.BigEndian.PutUint32(sKey, merkleProof.ShardID[j]) + byteBuffer.Write(sKey) + byteBuffer.Write(merkleProof.CXShardHash[j][:]) + if merkleProof.ShardID[j] == node.Consensus.ShardID { + foundMyShard = true + myShardRoot = merkleProof.CXShardHash[j] + } + } + } + + if !foundMyShard { + utils.Logger().Warn().Msg("[ProcessReceiptMessage] Not Found My Shard in CXReceipt Message") + return + } + + hash := crypto.Keccak256Hash(byteBuffer.Bytes()) + utils.Logger().Debug().Interface("hash", hash).Msg("[ProcessReceiptMessage] RootHash of the CXReceipts") + // TODO chao: use crosslink from beacon sync to verify the hash + + cxReceipts := cxmsg.CXS + sha := types.DeriveSha(cxReceipts) + if sha != myShardRoot { + utils.Logger().Warn().Interface("calculated", sha).Interface("got", myShardRoot).Msg("[ProcessReceiptMessage] Trie Root of CXReceipts Not Match") + return + } + + txs := types.Transactions{} + inputData, _ := base64.StdEncoding.DecodeString("") + for _, cx := range cxReceipts { + // TODO chao: add gas fee to incentivize + tx := types.NewCrossShardTransaction(0, cx.To, cx.ShardID, cx.ToShardID, cx.Amount, 0, nil, inputData, types.AdditionOnly) + txs = append(txs, tx) + } + node.addPendingTransactions(txs) } // ProcessCrossShardTx verify and process cross shard transaction on destination shard diff --git a/node/node_handler.go b/node/node_handler.go index 0f48f4f9f..004b11c9b 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -287,6 +287,7 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) { } } +<<<<<<< HEAD // BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain. func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", node.NodeConfig) @@ -303,6 +304,37 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { lastThreeHeaders = append(lastThreeHeaders, newBlock.Header()) node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(lastThreeHeaders))) +======= +// BroadcastCXReceipts broadcasts cross shard receipts to correspoding +// destination shards +func (node *Node) BroadcastCXReceipts(newBlock *types.Block) { + epoch := newBlock.Header().Epoch + shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch) + shardNum := int(shardingConfig.NumShards()) + myShardID := node.Consensus.ShardID + utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]") + + for i := 0; i < shardNum; i++ { + if i == int(myShardID) { + continue + } + cxReceipts, err := node.Blockchain().CXReceipts(uint32(i), newBlock.NumberU64(), newBlock.Hash()) + if err != nil || len(cxReceipts) == 0 { + //utils.Logger().Warn().Err(err).Uint32("ToShardID", uint32(i)).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No CXReceipts found") + continue + } + merkleProof, err := node.Blockchain().CXMerkleProof(uint32(i), newBlock) + if err != nil { + utils.Logger().Warn().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] Unable to get merkleProof") + continue + } + utils.Logger().Info().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] CXReceipts and MerkleProof Found") + + groupID := p2p.ShardID(i) + go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsMessage(cxReceipts, merkleProof))) + } + +>>>>>>> 0780a1c75a321b4e3d5890a814d695916030c6b5 } // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on @@ -541,22 +573,28 @@ func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[commo // PostConsensusProcessing is called by consensus participants, after consensus is done, to: // 1. add the new block to blockchain // 2. [leader] send new block to the client +// 3. [leader] send cross shard tx receipts to destination shard func (node *Node) PostConsensusProcessing(newBlock *types.Block) { + if err := node.AddNewBlock(newBlock); err != nil { + utils.Logger().Error(). + Err(err). + Msg("Error when adding new block") + return + } + if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) { node.BroadcastNewBlock(newBlock) +<<<<<<< HEAD node.BroadcastCrossLinkHeader(newBlock) +======= + node.BroadcastCXReceipts(newBlock) +>>>>>>> 0780a1c75a321b4e3d5890a814d695916030c6b5 } else { utils.Logger().Info(). Uint64("ViewID", node.Consensus.GetViewID()). Msg("BINGO !!! Reached Consensus") } - if err := node.AddNewBlock(newBlock); err != nil { - utils.Logger().Error(). - Err(err). - Msg("Error when adding new block") - } - if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet { // Update contract deployer's nonce so default contract like faucet can issue transaction with current nonce nonce := node.GetNonceOfAddress(crypto.PubkeyToAddress(node.ContractDeployerKey.PublicKey))