add merkle proof generation/validation and cxreceipts message handler function

pull/1357/head
chao 5 years ago
parent ab8fad1905
commit 0780a1c75a
  1. 23
      api/proto/node/node.go
  2. 2
      cmd/client/wallet/main.go
  3. 2
      core/block_validator.go
  4. 64
      core/blockchain.go
  5. 23
      core/rawdb/accessors_chain.go
  6. 3
      core/rawdb/schema.go
  7. 2
      core/state_processor.go
  8. 6
      core/types/block.go
  9. 25
      core/types/cx_receipt.go
  10. 19
      core/types/derive_sha.go
  11. 8
      core/types/receipt.go
  12. 11
      core/types/transaction.go
  13. 58
      node/node_cross_shard.go
  14. 46
      node/node_handler.go

@ -35,6 +35,12 @@ type BlockchainSyncMessage struct {
BlockHashes []common.Hash
}
// CXReceiptsMessage carrys the cross shard receipts and merkle proof
type CXReceiptsMessage struct {
CXS types.CXReceipts
MKP *types.CXMerkleProof
}
// BlockchainSyncMessageType represents BlockchainSyncMessageType type.
type BlockchainSyncMessageType int
@ -188,3 +194,20 @@ func DeserializeEpochShardStateFromMessage(payload []byte) (*types.EpochShardSta
return epochShardState, nil
}
// ConstructCXReceiptsMessage constructs cross shard receipts and merkle proof
func ConstructCXReceiptsMessage(cxs types.CXReceipts, mkp *types.CXMerkleProof) []byte {
msg := &CXReceiptsMessage{CXS: cxs, MKP: mkp}
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Block))
byteBuffer.WriteByte(byte(Receipt))
by, err := rlp.EncodeToBytes(msg)
if err != nil {
log.Fatal(err)
return []byte{}
}
byteBuffer.Write(by)
return byteBuffer.Bytes()
}

@ -719,7 +719,7 @@ func processTransferCommand() {
gas, nil, inputData)
} else {
tx = types.NewCrossShardTransaction(
state.nonce, receiverAddress, fromShard, toShard, amountBigInt,
state.nonce, &receiverAddress, fromShard, toShard, amountBigInt,
gas, nil, inputData, types.SubtractionOnly)
}

@ -91,7 +91,7 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat
return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)
}
cxsSha := types.DeriveSha(cxReceipts)
cxsSha := types.DeriveMultipleShardsSha(cxReceipts)
if cxsSha != header.CXReceiptHash {
return fmt.Errorf("invalid cross shard receipt root hash (remote: %x local: %x)", header.CXReceiptHash, cxsSha)
}

@ -980,7 +980,7 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
}
// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.DB) (status WriteStatus, err error) {
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, state *state.DB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
@ -1053,6 +1053,17 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
batch := bc.db.NewBatch()
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
epoch := block.Header().Epoch
shardingConfig := ShardingSchedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
for i := 0; i < shardNum; i++ {
if i == int(block.ShardID()) {
continue
}
shardReceipts := GetToShardReceipts(cxReceipts, uint32(i))
rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts)
}
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
@ -1273,7 +1284,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
proctime := time.Since(bstart)
// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, state)
status, err := bc.WriteBlockWithState(block, receipts, cxReceipts, state)
if err != nil {
return i, events, coalescedLogs, err
}
@ -2000,3 +2011,52 @@ func (bc *BlockChain) ChainDB() ethdb.Database {
func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}
// GetToShardReceipts filters the cross shard receipts with given destination shardID
func GetToShardReceipts(cxReceipts types.CXReceipts, shardID uint32) types.CXReceipts {
cxs := types.CXReceipts{}
for i := range cxReceipts {
cx := cxReceipts[i]
if cx.ToShardID == shardID {
cxs = append(cxs, cx)
}
}
return cxs
}
// CXReceipts retrieves the cross shard transaction receipts of a given shard
func (bc *BlockChain) CXReceipts(shardID uint32, blockNum uint64, blockHash common.Hash) (types.CXReceipts, error) {
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, blockNum, blockHash)
if err != nil || len(cxs) == 0 {
return nil, err
}
return cxs, nil
}
// CXMerkleProof calculates the cross shard transaction merkle proof of a given destination shard
func (bc *BlockChain) CXMerkleProof(shardID uint32, block *types.Block) (*types.CXMerkleProof, error) {
proof := &types.CXMerkleProof{BlockHash: block.Hash(), CXReceiptHash: block.Header().CXReceiptHash, CXShardHash: []common.Hash{}, ShardID: []uint32{}}
cxs, err := rawdb.ReadCXReceipts(bc.db, shardID, block.NumberU64(), block.Hash())
if err != nil || cxs == nil {
return nil, err
}
epoch := block.Header().Epoch
shardingConfig := ShardingSchedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
for i := 0; i < shardNum; i++ {
receipts, err := bc.CXReceipts(uint32(i), block.NumberU64(), block.Hash())
if err != nil || len(receipts) == 0 {
continue
} else {
hash := types.DeriveSha(receipts)
proof.CXShardHash = append(proof.CXShardHash, hash)
proof.ShardID = append(proof.ShardID, uint32(i))
}
}
if len(proof.ShardID) == 0 {
return nil, nil
}
return proof, nil
}

