Add delegator->validators index

pull/1831/head
Rongjian Lan 5 years ago
parent e76d205e29
commit 317e587dbe
  1. 156
      core/blockchain.go
  2. 26
      core/rawdb/accessors_chain.go
  3. 6
      core/rawdb/schema.go

@ -18,6 +18,7 @@
package core
import (
"bytes"
"errors"
"fmt"
"io"
@ -58,19 +59,20 @@ var (
)
const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
badBlockLimit = 10
triesInMemory = 128
shardCacheLimit = 2
commitsCacheLimit = 10
epochCacheLimit = 10
randomnessCacheLimit = 10
stakingCacheLimit = 256
validatorListCacheLimit = 2
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
badBlockLimit = 10
triesInMemory = 128
shardCacheLimit = 2
commitsCacheLimit = 10
epochCacheLimit = 10
randomnessCacheLimit = 10
stakingCacheLimit = 256
validatorListCacheLimit = 2
validatorListByDelegatorCacheLimit = 256
// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
BlockChainVersion = 3
@ -123,18 +125,19 @@ type BlockChain struct {
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
futureBlocks *lru.Cache // future blocks are blocks added for later processing
shardStateCache *lru.Cache
lastCommitsCache *lru.Cache
epochCache *lru.Cache // Cache epoch number → first block number
randomnessCache *lru.Cache // Cache for vrf/vdf
stakingCache *lru.Cache // Cache for staking validator
validatorListCache *lru.Cache // Cache of validator list
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
futureBlocks *lru.Cache // future blocks are blocks added for later processing
shardStateCache *lru.Cache
lastCommitsCache *lru.Cache
epochCache *lru.Cache // Cache epoch number → first block number
randomnessCache *lru.Cache // Cache for vrf/vdf
stakingCache *lru.Cache // Cache for staking validator
validatorListCache *lru.Cache // Cache of validator list
validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator
quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
@ -173,29 +176,31 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
randomnessCache, _ := lru.New(randomnessCacheLimit)
stakingCache, _ := lru.New(stakingCacheLimit)
validatorListCache, _ := lru.New(validatorListCacheLimit)
validatorListByDelegatorCache, _ := lru.New(validatorListByDelegatorCacheLimit)
bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
shardStateCache: shardCache,
lastCommitsCache: commitsCache,
epochCache: epochCache,
randomnessCache: randomnessCache,
stakingCache: stakingCache,
validatorListCache: validatorListCache,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
shardStateCache: shardCache,
lastCommitsCache: commitsCache,
epochCache: epochCache,
randomnessCache: randomnessCache,
stakingCache: stakingCache,
validatorListCache: validatorListCache,
validatorListByDelegatorCache: validatorListByDelegatorCache,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
@ -1069,6 +1074,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// Write other block data using a batch.
// TODO: put following into a func
/////////////////////////// START
batch := bc.db.NewBatch()
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
@ -1156,17 +1162,17 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
if bc.chainConfig.IsStaking(block.Epoch()) {
for _, tx := range block.StakingTransactions() {
err = bc.UpdateValidatorList(tx)
err = bc.UpdateStakingMetaData(tx)
// keep offchain database consistency with onchain we need revert
// but it should not happend unless local database corrupted
if err != nil {
utils.Logger().Debug().Msgf("oops, UpdateValidatorList failed, err: %+v", err)
utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err)
return NonStatTy, err
}
}
}
// END - TODO
/////////////////////////// END
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
@ -2317,16 +2323,41 @@ func (bc *BlockChain) WriteValidatorList(addrs []common.Address) error {
if err != nil {
return err
}
by, err := rlp.EncodeToBytes(addrs)
bytes, err := rlp.EncodeToBytes(addrs)
if err == nil {
bc.validatorListCache.Add("validatorList", bytes)
}
return nil
}
// ReadValidatorListByDelegator reads the addresses of validators delegated by a delegator
func (bc *BlockChain) ReadValidatorListByDelegator(delegator common.Address) ([]common.Address, error) {
if cached, ok := bc.validatorListByDelegatorCache.Get(delegator.Bytes()); ok {
by := cached.([]byte)
m := []common.Address{}
if err := rlp.DecodeBytes(by, &m); err != nil {
return nil, err
}
return m, nil
}
return rawdb.ReadValidatorListByDelegator(bc.db, delegator)
}
// WriteValidatorListByDelegator writes the list of validator addresses to database
func (bc *BlockChain) WriteValidatorListByDelegator(delegator common.Address, addrs []common.Address) error {
err := rawdb.WriteValidatorListByDelegator(bc.db, delegator, addrs)
if err != nil {
return err
}
bc.validatorListCache.Add("validatorList", by)
bytes, err := rlp.EncodeToBytes(addrs)
if err == nil {
bc.validatorListByDelegatorCache.Add(delegator.Bytes(), bytes)
}
return nil
}
// UpdateValidatorList updates the validator map according to staking transaction
func (bc *BlockChain) UpdateValidatorList(tx *staking.StakingTransaction) error {
// UpdateStakingMetaData updates the validator's and the delegator's meta data according to staking transaction
func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction) error {
// TODO: simply the logic here in staking/types/transaction.go
payload, err := tx.RLPEncodeStakeMsg()
if err != nil {
@ -2357,6 +2388,23 @@ func (bc *BlockChain) UpdateValidatorList(tx *staking.StakingTransaction) error
// following cases are placeholder for now
case staking.DirectiveEditValidator:
case staking.DirectiveDelegate:
delegate := decodePayload.(*staking.Delegate)
validators, err := bc.ReadValidatorListByDelegator(delegate.DelegatorAddress)
if err != nil {
return err
}
found := false
for _, validator := range validators {
if bytes.Compare(validator.Bytes(), delegate.ValidatorAddress.Bytes()) == 0 {
found = true
break
}
}
if !found {
validators = append(validators, delegate.ValidatorAddress)
}
err = bc.WriteValidatorListByDelegator(delegate.DelegatorAddress, validators)
return err
case staking.DirectiveUndelegate:
case staking.DirectiveCollectRewards:
default:

@ -666,3 +666,29 @@ func WriteValidatorList(db DatabaseWriter, addrs []common.Address) error {
}
return err
}
// ReadValidatorListByDelegator retrieves the list of validators delegated by a delegator
func ReadValidatorListByDelegator(db DatabaseReader, delegator common.Address) ([]common.Address, error) {
data, err := db.Get(delegatorValidatorListKey(delegator))
if len(data) == 0 || err != nil {
return []common.Address{}, nil
}
addrs := []common.Address{}
if err := rlp.DecodeBytes(data, &addrs); err != nil {
utils.Logger().Error().Err(err).Msg("Unable to Decode validator List from database")
return nil, err
}
return addrs, nil
}
// WriteValidatorListByDelegator stores the list of validators delegated by a delegator
func WriteValidatorListByDelegator(db DatabaseWriter, delegator common.Address, addrs []common.Address) error {
bytes, err := rlp.EncodeToBytes(addrs)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorListByDelegator] Failed to encode")
}
if err := db.Put(delegatorValidatorListKey(delegator), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorListByDelegator] Failed to store to database")
}
return err
}

@ -65,6 +65,8 @@ var (
crosslinkPrefix = []byte("cl") // prefix for crosslink
tempCrosslinkPrefix = []byte("tcl") // prefix for tempCrosslink
delegatorValidatorListPrefix = []byte("dvl") // prefix for delegator's validator list
// TODO: shorten the key prefix so we don't waste db space
cxReceiptPrefix = []byte("cxReceipt") // prefix for cross shard transaction receipt
tempCxReceiptPrefix = []byte("tempCxReceipt") // prefix for temporary cross shard transaction receipt
@ -201,6 +203,10 @@ func crosslinkKey(shardID uint32, blockNum uint64, temp bool) []byte {
return key
}
func delegatorValidatorListKey(delegator common.Address) []byte {
return append(delegatorValidatorListPrefix, delegator.Bytes()...)
}
// cxReceiptKey = cxReceiptsPrefix + shardID + num (uint64 big endian) + hash
func cxReceiptKey(shardID uint32, number uint64, hash common.Hash, temp bool) []byte {
prefix := cxReceiptPrefix

Loading…
Cancel
Save