diff --git a/api/proto/node/node.go b/api/proto/node/node.go index 721ad6477..daaed4c1a 100644 --- a/api/proto/node/node.go +++ b/api/proto/node/node.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/api/proto" "github.com/harmony-one/harmony/block" + "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" ) @@ -149,14 +150,19 @@ func ConstructBlocksSyncMessage(blocks []*types.Block) []byte { } // ConstructCrossLinkMessage constructs cross link message to send to beacon chain -func ConstructCrossLinkMessage(headers []*block.Header) []byte { +func ConstructCrossLinkMessage(bc engine.ChainReader, headers []*block.Header) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(Block)) byteBuffer.WriteByte(byte(CrossLink)) crosslinks := []types.CrossLink{} for _, header := range headers { - crosslinks = append(crosslinks, types.NewCrossLink(header)) + parentHeader := bc.GetHeaderByHash(header.ParentHash()) + if parentHeader == nil { + continue + } + epoch := parentHeader.Epoch() + crosslinks = append(crosslinks, types.NewCrossLink(header, epoch)) } crosslinksData, _ := rlp.EncodeToBytes(crosslinks) byteBuffer.Write(crosslinksData) diff --git a/core/blockchain.go b/core/blockchain.go index ba15f28ba..7b5dfd044 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -144,7 +144,7 @@ type BlockChain struct { validatorStatsCache *lru.Cache // Cache for validator stats validatorListCache *lru.Cache // Cache of validator list validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator - lastPendingCrossLinksCache *lru.Cache // Cache of last pending crosslinks + pendingCrossLinksCache *lru.Cache // Cache of last pending crosslinks quit chan struct{} // blockchain quit channel running int32 // running must be called atomically @@ -185,7 +185,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par validatorStatsCache, _ := lru.New(validatorStatsCacheLimit) validatorListCache, _ := lru.New(validatorListCacheLimit) validatorListByDelegatorCache, _ := lru.New(validatorListByDelegatorCacheLimit) - lastPendingCrossLinksCache, _ := lru.New(pendingCrossLinksCacheLimit) + pendingCrossLinksCache, _ := lru.New(pendingCrossLinksCacheLimit) bc := &BlockChain{ chainConfig: chainConfig, @@ -208,7 +208,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par validatorStatsCache: validatorStatsCache, validatorListCache: validatorListCache, validatorListByDelegatorCache: validatorListByDelegatorCache, - lastPendingCrossLinksCache: lastPendingCrossLinksCache, + pendingCrossLinksCache: pendingCrossLinksCache, engine: engine, vmConfig: vmConfig, badBlocks: badBlocks, @@ -1231,13 +1231,11 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}); err == nil { utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[insertChain/crosslinks] Cross Link Added to Beaconchain") } - bc.UpdateShardLastCrossLink(crossLink.ShardID(), crossLink) - } - //clean local database cache after crosslink inserted into blockchain - err = bc.DeletePendingCrossLinks() - if err != nil { - utils.Logger().Error().Err(err).Msgf("Unable to Delete PendingCrossLinks, number of crosslinks: %d", len(header.CrossLinks())) + bc.LastContinuousCrossLink(crossLink.ShardID(), crossLink) } + //clean/update local database cache after crosslink inserted into blockchain + num, err := bc.DeleteCommittedFromPendingCrossLinks() + utils.Logger().Debug().Msgf("DeleteCommittedFromPendingCrossLinks, crosslinks in header %d, pending crosslinks: %d, error: %+v", len(*crossLinks), num, err) } /////////////////////////// END @@ -2130,11 +2128,11 @@ func (bc *BlockChain) WriteShardLastCrossLink(shardID uint32, cl types.CrossLink return rawdb.WriteShardLastCrossLink(bc.db, cl.ShardID(), cl.Serialize()) } -// UpdateShardLastCrossLink saves the last crosslink of a shard +// LastContinuousCrossLink saves the last crosslink of a shard // This function will update the latest crosslink in the sense that // any previous block's crosslink is received up to this point // there is no missing hole between genesis to this crosslink of given shardID -func (bc *BlockChain) UpdateShardLastCrossLink(shardID uint32, cl types.CrossLink) error { +func (bc *BlockChain) LastContinuousCrossLink(shardID uint32, cl types.CrossLink) error { cl0, err := bc.ReadShardLastCrossLink(shardID) if cl0 == nil { bc.WriteShardLastCrossLink(shardID, cl) @@ -2157,7 +2155,7 @@ func (bc *BlockChain) UpdateShardLastCrossLink(shardID uint32, cl types.CrossLin } if newCheckpoint > 0 { - utils.Logger().Debug().Msgf("UpdateShardLastCrossLink: latest checkpoint blockNum %d", newCheckpoint) + utils.Logger().Debug().Msgf("LastContinuousCrossLink: latest checkpoint blockNum %d", newCheckpoint) cln, err := bc.ReadCrossLink(shardID, newCheckpoint) if err != nil { return err @@ -2181,7 +2179,7 @@ func (bc *BlockChain) ReadShardLastCrossLink(shardID uint32) (*types.CrossLink, // ReadPendingCrossLinks retrieves pending crosslinks func (bc *BlockChain) ReadPendingCrossLinks() ([]types.CrossLink, error) { bytes := []byte{} - if cached, ok := bc.lastPendingCrossLinksCache.Get("lastPendingCLs"); ok { + if cached, ok := bc.pendingCrossLinksCache.Get("pendingCLs"); ok { bytes = cached.([]byte) } else { bytes, err := rawdb.ReadPendingCrossLinks(bc.db) @@ -2210,7 +2208,7 @@ func (bc *BlockChain) WritePendingCrossLinks(cls []types.CrossLink) error { } by, err := rlp.EncodeToBytes(cls) if err == nil { - bc.lastPendingCrossLinksCache.Add("lastPendingCLs", by) + bc.pendingCrossLinksCache.Add("pendingCLs", by) } return nil } @@ -2227,10 +2225,22 @@ func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, e return len(cls), err } -// DeletePendingCrossLinks appends pending crosslinks -func (bc *BlockChain) DeletePendingCrossLinks() error { - bc.lastPendingCrossLinksCache.Remove("lastPendingCLs") - return rawdb.DeletePendingCrossLinks(bc.db) +// DeleteCommittedFromPendingCrossLinks delete pending crosslinks that already committed +func (bc *BlockChain) DeleteCommittedFromPendingCrossLinks() (int, error) { + cls, err := bc.ReadPendingCrossLinks() + if err != nil || len(cls) == 0 { + return 0, err + } + pendingCLs := []types.CrossLink{} + for _, cl := range cls { + cl0, err := bc.ReadCrossLink(cl.ShardID(), cl.BlockNum()) + if err == nil && cl0 != nil { + continue + } + pendingCLs = append(pendingCLs, cl) + } + err = bc.WritePendingCrossLinks(pendingCLs) + return len(pendingCLs), err } // IsSameLeaderAsPreviousBlock retrieves a block from the database by number, caching it diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 04f14d124..a75292998 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -543,17 +543,17 @@ func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) err // ReadPendingCrossLinks retrieves last pending crosslinks. func ReadPendingCrossLinks(db DatabaseReader) ([]byte, error) { - return db.Get(lastPendingCrosslinkKey) + return db.Get(pendingCrosslinkKey) } // WritePendingCrossLinks stores last pending crosslinks into database. func WritePendingCrossLinks(db DatabaseWriter, bytes []byte) error { - return db.Put(lastPendingCrosslinkKey, bytes) + return db.Put(pendingCrosslinkKey, bytes) } // DeletePendingCrossLinks stores last pending crosslinks into database. func DeletePendingCrossLinks(db DatabaseDeleter) error { - return db.Delete(lastPendingCrosslinkKey) + return db.Delete(pendingCrosslinkKey) } // ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index c0fc3b041..47e72248c 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -58,7 +58,7 @@ var ( shardStatePrefix = []byte("ss") // shardStatePrefix + num (uint64 big endian) + hash -> shardState lastCommitsKey = []byte("LastCommits") - lastPendingCrosslinkKey = []byte("lastPendingCL") // prefix for shard last pending crosslink + pendingCrosslinkKey = []byte("pendingCL") // prefix for shard last pending crosslink preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db diff --git a/core/types/crosslink.go b/core/types/crosslink.go index 32e07890f..d9b2b55b1 100644 --- a/core/types/crosslink.go +++ b/core/types/crosslink.go @@ -20,16 +20,18 @@ type CrossLink struct { SignatureF [96]byte //aggregated signature BitmapF []byte //corresponding bitmap mask for agg signature ShardIDF uint32 //will be verified with signature on |blockNumber|blockHash| is correct + EpochF *big.Int } // NewCrossLink returns a new cross link object -func NewCrossLink(header *block.Header) CrossLink { - if header.Number().Uint64() == 0 { - return CrossLink{} - } +// epoch is the parentHeader's epoch +func NewCrossLink(header *block.Header, epoch *big.Int) CrossLink { parentBlockNum := big.NewInt(0) + if header.Number().Uint64() == 0 { // should not happend, just to be defensive + return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID(), epoch} + } parentBlockNum.Sub(header.Number(), big.NewInt(1)) - return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID()} + return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID(), epoch} } // ShardID returns shardID @@ -42,6 +44,11 @@ func (cl CrossLink) Number() *big.Int { return cl.BlockNumberF } +// Epoch returns epoch with big.Int format +func (cl CrossLink) Epoch() *big.Int { + return cl.EpochF +} + // BlockNum returns blockNum func (cl CrossLink) BlockNum() uint64 { return cl.BlockNumberF.Uint64() diff --git a/internal/chain/reward.go b/internal/chain/reward.go index ac8ed0edd..ecf927441 100644 --- a/internal/chain/reward.go +++ b/internal/chain/reward.go @@ -239,16 +239,13 @@ func AccumulateRewards( cxLink := crossLinks[i] - epoch := bc.GetHeaderByNumber(cxLink.BlockNum()).Epoch() - shardState, err := bc.ReadShardState(epoch) - if !bc.Config().IsStaking(epoch) { - shardState, err = committee.WithStakingEnabled.Compute(epoch, bc) + shardState, err := bc.ReadShardState(cxLink.Epoch()) + if !bc.Config().IsStaking(cxLink.Epoch()) { + shardState, err = committee.WithStakingEnabled.Compute(cxLink.Epoch(), bc) } if err != nil { - // TEMP HACK: IGNORE THE ERROR as THERE IS NO WAY TO VERIFY THE SIG OF FIRST BLOCK OF SHARD FIRST TIME ENTERING STAKING, NO WAY TO FIND THE LAST COMMITEE AS THERE IS GAP - // TODO: FIX THIS WITH NEW CROSSLINK FORMAT - continue + return err } subComm := shardState.FindCommitteeByID(cxLink.ShardID()) @@ -256,9 +253,7 @@ func AccumulateRewards( payableSigners, _, err := blockSigners(cxLink.Bitmap(), subComm) if err != nil { - // TEMP HACK: IGNORE THE ERROR as THERE IS NO WAY TO VERIFY THE SIG OF FIRST BLOCK OF SHARD FIRST TIME ENTERING STAKING, NO WAY TO FIND THE LAST COMMITEE AS THERE IS GAP - // TODO: FIX THIS WITH NEW CROSSLINK FORMAT - continue + return err } votingPower := votepower.Compute(payableSigners) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 109c903b9..dade0c7c1 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -1,7 +1,6 @@ package node import ( - "bytes" "encoding/binary" "github.com/ethereum/go-ethereum/common" @@ -123,11 +122,8 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { for _, crossLink := range *crossLinks { cl, err := node.Blockchain().ReadCrossLink(crossLink.ShardID(), crossLink.BlockNum()) if err == nil && cl != nil { - if !bytes.Equal(cl.Serialize(), crossLink.Serialize()) { - // Add logic to slash double sign - utils.Logger().Warn().Msgf("[CrossLinkVerification] Double signed crossLink, previous crossLink %+v, crosslink %+v", cl, crossLink) - } - continue + // Add slash for exist same blocknum but different crosslink + return ctxerror.New("crosslink already exist!") } if err = node.VerifyCrossLink(crossLink); err != nil { return ctxerror.New("cannot VerifyBlockCrossLinks", @@ -161,8 +157,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] Crosslink going to propose: %d", len(crosslinks)) for i, cl := range crosslinks { - epoch := node.Blockchain().GetBlockByNumber(cl.BlockNum()).Epoch() - if cl.Number() == nil || epoch.Cmp(node.Blockchain().Config().CrossLinkEpoch) < 0 { + if cl.Number() == nil || cl.Epoch().Cmp(node.Blockchain().Config().CrossLinkEpoch) < 0 { utils.Logger().Debug(). Msgf("[ProcessingCrossLink] Crosslink %d skipped: %v", i, cl) continue @@ -187,9 +182,9 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64()) } node.pendingCLMutex.Lock() - Len, err := node.Blockchain().AddPendingCrossLinks(candidates) + Len, _ := node.Blockchain().AddPendingCrossLinks(candidates) node.pendingCLMutex.Unlock() - utils.Logger().Error().Err(err). + utils.Logger().Debug(). Msgf("[ProcessingCrossLink] add pending crosslinks, total pending: %d", Len) } } @@ -231,15 +226,14 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error { return nil } -// VerifyCrossLink verifies the header is valid against the prevHeader. +// VerifyCrossLink verifies the header is valid func (node *Node) VerifyCrossLink(cl types.CrossLink) error { // TODO: add fork choice rule // Verify signature of the new cross link header // TODO: check whether to recalculate shard state - epoch := node.Blockchain().GetBlockByNumber(cl.BlockNum()).Epoch() - shardState, err := node.Blockchain().ReadShardState(epoch) + shardState, err := node.Blockchain().ReadShardState(cl.Epoch()) committee := shardState.FindCommitteeByID(cl.ShardID()) if err != nil || committee == nil { @@ -261,29 +255,31 @@ func (node *Node) VerifyCrossLink(cl types.CrossLink) error { return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } - if cl.BlockNum() > 1 { // First block doesn't have last sig - mask, err := bls_cosi.NewMask(committerKeys, nil) - if err != nil { - return ctxerror.New("cannot create group sig mask", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) - } - if err := mask.SetMask(cl.Bitmap()); err != nil { - return ctxerror.New("cannot set group sig mask bits", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) - } + if cl.BlockNum() <= 1 { + return ctxerror.New("CrossLink BlockNumber should greater than 1") + } - aggSig := bls.Sign{} - sig := cl.Signature() - err = aggSig.Deserialize(sig[:]) - if err != nil { - return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err) - } + mask, err := bls_cosi.NewMask(committerKeys, nil) + if err != nil { + return ctxerror.New("cannot create group sig mask", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) + } + if err := mask.SetMask(cl.Bitmap()); err != nil { + return ctxerror.New("cannot set group sig mask bits", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) + } - hash := cl.Hash() - blockNumBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(blockNumBytes, cl.BlockNum()) - commitPayload := append(blockNumBytes, hash[:]...) - if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { - return ctxerror.New("Failed to verify the signature for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()) - } + aggSig := bls.Sign{} + sig := cl.Signature() + err = aggSig.Deserialize(sig[:]) + if err != nil { + return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err) + } + + hash := cl.Hash() + blockNumBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(blockNumBytes, cl.BlockNum()) + commitPayload := append(blockNumBytes, hash[:]...) + if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { + return ctxerror.New("Failed to verify the signature for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()) } return nil } diff --git a/node/node_handler.go b/node/node_handler.go index 09bafc50b..a4379d3e6 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -241,7 +241,7 @@ func (node *Node) BroadcastCrossLink(newBlock *types.Block) { for _, header := range headers { utils.Logger().Debug().Msgf("[BroadcastCrossLink] Broadcasting %d", header.Number().Uint64()) } - node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkMessage(headers))) + node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkMessage(node.Consensus.ChainReader, headers))) } // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on