@ -513,31 +513,30 @@ func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64
return db.Put(crosslinkKey(shardID, blockNum), data)
}
// ReadCXReceipts retrieves all the transaction receipts belonging to a block.
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) types.CXReceipts {
// Retrieve the flattened receipt slice
data, _ := db.Get(cxReceiptKey(shardID, number, hash))
if len(data) == 0 {
return nil
// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) {
data, err := db.Get(cxReceiptKey(shardID, number, hash))
if len(data) == 0 || err != nil {
utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts")
return nil, err
}
// Convert the cross shard tx receipts from their storage form to their internal representation
cxReceipts := types.CXReceipts{}
if err := rlp.DecodeBytes(data, &cxReceipts); err != nil {
utils.Logger().Error().Err(err).Str("hash", hash.Hex()).Msg("Invalid cross-shard tx receipt array RLP")
return nil
return nil, err
}
return cxReceipts
return cxReceipts, nil
}
// WriteCXReceipts stores all the transaction receipts belonging to a block.
// WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash
func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) {
bytes, err := rlp.EncodeToBytes(receipts)
if err != nil {
utils.Logger().Error().Msg("Failed to encode cross shard tx receipts")
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts")
}
// Store the receipt slice
if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil {
utils.Logger().Error().Msg("Failed to store block receipts")
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts")
}
}

@ -62,7 +62,8 @@ var (
crosslinkPrefix = []byte("crosslink") // prefix for crosslink
cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt
cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt
cxReceiptHashPrefix = []byte("cxReceiptHash") // prefix for cross shard transaction receipt hash
// epochBlockNumberPrefix + epoch (big.Int.Bytes())
// -> epoch block number (big.Int.Bytes())

@ -124,7 +124,7 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo
//receipt.Logs = statedb.GetLogs(tx.Hash())
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
cxReceipt := &types.CXReceipt{tx.Hash(), msg.Nonce(), msg.From(), *msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()}
cxReceipt := &types.CXReceipt{tx.Hash(), msg.Nonce(), msg.From(), msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()}
return receipt, cxReceipt, gas, err
}

@ -245,11 +245,7 @@ func NewBlock(header *Header, txs []*Transaction, receipts []*Receipt, cxs []*CX
b.header.Bloom = CreateBloom(receipts)
}
if len(cxs) == 0 {
b.header.CXReceiptHash = EmptyRootHash
} else {
b.header.CXReceiptHash = DeriveSha(CXReceipts(cxs))
}
b.header.CXReceiptHash = DeriveMultipleShardsSha(CXReceipts(cxs))
return b
}

