fix epoch reading bug; return error in block crosslink verification if crosslink exists; only remove committed crosslink from pending list

pull/1877/head
chao 5 years ago
parent 706fc6826f
commit 338720b2eb
  1. 10
      api/proto/node/node.go
  2. 46
      core/blockchain.go
  3. 6
      core/rawdb/accessors_chain.go
  4. 2
      core/rawdb/schema.go
  5. 17
      core/types/crosslink.go
  6. 15
      internal/chain/reward.go
  7. 64
      node/node_cross_shard.go
  8. 2
      node/node_handler.go

@ -12,6 +12,7 @@ import (
"github.com/harmony-one/harmony/api/proto"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
)
@ -149,14 +150,19 @@ func ConstructBlocksSyncMessage(blocks []*types.Block) []byte {
}
// ConstructCrossLinkMessage constructs cross link message to send to beacon chain
func ConstructCrossLinkMessage(headers []*block.Header) []byte {
func ConstructCrossLinkMessage(bc engine.ChainReader, headers []*block.Header) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Block))
byteBuffer.WriteByte(byte(CrossLink))
crosslinks := []types.CrossLink{}
for _, header := range headers {
crosslinks = append(crosslinks, types.NewCrossLink(header))
parentHeader := bc.GetHeaderByHash(header.ParentHash())
if parentHeader == nil {
continue
}
epoch := parentHeader.Epoch()
crosslinks = append(crosslinks, types.NewCrossLink(header, epoch))
}
crosslinksData, _ := rlp.EncodeToBytes(crosslinks)
byteBuffer.Write(crosslinksData)

