add incomingReceipts handling and verification

pull/1368/head
chao 5 years ago committed by Chao Ma
parent 1a65ca2eff
commit d1d3e23ee4
  1. 2
      api/proto/node/node_test.go
  2. 6
      consensus/consensus_service.go
  3. 2
      consensus/engine/consensus_engine.go
  4. 2
      core/chain_makers.go
  5. 2
      core/genesis.go
  6. 22
      core/state_processor.go
  7. 20
      core/state_transition.go
  8. 2
      core/tx_pool_test.go
  9. 40
      core/types/block.go
  10. 21
      core/types/cx_receipt.go
  11. 2
      core/vm/evm.go
  12. 28
      node/node_cross_shard.go
  13. 23
      node/node_newblock.go
  14. 25
      node/worker/worker.go

@ -88,7 +88,7 @@ func TestConstructBlocksSyncMessage(t *testing.T) {
t.Fatalf("statedb.Database().TrieDB().Commit() failed: %s", err)
}
block1 := types.NewBlock(head, nil, nil, nil)
block1 := types.NewBlock(head, nil, nil, nil, nil)
blocks := []*types.Block{
block1,

@ -282,16 +282,16 @@ func (consensus *Consensus) VerifySeal(chain consensus_engine.ChainReader, heade
return nil
}
// Finalize implements consensus.Engine, accumulating the block and uncle rewards,
// Finalize implements consensus.Engine, accumulating the block rewards,
// setting the final state and assembling the block.
func (consensus *Consensus) Finalize(chain consensus_engine.ChainReader, header *types.Header, state *state.DB, txs []*types.Transaction, receipts []*types.Receipt, cxreceipts []*types.CXReceipt) (*types.Block, error) {
func (consensus *Consensus) Finalize(chain consensus_engine.ChainReader, header *types.Header, state *state.DB, txs []*types.Transaction, receipts []*types.Receipt, outcxs []*types.CXReceipt, incxs []*types.CXReceipt) (*types.Block, error) {
// Accumulate any block and uncle rewards and commit the final state root
// Header seems complete, assemble into a block and return
if err := accumulateRewards(chain, state, header); err != nil {
return nil, ctxerror.New("cannot pay block reward").WithCause(err)
}
header.Root = state.IntermediateRoot(false)
return types.NewBlock(header, txs, receipts, cxreceipts), nil
return types.NewBlock(header, txs, receipts, outcxs, incxs), nil
}
// Sign on the hash of the message

@ -67,7 +67,7 @@ type Engine interface {
// Note: The block header and state database might be updated to reflect any
// consensus rules that happen at finalization (e.g. block rewards).
Finalize(chain ChainReader, header *types.Header, state *state.DB, txs []*types.Transaction,
receipts []*types.Receipt, cxs []*types.CXReceipt) (*types.Block, error)
receipts []*types.Receipt, outcxs []*types.CXReceipt, incxs []*types.CXReceipt) (*types.Block, error)
// Seal generates a new sealing request for the given input block and pushes
// the result into the given channel.

@ -187,7 +187,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
if b.engine != nil {
// Finalize and seal the block
// TODO (chao): add cxReceipt in the last input
block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts, nil)
block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts, nil, nil)
if err != nil {
panic(err)
}

@ -261,7 +261,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block {
statedb.Commit(false)
statedb.Database().TrieDB().Commit(root, true)
return types.NewBlock(head, nil, nil, nil)
return types.NewBlock(head, nil, nil, nil, nil)
}
// Commit writes the block and state of a genesis specification to the database.

@ -58,7 +58,9 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.Config) (types.Receipts, types.CXReceipts, []*types.Log, uint64, error) {
var (
receipts types.Receipts
cxs types.CXReceipts
outcxs types.CXReceipts
incxs = block.IncomingReceipts()
usedGas = new(uint64)
header = block.Header()
coinbase = block.Header().Coinbase
@ -74,17 +76,22 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C
}
receipts = append(receipts, receipt)
if cxReceipt != nil {
cxs = append(cxs, cxReceipt)
outcxs = append(outcxs, cxReceipt)
}
allLogs = append(allLogs, receipt.Logs...)
}
for _, cx := range block.IncomingReceipts() {
ApplyIncomingReceipt(statedb, cx)
}
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
_, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts, cxs)
_, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts, outcxs, incxs)
if err != nil {
return nil, nil, nil, 0, ctxerror.New("cannot finalize block").WithCause(err)
}
return receipts, cxs, allLogs, *usedGas, nil
return receipts, outcxs, allLogs, *usedGas, nil
}
// return true if it is valid
@ -95,9 +102,6 @@ func getTransactionType(header *types.Header, tx *types.Transaction) types.Trans
if tx.ShardID() != tx.ToShardID() && header.ShardID == tx.ShardID() {
return types.SubtractionOnly
}
if tx.ShardID() != tx.ToShardID() && header.ShardID == tx.ToShardID() {
return types.AdditionOnly
}
return types.InvalidTx
}
@ -112,7 +116,7 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo
}
msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
// skip signer err for additiononly tx
if err != nil && txType != types.AdditionOnly {
if err != nil {
return nil, nil, 0, err
}
@ -151,7 +155,7 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo
var cxReceipt *types.CXReceipt
if txType == types.SubtractionOnly {
cxReceipt = &types.CXReceipt{tx.Hash(), msg.Nonce(), msg.From(), msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()}
cxReceipt = &types.CXReceipt{tx.Hash(), msg.From(), msg.To(), tx.ShardID(), tx.ToShardID(), msg.Value()}
} else {
cxReceipt = nil
}

@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/internal/utils"
@ -134,6 +135,14 @@ func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool) ([]byte, uint64, bool,
return NewStateTransition(evm, msg, gp).TransitionDb()
}
// ApplyIncomingReceipt will add amount into ToAddress in the receipt
func ApplyIncomingReceipt(db *state.DB, cx *types.CXReceipt) {
if cx == nil || cx.To == nil {
return
}
db.AddBalance(*cx.To, cx.Amount)
}
// to returns the recipient of the message.
func (st *StateTransition) to() common.Address {
if st.msg == nil || st.msg.To() == nil /* contract creation */ {
@ -168,7 +177,7 @@ func (st *StateTransition) buyGas() error {
func (st *StateTransition) preCheck() error {
// Make sure this transaction's nonce is correct.
if st.msg.CheckNonce() && st.evm.Context.TxType != types.AdditionOnly {
if st.msg.CheckNonce() {
nonce := st.state.GetNonce(st.msg.From())
if nonce < st.msg.Nonce() {
return ErrNonceTooHigh
@ -211,9 +220,7 @@ func (st *StateTransition) TransitionDb() (ret []byte, usedGas uint64, failed bo
ret, _, st.gas, vmerr = evm.Create(sender, st.data, st.gas, st.value)
} else {
// Increment the nonce for the next transaction
if st.evm.Context.TxType != types.AdditionOnly {
st.state.SetNonce(msg.From(), st.state.GetNonce(sender.Address())+1)
}
st.state.SetNonce(msg.From(), st.state.GetNonce(sender.Address())+1)
ret, st.gas, vmerr = evm.Call(sender, st.to(), st.data, st.gas, st.value)
}
if vmerr != nil {
@ -223,10 +230,7 @@ func (st *StateTransition) TransitionDb() (ret []byte, usedGas uint64, failed bo
// balance transfer may never fail.
if vmerr == vm.ErrInsufficientBalance {
if st.evm.Context.TxType != types.AdditionOnly {
return nil, 0, false, vmerr
}
vmerr = nil
return nil, 0, false, vmerr
}
}
st.refundGas()

@ -54,7 +54,7 @@ type testBlockChain struct {
func (bc *testBlockChain) CurrentBlock() *types.Block {
return types.NewBlock(&types.Header{
GasLimit: bc.gasLimit,
}, nil, nil, nil)
}, nil, nil, nil, nil)
}
func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {

@ -77,14 +77,14 @@ type Header struct {
TxHash common.Hash `json:"transactionsRoot" gencodec:"required"`
ReceiptHash common.Hash `json:"receiptsRoot" gencodec:"required"`
OutgoingReceiptHash common.Hash `json:"outgoingReceiptsRoot" gencodec:"required"`
//IncomingReceiptHash common.Hash `json:"incomingReceiptsRoot" gencodec:"required"`
Bloom ethtypes.Bloom `json:"logsBloom" gencodec:"required"`
Number *big.Int `json:"number" gencodec:"required"`
GasLimit uint64 `json:"gasLimit" gencodec:"required"`
GasUsed uint64 `json:"gasUsed" gencodec:"required"`
Time *big.Int `json:"timestamp" gencodec:"required"`
Extra []byte `json:"extraData" gencodec:"required"`
MixDigest common.Hash `json:"mixHash" gencodec:"required"`
IncomingReceiptHash common.Hash `json:"incomingReceiptsRoot" gencodec:"required"`
Bloom ethtypes.Bloom `json:"logsBloom" gencodec:"required"`
Number *big.Int `json:"number" gencodec:"required"`
GasLimit uint64 `json:"gasLimit" gencodec:"required"`
GasUsed uint64 `json:"gasUsed" gencodec:"required"`
Time *big.Int `json:"timestamp" gencodec:"required"`
Extra []byte `json:"extraData" gencodec:"required"`
MixDigest common.Hash `json:"mixHash" gencodec:"required"`
// Additional Fields
ViewID *big.Int `json:"viewID" gencodec:"required"`
Epoch *big.Int `json:"epoch" gencodec:"required"`
@ -160,9 +160,10 @@ type Body struct {
// Block represents an entire block in the Ethereum blockchain.
type Block struct {
header *Header
uncles []*Header
transactions Transactions
header *Header
uncles []*Header
transactions Transactions
incomingReceipts CXReceipts
// caches
hash atomic.Value
@ -227,7 +228,7 @@ type storageblock struct {
// The values of TxHash, UncleHash, ReceiptHash and Bloom in header
// are ignored and set to values derived from the given txs,
// and receipts.
func NewBlock(header *Header, txs []*Transaction, receipts []*Receipt, cxs []*CXReceipt) *Block {
func NewBlock(header *Header, txs []*Transaction, receipts []*Receipt, outcxs []*CXReceipt, incxs []*CXReceipt) *Block {
b := &Block{header: CopyHeader(header)}
// TODO: panic if len(txs) != len(receipts)
@ -246,7 +247,15 @@ func NewBlock(header *Header, txs []*Transaction, receipts []*Receipt, cxs []*CX
b.header.Bloom = CreateBloom(receipts)
}
b.header.OutgoingReceiptHash = DeriveMultipleShardsSha(CXReceipts(cxs))
b.header.OutgoingReceiptHash = DeriveMultipleShardsSha(CXReceipts(outcxs))
if len(incxs) == 0 {
b.header.IncomingReceiptHash = EmptyRootHash
} else {
b.header.IncomingReceiptHash = DeriveSha(CXReceipts(incxs))
b.incomingReceipts = make(CXReceipts, len(incxs))
copy(b.incomingReceipts, incxs)
}
return b
}
@ -340,6 +349,11 @@ func (b *Block) Transactions() Transactions {
return b.transactions
}
// IncomingReceipts returns verified outgoing receipts
func (b *Block) IncomingReceipts() CXReceipts {
return b.incomingReceipts
}
// Transaction returns Transaction.
func (b *Block) Transaction(hash common.Hash) *Transaction {
for _, transaction := range b.transactions {

@ -10,7 +10,6 @@ import (
// CXReceipt represents a receipt for cross-shard transaction
type CXReceipt struct {
TxHash common.Hash // hash of the cross shard transaction in source shard
Nonce uint64
From common.Address
To *common.Address
ShardID uint32
@ -59,8 +58,8 @@ func (cs CXReceipts) MaxToShardID() uint32 {
}
// NewCrossShardReceipt creates a cross shard receipt
func NewCrossShardReceipt(txHash common.Hash, nonce uint64, from common.Address, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt {
return &CXReceipt{TxHash: txHash, Nonce: nonce, From: from, To: to, ShardID: shardID, ToShardID: toShardID, Amount: amount}
func NewCrossShardReceipt(txHash common.Hash, from common.Address, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt {
return &CXReceipt{TxHash: txHash, From: from, To: to, ShardID: shardID, ToShardID: toShardID, Amount: amount}
}
// CXMerkleProof represents the merkle proof of a collection of ordered cross shard transactions
@ -72,3 +71,19 @@ type CXMerkleProof struct {
ShardIDs []uint32 // order list, records destination shardID
CXShardHashes []common.Hash // ordered hash list, each hash corresponds to one destination shard's receipts root hash
}
// CalculateIncomingReceiptsHash calculates the incoming receipts list hash
// the list is already sorted by shardID and then by blockNum before calling this function
// or the list is from the block field which is already sorted
func CalculateIncomingReceiptsHash(receiptsList []CXReceipts) common.Hash {
if len(receiptsList) == 0 {
return EmptyRootHash
}
incomingReceipts := CXReceipts{}
for _, receipts := range receiptsList {
incomingReceipts = append(incomingReceipts, receipts...)
}
return DeriveSha(incomingReceipts)
}

@ -200,7 +200,7 @@ func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas
txType := evm.Context.TxType
// Fail if we're trying to transfer more than the available balance
if !evm.Context.CanTransfer(evm.StateDB, caller.Address(), value) && txType != types.AdditionOnly {
if !evm.Context.CanTransfer(evm.StateDB, caller.Address(), value) {
return nil, gas, ErrInsufficientBalance
}

@ -223,6 +223,7 @@ func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
}
// Find receipts with my shard as destination
// and prepare to calculate source shard outgoing cxreceipts root hash
for j := 0; j < len(merkleProof.ShardIDs); j++ {
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, merkleProof.ShardIDs[j])
@ -241,27 +242,32 @@ func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
// Check whether the receipts matches the receipt merkle root
receiptsForMyShard := cxmsg.Receipts
if len(receiptsForMyShard) == 0 {
return
}
sourceShardID := merkleProof.ShardID
sourceBlockNum := merkleProof.BlockNum
sourceBlockHash := merkleProof.BlockHash
sourceOutgoingCXReceiptsHash := merkleProof.CXReceiptHash
sha := types.DeriveSha(receiptsForMyShard)
// (1) verify the CXReceipts trie root match
if sha != myShardRoot {
utils.Logger().Warn().Interface("calculated", sha).Interface("got", myShardRoot).Msg("[ProcessReceiptMessage] Trie Root of ReadCXReceipts Not Match")
utils.Logger().Warn().Uint32("sourceShardID", sourceShardID).Interface("sourceBlockNum", sourceBlockNum).Interface("calculated", sha).Interface("got", myShardRoot).Msg("[ProcessReceiptMessage] Trie Root of ReadCXReceipts Not Match")
return
}
if len(receiptsForMyShard) == 0 {
// (2) verify the outgoingCXReceiptsHash match
outgoingHashFromSourceShard := crypto.Keccak256Hash(byteBuffer.Bytes())
if outgoingHashFromSourceShard != sourceOutgoingCXReceiptsHash {
utils.Logger().Warn().Uint32("sourceShardID", sourceShardID).Interface("sourceBlockNum", sourceBlockNum).Interface("calculated", outgoingHashFromSourceShard).Interface("got", sourceOutgoingCXReceiptsHash).Msg("[ProcessReceiptMessage] IncomingReceiptRootHash from source shard not match")
return
}
sourceShardID := merkleProof.ShardID
sourceBlockNum := merkleProof.BlockNum
sourceBlockHash := merkleProof.BlockHash
// TODO: check message signature is from the nodes of source shard.
// TODO: (3) check message signature is from the nodes of source shard.
node.Blockchain().WriteCXReceipts(sourceShardID, sourceBlockNum.Uint64(), sourceBlockHash, receiptsForMyShard, true)
// Check merkle proof with crosslink of the source shard
hash := crypto.Keccak256Hash(byteBuffer.Bytes())
utils.Logger().Debug().Interface("hash", hash).Msg("[ProcessReceiptMessage] RootHash of the CXReceipts")
// TODO chao: use crosslink from beacon sync to verify the hash
node.AddPendingReceipts(&cxmsg)
}

@ -219,16 +219,19 @@ func (node *Node) proposeReceipts() []types.CXReceipts {
})
for _, receiptMsg := range node.pendingCXReceipts {
sourceShardID := receiptMsg.MerkleProof.ShardID
sourceBlockNum := receiptMsg.MerkleProof.BlockNum
beaconChain := node.Blockchain() // TODO: read from real beacon chain
crossLink, err := beaconChain.ReadCrossLink(sourceShardID, sourceBlockNum.Uint64(), false)
if err == nil {
if crossLink.ChainHeader.Hash() == receiptMsg.MerkleProof.BlockHash && crossLink.ChainHeader.OutgoingReceiptHash == receiptMsg.MerkleProof.CXReceiptHash {
receiptsList = append(receiptsList, receiptMsg.Receipts)
}
}
// sourceShardID := receiptMsg.MerkleProof.ShardID
// sourceBlockNum := receiptMsg.MerkleProof.BlockNum
//
// beaconChain := node.Blockchain() // TODO: read from real beacon chain
// crossLink, err := beaconChain.ReadCrossLink(sourceShardID, sourceBlockNum.Uint64(), false)
// if err == nil {
// // verify the source block hash is from a finalized block
// if crossLink.ChainHeader.Hash() == receiptMsg.MerkleProof.BlockHash && crossLink.ChainHeader.OutgoingReceiptHash == receiptMsg.MerkleProof.CXReceiptHash {
// receiptsList = append(receiptsList, receiptMsg.Receipts)
// }
// }
// TODO: remove it after beacon chain sync is ready, for pass the test only
receiptsList = append(receiptsList, receiptMsg.Receipts)
}
node.pendingCXMutex.Unlock()
return receiptsList

@ -25,7 +25,8 @@ type environment struct {
header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
cxs []*types.CXReceipt // cross shard transaction receipts (source shard)
outcxs []*types.CXReceipt // cross shard transaction receipts (source shard)
incxs []*types.CXReceipt // cross shard receipts (desitinatin shard)
}
// Worker is the main object which takes care of submitting new work to consensus engine
@ -78,7 +79,6 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
receipt, cx, _, err := core.ApplyTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, vm.Config{})
if err != nil {
fmt.Println("hehe", "applyTransaction failed", err)
w.current.state.RevertToSnapshot(snap)
return nil, err
}
@ -89,7 +89,7 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
w.current.txs = append(w.current.txs, tx)
w.current.receipts = append(w.current.receipts, receipt)
if cx != nil {
w.current.cxs = append(w.current.cxs, cx)
w.current.outcxs = append(w.current.outcxs, cx)
}
return receipt.Logs, nil
@ -112,14 +112,14 @@ func (w *Worker) CommitTransactions(txs types.Transactions, coinbase common.Addr
return nil
}
// CommitReceipts commits a list of receipts.
// CommitReceipts commits a list of already verified incoming cross shard receipts
func (w *Worker) CommitReceipts(receiptsList []types.CXReceipts, coinbase common.Address) error {
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
}
w.current.header.IncomingReceiptHash = types.CalculateIncomingReceiptsHash(receiptsList)
for _, receipts := range receiptsList {
// TODO: apply receipt
_ = receipts
w.current.incxs = append(w.current.incxs, receipts...)
}
return nil
}
@ -174,9 +174,14 @@ func (w *Worker) GetCurrentReceipts() []*types.Receipt {
return w.current.receipts
}
// GetCurrentCXReceipts get the receipts generated starting from the last state.
func (w *Worker) GetCurrentCXReceipts() []*types.CXReceipt {
return w.current.cxs
// OutgoingReceipts get the receipts generated starting from the last state.
func (w *Worker) OutgoingReceipts() []*types.CXReceipt {
return w.current.outcxs
}
// IncomingReceipts get incoming receipts in destination shard that is received from source shard
func (w *Worker) IncomingReceipts() []*types.CXReceipt {
return w.current.incxs
}
// CommitWithCrossLinks generate a new block with cross links for the new txs.
@ -193,7 +198,7 @@ func (w *Worker) CommitWithCrossLinks(sig []byte, signers []byte, viewID uint64,
s := w.current.state.Copy()
copyHeader := types.CopyHeader(w.current.header)
block, err := w.engine.Finalize(w.chain, copyHeader, s, w.current.txs, w.current.receipts, w.current.cxs)
block, err := w.engine.Finalize(w.chain, copyHeader, s, w.current.txs, w.current.receipts, w.current.outcxs, w.current.incxs)
if err != nil {
return nil, ctxerror.New("cannot finalize block").WithCause(err)
}

Loading…
Cancel
Save