@ -12,7 +12,7 @@ type CXReceipt struct {
TxHash common.Hash // hash of the cross shard transaction in source shard
Nonce uint64
From common.Address
To common.Address
To *common.Address
ShardID uint32
ToShardID uint32
Amount *big.Int
@ -33,12 +33,31 @@ func (cs CXReceipts) GetRlp(i int) []byte {
return enc
}
// ShardID returns the destination shardID of the cxReceipt
// ToShardID returns the destination shardID of the cxReceipt
func (cs CXReceipts) ToShardID(i int) uint32 {
return cs[i].ToShardID
}
// MaxToShardID returns the maximum destination shardID of cxReceipts
func (cs CXReceipts) MaxToShardID() uint32 {
maxShardID := uint32(0)
for i := 0; i < len(cs); i++ {
if maxShardID < cs[i].ToShardID {
maxShardID = cs[i].ToShardID
}
}
return maxShardID
}
// NewCrossShardReceipt creates a cross shard receipt
func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt {
func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt {
return &CXReceipt{TxHash: txHash, Nonce: nonce, From: from, To: to, ShardID: shardID, ToShardID: toShardID, Amount: amount}
}
// CXMerkleProof represents the merkle proof of a collection of ordered cross shard transactions
type CXMerkleProof struct {
BlockHash common.Hash // block header's hash
CXReceiptHash common.Hash // root hash of the cross shard receipts in a given block
ShardID []uint32 // order list, records destination shardID
CXShardHash []common.Hash // ordered hash list, each hash corresponds to one destination shard's receipts root hash
}

@ -18,6 +18,7 @@ package types
import (
"bytes"
"encoding/binary"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
@ -30,6 +31,7 @@ type DerivableList interface {
Len() int
GetRlp(i int) []byte
ToShardID(i int) uint32
MaxToShardID() uint32 // return the maximum non-empty destination shardID
}
// DeriveSha calculates the hash of the trie generated by DerivableList.
@ -60,12 +62,23 @@ func DeriveOneShardSha(list DerivableList, shardID uint32) common.Hash {
return trie.Hash()
}
// DeriveMultipleShardsSha calcualtes the hash of tries generated by DerivableList of multiple shards
func DeriveMultipleShardsSha(list DerivableList, numShards int) common.Hash {
// DeriveMultipleShardsSha calcualtes the root hash of tries generated by DerivableList of multiple shards
// If the list is empty, then return EmptyRootHash
// else, return |shard0|trieHash0|shard1|trieHash1|...| for non-empty destination shards
func DeriveMultipleShardsSha(list DerivableList) common.Hash {
by := []byte{}
for i := 0; i < numShards; i++ {
for i := 0; i <= int(list.MaxToShardID()); i++ {
shardHash := DeriveOneShardSha(list, uint32(i))
if shardHash == EmptyRootHash {
continue
}
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, uint32(i))
by = append(by, sKey...)
by = append(by, shardHash[:]...)
}
if len(by) == 0 {
return EmptyRootHash
}
return crypto.Keccak256Hash(by)
}

@ -208,9 +208,13 @@ func (r Receipts) GetRlp(i int) []byte {
return bytes
}
// ShardID returns 0, arbitrary value
// ToShardID returns 0, arbitrary value
// This function is NOT used, just to compatible with DerivableList interface
func (r Receipts) ToShardID(i int) uint32 {
_ = r[i]
return 0
}
// MaxToShardID returns 0, arbitrary value, NOT used
func (r Receipts) MaxToShardID() uint32 {
return 0
}

@ -106,8 +106,8 @@ func NewTransaction(nonce uint64, to common.Address, shardID uint32, amount *big
}
// NewCrossShardTransaction returns new cross shard transaction
func NewCrossShardTransaction(nonce uint64, to common.Address, shardID uint32, toShardID uint32, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte, txType TransactionType) *Transaction {
return newCrossShardTransaction(nonce, &to, shardID, toShardID, amount, gasLimit, gasPrice, data, txType)
func NewCrossShardTransaction(nonce uint64, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte, txType TransactionType) *Transaction {
return newCrossShardTransaction(nonce, to, shardID, toShardID, amount, gasLimit, gasPrice, data, txType)
}
// NewContractCreation returns same shard contract transaction.
@ -378,11 +378,16 @@ func (s Transactions) GetRlp(i int) []byte {
return enc
}
// ShardID returns the destination shardID of given transaction
// ToShardID returns the destination shardID of given transaction
func (s Transactions) ToShardID(i int) uint32 {
return s[i].data.ToShardID
}
// MaxToShardID returns 0, arbitrary value, NOT use
func (s Transactions) MaxToShardID() uint32 {
return 0
}
// TxDifference returns a new set which is the difference between a and b.
func TxDifference(a, b Transactions) Transactions {
keep := make(Transactions, 0, len(a))

@ -1,8 +1,15 @@
package node
import (
"bytes"
"encoding/base64"
"encoding/binary"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
)
@ -22,7 +29,56 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
// ProcessReceiptMessage store the receipts and merkle proof in local data store
func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
// TODO: add logic
cxmsg := proto_node.CXReceiptsMessage{}
if err := rlp.DecodeBytes(msgPayload, &cxmsg); err != nil {
utils.Logger().Error().Err(err).Msg("[ProcessReceiptMessage] Unable to Decode message Payload")
return
}
merkleProof := cxmsg.MKP
myShardRoot := common.Hash{}
var foundMyShard bool
byteBuffer := bytes.NewBuffer([]byte{})
if len(merkleProof.ShardID) == 0 {
utils.Logger().Warn().Msg("[ProcessReceiptMessage] There is No non-empty destination shards")
return
} else {
for j := 0; j < len(merkleProof.ShardID); j++ {
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, merkleProof.ShardID[j])
byteBuffer.Write(sKey)
byteBuffer.Write(merkleProof.CXShardHash[j][:])
if merkleProof.ShardID[j] == node.Consensus.ShardID {
foundMyShard = true
myShardRoot = merkleProof.CXShardHash[j]
}
}
}
if !foundMyShard {
utils.Logger().Warn().Msg("[ProcessReceiptMessage] Not Found My Shard in CXReceipt Message")
return
}
hash := crypto.Keccak256Hash(byteBuffer.Bytes())
utils.Logger().Debug().Interface("hash", hash).Msg("[ProcessReceiptMessage] RootHash of the CXReceipts")
// TODO chao: use crosslink from beacon sync to verify the hash
cxReceipts := cxmsg.CXS
sha := types.DeriveSha(cxReceipts)
if sha != myShardRoot {
utils.Logger().Warn().Interface("calculated", sha).Interface("got", myShardRoot).Msg("[ProcessReceiptMessage] Trie Root of CXReceipts Not Match")
return
}
txs := types.Transactions{}
inputData, _ := base64.StdEncoding.DecodeString("")
for _, cx := range cxReceipts {
// TODO chao: add gas fee to incentivize
tx := types.NewCrossShardTransaction(0, cx.To, cx.ShardID, cx.ToShardID, cx.Amount, 0, nil, inputData, types.AdditionOnly)
txs = append(txs, tx)
}
node.addPendingTransactions(txs)
}
// ProcessCrossShardTx verify and process cross shard transaction on destination shard

@ -285,6 +285,37 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
}
}
// BroadcastCXReceipts broadcasts cross shard receipts to correspoding
// destination shards
func (node *Node) BroadcastCXReceipts(newBlock *types.Block) {
epoch := newBlock.Header().Epoch
shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
myShardID := node.Consensus.ShardID
utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]")
for i := 0; i < shardNum; i++ {
if i == int(myShardID) {
continue
}
cxReceipts, err := node.Blockchain().CXReceipts(uint32(i), newBlock.NumberU64(), newBlock.Hash())
if err != nil || len(cxReceipts) == 0 {
//utils.Logger().Warn().Err(err).Uint32("ToShardID", uint32(i)).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No CXReceipts found")
continue
}
merkleProof, err := node.Blockchain().CXMerkleProof(uint32(i), newBlock)
if err != nil {
utils.Logger().Warn().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] Unable to get merkleProof")
continue
}
utils.Logger().Info().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] CXReceipts and MerkleProof Found")
groupID := p2p.ShardID(i)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsMessage(cxReceipts, merkleProof)))
}
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// TODO ek – where do we verify parent-child invariants,
@ -423,21 +454,24 @@ func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[commo
// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// 1. add the new block to blockchain
// 2. [leader] send new block to the client
// 3. [leader] send cross shard tx receipts to destination shard
func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
if err := node.AddNewBlock(newBlock); err != nil {
utils.Logger().Error().
Err(err).
Msg("Error when adding new block")
return
}
if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) {
node.BroadcastNewBlock(newBlock)
node.BroadcastCXReceipts(newBlock)
} else {
utils.Logger().Info().
Uint64("ViewID", node.Consensus.GetViewID()).
Msg("BINGO !!! Reached Consensus")
}
if err := node.AddNewBlock(newBlock); err != nil {
utils.Logger().Error().
Err(err).
Msg("Error when adding new block")
}
if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet {
// Update contract deployer's nonce so default contract like faucet can issue transaction with current nonce
nonce := node.GetNonceOfAddress(crypto.PubkeyToAddress(node.ContractDeployerKey.PublicKey))

Loading…
Cancel
Save