@ -144,7 +144,7 @@ type BlockChain struct {
validatorStatsCache *lru.Cache // Cache for validator stats
validatorListCache *lru.Cache // Cache of validator list
validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator
lastPendingCrossLinksCache *lru.Cache // Cache of last pending crosslinks
pendingCrossLinksCache *lru.Cache // Cache of last pending crosslinks
quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
@ -185,7 +185,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
validatorStatsCache, _ := lru.New(validatorStatsCacheLimit)
validatorListCache, _ := lru.New(validatorListCacheLimit)
validatorListByDelegatorCache, _ := lru.New(validatorListByDelegatorCacheLimit)
lastPendingCrossLinksCache, _ := lru.New(pendingCrossLinksCacheLimit)
pendingCrossLinksCache, _ := lru.New(pendingCrossLinksCacheLimit)
bc := &BlockChain{
chainConfig: chainConfig,
@ -208,7 +208,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
validatorStatsCache: validatorStatsCache,
validatorListCache: validatorListCache,
validatorListByDelegatorCache: validatorListByDelegatorCache,
lastPendingCrossLinksCache: lastPendingCrossLinksCache,
pendingCrossLinksCache: pendingCrossLinksCache,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
@ -1231,13 +1231,11 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}); err == nil {
utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[insertChain/crosslinks] Cross Link Added to Beaconchain")
}
bc.UpdateShardLastCrossLink(crossLink.ShardID(), crossLink)
}
//clean local database cache after crosslink inserted into blockchain
err = bc.DeletePendingCrossLinks()
if err != nil {
utils.Logger().Error().Err(err).Msgf("Unable to Delete PendingCrossLinks, number of crosslinks: %d", len(header.CrossLinks()))
bc.LastContinuousCrossLink(crossLink.ShardID(), crossLink)
}
//clean/update local database cache after crosslink inserted into blockchain
num, err := bc.DeleteCommittedFromPendingCrossLinks()
utils.Logger().Debug().Msgf("DeleteCommittedFromPendingCrossLinks, crosslinks in header %d, pending crosslinks: %d, error: %+v", len(*crossLinks), num, err)
}
/////////////////////////// END
@ -2130,11 +2128,11 @@ func (bc *BlockChain) WriteShardLastCrossLink(shardID uint32, cl types.CrossLink
return rawdb.WriteShardLastCrossLink(bc.db, cl.ShardID(), cl.Serialize())
}
// UpdateShardLastCrossLink saves the last crosslink of a shard
// LastContinuousCrossLink saves the last crosslink of a shard
// This function will update the latest crosslink in the sense that
// any previous block's crosslink is received up to this point
// there is no missing hole between genesis to this crosslink of given shardID
func (bc *BlockChain) UpdateShardLastCrossLink(shardID uint32, cl types.CrossLink) error {
func (bc *BlockChain) LastContinuousCrossLink(shardID uint32, cl types.CrossLink) error {
cl0, err := bc.ReadShardLastCrossLink(shardID)
if cl0 == nil {
bc.WriteShardLastCrossLink(shardID, cl)
@ -2157,7 +2155,7 @@ func (bc *BlockChain) UpdateShardLastCrossLink(shardID uint32, cl types.CrossLin
}
if newCheckpoint > 0 {
utils.Logger().Debug().Msgf("UpdateShardLastCrossLink: latest checkpoint blockNum %d", newCheckpoint)
utils.Logger().Debug().Msgf("LastContinuousCrossLink: latest checkpoint blockNum %d", newCheckpoint)
cln, err := bc.ReadCrossLink(shardID, newCheckpoint)
if err != nil {
return err
@ -2181,7 +2179,7 @@ func (bc *BlockChain) ReadShardLastCrossLink(shardID uint32) (*types.CrossLink,
// ReadPendingCrossLinks retrieves pending crosslinks
func (bc *BlockChain) ReadPendingCrossLinks() ([]types.CrossLink, error) {
bytes := []byte{}
if cached, ok := bc.lastPendingCrossLinksCache.Get("lastPendingCLs"); ok {
if cached, ok := bc.pendingCrossLinksCache.Get("pendingCLs"); ok {
bytes = cached.([]byte)
} else {
bytes, err := rawdb.ReadPendingCrossLinks(bc.db)
@ -2210,7 +2208,7 @@ func (bc *BlockChain) WritePendingCrossLinks(cls []types.CrossLink) error {
}
by, err := rlp.EncodeToBytes(cls)
if err == nil {
bc.lastPendingCrossLinksCache.Add("lastPendingCLs", by)
bc.pendingCrossLinksCache.Add("pendingCLs", by)
}
return nil
}
@ -2227,10 +2225,22 @@ func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, e
return len(cls), err
}
// DeletePendingCrossLinks appends pending crosslinks
func (bc *BlockChain) DeletePendingCrossLinks() error {
bc.lastPendingCrossLinksCache.Remove("lastPendingCLs")
return rawdb.DeletePendingCrossLinks(bc.db)
// DeleteCommittedFromPendingCrossLinks delete pending crosslinks that already committed
func (bc *BlockChain) DeleteCommittedFromPendingCrossLinks() (int, error) {
cls, err := bc.ReadPendingCrossLinks()
if err != nil || len(cls) == 0 {
return 0, err
}
pendingCLs := []types.CrossLink{}
for _, cl := range cls {
cl0, err := bc.ReadCrossLink(cl.ShardID(), cl.BlockNum())
if err == nil && cl0 != nil {
continue
}
pendingCLs = append(pendingCLs, cl)
}
err = bc.WritePendingCrossLinks(pendingCLs)
return len(pendingCLs), err
}
// IsSameLeaderAsPreviousBlock retrieves a block from the database by number, caching it

@ -543,17 +543,17 @@ func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) err
// ReadPendingCrossLinks retrieves last pending crosslinks.
func ReadPendingCrossLinks(db DatabaseReader) ([]byte, error) {
return db.Get(lastPendingCrosslinkKey)
return db.Get(pendingCrosslinkKey)
}
// WritePendingCrossLinks stores last pending crosslinks into database.
func WritePendingCrossLinks(db DatabaseWriter, bytes []byte) error {
return db.Put(lastPendingCrosslinkKey, bytes)
return db.Put(pendingCrosslinkKey, bytes)
}
// DeletePendingCrossLinks stores last pending crosslinks into database.
func DeletePendingCrossLinks(db DatabaseDeleter) error {
return db.Delete(lastPendingCrosslinkKey)
return db.Delete(pendingCrosslinkKey)
}
// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash

@ -58,7 +58,7 @@ var (
shardStatePrefix = []byte("ss") // shardStatePrefix + num (uint64 big endian) + hash -> shardState
lastCommitsKey = []byte("LastCommits")
lastPendingCrosslinkKey = []byte("lastPendingCL") // prefix for shard last pending crosslink
pendingCrosslinkKey = []byte("pendingCL") // prefix for shard last pending crosslink
preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db

@ -20,16 +20,18 @@ type CrossLink struct {
SignatureF [96]byte //aggregated signature
BitmapF []byte //corresponding bitmap mask for agg signature
ShardIDF uint32 //will be verified with signature on |blockNumber|blockHash| is correct
EpochF *big.Int
}
// NewCrossLink returns a new cross link object
func NewCrossLink(header *block.Header) CrossLink {
if header.Number().Uint64() == 0 {
return CrossLink{}
}
// epoch is the parentHeader's epoch
func NewCrossLink(header *block.Header, epoch *big.Int) CrossLink {
parentBlockNum := big.NewInt(0)
if header.Number().Uint64() == 0 { // should not happend, just to be defensive
return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID(), epoch}
}
parentBlockNum.Sub(header.Number(), big.NewInt(1))
return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID()}
return CrossLink{header.ParentHash(), parentBlockNum, header.LastCommitSignature(), header.LastCommitBitmap(), header.ShardID(), epoch}
}
// ShardID returns shardID
@ -42,6 +44,11 @@ func (cl CrossLink) Number() *big.Int {
return cl.BlockNumberF
}
// Epoch returns epoch with big.Int format
func (cl CrossLink) Epoch() *big.Int {
return cl.EpochF
}
// BlockNum returns blockNum
func (cl CrossLink) BlockNum() uint64 {
return cl.BlockNumberF.Uint64()

@ -239,16 +239,13 @@ func AccumulateRewards(
cxLink := crossLinks[i]
epoch := bc.GetHeaderByNumber(cxLink.BlockNum()).Epoch()
shardState, err := bc.ReadShardState(epoch)
if !bc.Config().IsStaking(epoch) {
shardState, err = committee.WithStakingEnabled.Compute(epoch, bc)
shardState, err := bc.ReadShardState(cxLink.Epoch())
if !bc.Config().IsStaking(cxLink.Epoch()) {
shardState, err = committee.WithStakingEnabled.Compute(cxLink.Epoch(), bc)
}
if err != nil {
// TEMP HACK: IGNORE THE ERROR as THERE IS NO WAY TO VERIFY THE SIG OF FIRST BLOCK OF SHARD FIRST TIME ENTERING STAKING, NO WAY TO FIND THE LAST COMMITEE AS THERE IS GAP
// TODO: FIX THIS WITH NEW CROSSLINK FORMAT
continue
return err
}
subComm := shardState.FindCommitteeByID(cxLink.ShardID())
@ -256,9 +253,7 @@ func AccumulateRewards(
payableSigners, _, err := blockSigners(cxLink.Bitmap(), subComm)
if err != nil {
// TEMP HACK: IGNORE THE ERROR as THERE IS NO WAY TO VERIFY THE SIG OF FIRST BLOCK OF SHARD FIRST TIME ENTERING STAKING, NO WAY TO FIND THE LAST COMMITEE AS THERE IS GAP
// TODO: FIX THIS WITH NEW CROSSLINK FORMAT
continue
return err
}
votingPower := votepower.Compute(payableSigners)

@ -1,7 +1,6 @@
package node
import (
"bytes"
"encoding/binary"
"github.com/ethereum/go-ethereum/common"
@ -123,11 +122,8 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
for _, crossLink := range *crossLinks {
cl, err := node.Blockchain().ReadCrossLink(crossLink.ShardID(), crossLink.BlockNum())
if err == nil && cl != nil {
if !bytes.Equal(cl.Serialize(), crossLink.Serialize()) {
// Add logic to slash double sign
utils.Logger().Warn().Msgf("[CrossLinkVerification] Double signed crossLink, previous crossLink %+v, crosslink %+v", cl, crossLink)
}
continue
// Add slash for exist same blocknum but different crosslink
return ctxerror.New("crosslink already exist!")
}
if err = node.VerifyCrossLink(crossLink); err != nil {
return ctxerror.New("cannot VerifyBlockCrossLinks",
@ -161,8 +157,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
Msgf("[ProcessingCrossLink] Crosslink going to propose: %d", len(crosslinks))
for i, cl := range crosslinks {
epoch := node.Blockchain().GetBlockByNumber(cl.BlockNum()).Epoch()
if cl.Number() == nil || epoch.Cmp(node.Blockchain().Config().CrossLinkEpoch) < 0 {
if cl.Number() == nil || cl.Epoch().Cmp(node.Blockchain().Config().CrossLinkEpoch) < 0 {
utils.Logger().Debug().
Msgf("[ProcessingCrossLink] Crosslink %d skipped: %v", i, cl)
continue
@ -187,9 +182,9 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
Msgf("[ProcessingCrossLink] committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64())
}
node.pendingCLMutex.Lock()
Len, err := node.Blockchain().AddPendingCrossLinks(candidates)
Len, _ := node.Blockchain().AddPendingCrossLinks(candidates)
node.pendingCLMutex.Unlock()
utils.Logger().Error().Err(err).
utils.Logger().Debug().
Msgf("[ProcessingCrossLink] add pending crosslinks, total pending: %d", Len)
}
}
@ -231,15 +226,14 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error {
return nil
}
// VerifyCrossLink verifies the header is valid against the prevHeader.
// VerifyCrossLink verifies the header is valid
func (node *Node) VerifyCrossLink(cl types.CrossLink) error {
// TODO: add fork choice rule
// Verify signature of the new cross link header
// TODO: check whether to recalculate shard state
epoch := node.Blockchain().GetBlockByNumber(cl.BlockNum()).Epoch()
shardState, err := node.Blockchain().ReadShardState(epoch)
shardState, err := node.Blockchain().ReadShardState(cl.Epoch())
committee := shardState.FindCommitteeByID(cl.ShardID())
if err != nil || committee == nil {
@ -261,29 +255,31 @@ func (node *Node) VerifyCrossLink(cl types.CrossLink) error {
return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
if cl.BlockNum() > 1 { // First block doesn't have last sig
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("cannot create group sig mask", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
if err := mask.SetMask(cl.Bitmap()); err != nil {
return ctxerror.New("cannot set group sig mask bits", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
if cl.BlockNum() <= 1 {
return ctxerror.New("CrossLink BlockNumber should greater than 1")
}
aggSig := bls.Sign{}
sig := cl.Signature()
err = aggSig.Deserialize(sig[:])
if err != nil {
return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err)
}
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("cannot create group sig mask", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
if err := mask.SetMask(cl.Bitmap()); err != nil {
return ctxerror.New("cannot set group sig mask bits", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err)
}
hash := cl.Hash()
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, cl.BlockNum())
commitPayload := append(blockNumBytes, hash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("Failed to verify the signature for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum())
}
aggSig := bls.Sign{}
sig := cl.Signature()
err = aggSig.Deserialize(sig[:])
if err != nil {
return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err)
}
hash := cl.Hash()
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, cl.BlockNum())
commitPayload := append(blockNumBytes, hash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("Failed to verify the signature for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum())
}
return nil
}

@ -241,7 +241,7 @@ func (node *Node) BroadcastCrossLink(newBlock *types.Block) {
for _, header := range headers {
utils.Logger().Debug().Msgf("[BroadcastCrossLink] Broadcasting %d", header.Number().Uint64())
}
node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkMessage(headers)))
node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(0)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkMessage(node.Consensus.ChainReader, headers)))
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on

Loading…
Cancel
Save