Refactor offchain data commit; Make block onchain/offchain commit atomic (#2279)

* Refactor offchain data; Add epoch to ValidatorSnapshot

* Make block onchain/offchain data commit atomically
pull/2253/head
Rongjian Lan 5 years ago committed by Edgar Aroutiounian
parent 06a967f73a
commit 9ffbf682c0
  1. 526
      core/blockchain.go
  2. 184
      core/offchain.go
  3. 437
      core/rawdb/accessors_chain.go
  4. 10
      core/rawdb/accessors_chain_test.go
  5. 362
      core/rawdb/accessors_offchain.go
  6. 21
      core/rawdb/schema.go
  7. 2
      internal/hmyapi/apiv1/blockchain.go
  8. 2
      internal/hmyapi/apiv2/blockchain.go

@ -603,31 +603,36 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return nil return nil
} }
// insert injects a new head block into the current block chain. This method // similar to insert, but add to the db writer.
// assumes that the block is indeed a true head. It will also reset the head func (bc *BlockChain) insertWithWriter(batch rawdb.DatabaseWriter, block *types.Block) {
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
// If the block is on a side chain or an unknown one, force other heads onto it too // If the block is on a side chain or an unknown one, force other heads onto it too
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash() updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
// Add the block to the canonical chain number scheme and mark as the head // Add the block to the canonical chain number scheme and mark as the head
rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()) rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(bc.db, block.Hash()) rawdb.WriteHeadBlockHash(batch, block.Hash())
bc.currentBlock.Store(block) bc.currentBlock.Store(block)
// If the block is better than our head or is on a different chain, force update heads // If the block is better than our head or is on a different chain, force update heads
if updateHeads { if updateHeads {
bc.hc.SetCurrentHeader(block.Header()) bc.hc.SetCurrentHeader(block.Header())
rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()) rawdb.WriteHeadFastBlockHash(batch, block.Hash())
bc.currentFastBlock.Store(block) bc.currentFastBlock.Store(block)
} }
} }
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
bc.insertWithWriter(bc.db, block)
}
// Genesis retrieves the chain's genesis block. // Genesis retrieves the chain's genesis block.
func (bc *BlockChain) Genesis() *types.Block { func (bc *BlockChain) Genesis() *types.Block {
return bc.genesisBlock return bc.genesisBlock
@ -1052,17 +1057,19 @@ func (bc *BlockChain) WriteBlockWithState(
defer bc.mu.Unlock() defer bc.mu.Unlock()
currentBlock := bc.CurrentBlock() currentBlock := bc.CurrentBlock()
if currentBlock == nil || block.ParentHash() != currentBlock.Hash() {
return NonStatTy, errors.New("Hash of parent block doesn't match the current block hash")
}
rawdb.WriteBlock(bc.db, block) // Commit state object changes to in-memory trie
root, err := state.Commit(bc.chainConfig.IsS3(block.Epoch())) root, err := state.Commit(bc.chainConfig.IsS3(block.Epoch()))
if err != nil { if err != nil {
return NonStatTy, err return NonStatTy, err
} }
triedb := bc.stateCache.TrieDB()
// If we're running an archive node, always flush // Flush trie state into disk if it's archival node or the block is epoch block
if bc.cacheConfig.Disabled { triedb := bc.stateCache.TrieDB()
if bc.cacheConfig.Disabled || len(block.Header().ShardState()) > 0 {
if err := triedb.Commit(root, false); err != nil { if err := triedb.Commit(root, false); err != nil {
return NonStatTy, err return NonStatTy, err
} }
@ -1112,208 +1119,31 @@ func (bc *BlockChain) WriteBlockWithState(
} }
} }
// Write other block data using a batch.
// TODO: put following into a func
/////////////////////////// START
batch := bc.db.NewBatch() batch := bc.db.NewBatch()
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) // Write the raw block
rawdb.WriteBlock(batch, block)
//// Cross-shard txns
epoch := block.Header().Epoch()
if bc.chainConfig.HasCrossTxFields(block.Epoch()) {
shardingConfig := shard.Schedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
for i := 0; i < shardNum; i++ {
if i == int(block.ShardID()) {
continue
}
shardReceipts := types.CXReceipts(cxReceipts).GetToShardReceipts(uint32(i)) // Write offchain data
err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts) if status, err := bc.CommitOffChainData(
if err != nil { batch, block, receipts,
utils.Logger().Error().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database") cxReceipts, payout, state, root); err != nil {
return NonStatTy, err return status, err
}
}
// Mark incomingReceipts in the block as spent
bc.WriteCXReceiptsProofSpent(block.IncomingReceipts())
} }
//// VRF + VDF // Write the positional metadata for transaction/receipt lookups and preimages
//check non zero VRF field in header and add to local db rawdb.WriteTxLookupEntries(batch, block)
if len(block.Vrf()) > 0 { rawdb.WriteCxLookupEntries(batch, block)
vrfBlockNumbers, _ := bc.ReadEpochVrfBlockNums(block.Header().Epoch()) rawdb.WritePreimages(batch, block.NumberU64(), state.Preimages())
if (len(vrfBlockNumbers) > 0) && (vrfBlockNumbers[len(vrfBlockNumbers)-1] == block.NumberU64()) {
utils.Logger().Error().
Str("number", block.Number().String()).
Str("epoch", block.Header().Epoch().String()).
Msg("VRF block number is already in local db")
} else {
vrfBlockNumbers = append(vrfBlockNumbers, block.NumberU64())
err = bc.WriteEpochVrfBlockNums(block.Header().Epoch(), vrfBlockNumbers)
if err != nil {
utils.Logger().Error().
Str("number", block.Number().String()).
Str("epoch", block.Header().Epoch().String()).
Msg("failed to write VRF block number to local db")
return NonStatTy, err
}
}
}
//check non zero Vdf in header and add to local db // Update current block
if len(block.Vdf()) > 0 { bc.insertWithWriter(batch, block)
err = bc.WriteEpochVdfBlockNum(block.Header().Epoch(), block.Number())
if err != nil {
utils.Logger().Error().
Str("number", block.Number().String()).
Str("epoch", block.Header().Epoch().String()).
Msg("failed to write VDF block number to local db")
return NonStatTy, err
}
}
//// Shard State and Validator Update
header := block.Header()
if len(header.ShardState()) > 0 {
// Write shard state for the new epoch
epoch := new(big.Int).Add(header.Epoch(), common.Big1)
shardState, err := block.Header().GetShardState()
if err == nil && shardState.Epoch != nil && bc.chainConfig.IsStaking(shardState.Epoch) {
// After staking, the epoch will be decided by the epoch in the shard state.
epoch = new(big.Int).Set(shardState.Epoch)
}
newShardState, err := bc.WriteShardStateBytes(batch, epoch, header.ShardState())
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state")
return NonStatTy, err
}
// Find all the active validator addresses and store them in db
allActiveValidators := []common.Address{}
processed := make(map[common.Address]struct{})
for i := range newShardState.Shards {
shard := newShardState.Shards[i]
for j := range shard.Slots {
slot := shard.Slots[j]
if slot.EffectiveStake != nil { // For external validator
_, ok := processed[slot.EcdsaAddress]
if !ok {
processed[slot.EcdsaAddress] = struct{}{}
allActiveValidators = append(allActiveValidators, shard.Slots[j].EcdsaAddress)
}
}
}
}
// Update active validators
if err := bc.WriteActiveValidatorList(allActiveValidators); err != nil {
return NonStatTy, err
}
// Update snapshots for all validators
if err := bc.UpdateValidatorSnapshots(); err != nil {
return NonStatTy, err
}
}
// Do bookkeeping for new staking txns
for _, tx := range block.StakingTransactions() {
err = bc.UpdateStakingMetaData(tx, root)
// keep offchain database consistency with onchain we need revert
// but it should not happend unless local database corrupted
if err != nil {
utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err)
return NonStatTy, err
}
}
// Update voting power of validators for all shards
if block.ShardID() == shard.BeaconChainShardID &&
len(block.Header().ShardState()) > 0 {
shardState := new(shard.State)
if shardState, err = shard.DecodeWrapper(block.Header().ShardState()); err == nil {
if err = bc.UpdateValidatorVotingPower(shardState); err != nil {
utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to update voting power")
}
} else {
utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to decode shard state")
}
}
//// Writing beacon chain cross links
if header.ShardID() == shard.BeaconChainShardID &&
bc.chainConfig.IsCrossLink(block.Epoch()) &&
len(header.CrossLinks()) > 0 {
crossLinks := &types.CrossLinks{}
err = rlp.DecodeBytes(header.CrossLinks(), crossLinks)
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cannot parse cross links")
return NonStatTy, err
}
if !crossLinks.IsSorted() {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cross links are not sorted")
return NonStatTy, errors.New("proposed cross links are not sorted")
}
for _, crossLink := range *crossLinks {
// Process crosslink
if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}); err == nil {
utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[insertChain/crosslinks] Cross Link Added to Beaconchain")
}
bc.LastContinuousCrossLink(crossLink)
}
//clean/update local database cache after crosslink inserted into blockchain
num, err := bc.DeleteCommittedFromPendingCrossLinks(*crossLinks)
utils.Logger().Debug().Msgf("DeleteCommittedFromPendingCrossLinks, crosslinks in header %d, pending crosslinks: %d, error: %+v", len(*crossLinks), num, err)
}
if bc.CurrentHeader().ShardID() == shard.BeaconChainShardID {
if bc.chainConfig.IsStaking(block.Epoch()) {
bc.UpdateBlockRewardAccumulator(payout, block.Number().Uint64())
} else {
// block reward never accumulate before staking
bc.WriteBlockRewardAccumulator(big.NewInt(0), block.Number().Uint64())
}
}
/////////////////////////// END
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
// TODO: Remove reorg code, it's not used in our code
reorg := true
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, err
}
}
// Write the positional metadata for transaction/receipt lookups and preimages
rawdb.WriteTxLookupEntries(batch, block)
rawdb.WritePreimages(batch, block.NumberU64(), state.Preimages())
// write the positional metadata for CXReceipts lookups
rawdb.WriteCxLookupEntries(batch, block)
status = CanonStatTy
} else {
status = SideStatTy
}
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
return NonStatTy, err return NonStatTy, err
} }
// Set new head.
if status == CanonStatTy {
bc.insert(block)
}
bc.futureBlocks.Remove(block.Hash()) bc.futureBlocks.Remove(block.Hash())
return status, nil return CanonStatTy, nil
} }
// InsertChain attempts to insert the given batch of blocks in to the canonical // InsertChain attempts to insert the given batch of blocks in to the canonical
@ -1526,10 +1356,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
// Only count canonical blocks for GC processing time // Only count canonical blocks for GC processing time
bc.gcproc += proctime bc.gcproc += proctime
case SideStatTy:
logger.Debug().Msg("Inserted forked block")
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainSideEvent{block})
} }
stats.processed++ stats.processed++
@ -1607,132 +1433,6 @@ func countTransactions(chain []*types.Block) (c int) {
return c return c
} }
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions
deletedLogs []*types.Log
// collectLogs collects the logs that were generated during the
// processing of the block that corresponds with the given hash.
// These logs are later announced as deleted.
collectLogs = func(hash common.Hash) {
// Coalesce logs and set 'Removed'.
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return
}
receipts := rawdb.ReadReceipts(bc.db, hash, *number)
for _, receipt := range receipts {
for _, log := range receipt.Logs {
del := *log
del.Removed = true
deletedLogs = append(deletedLogs, &del)
}
}
}
)
// first reduce whoever is higher bound
if oldBlock.NumberU64() > newBlock.NumberU64() {
// reduce old chain
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash())
}
} else {
// reduce new chain and append new chain blocks for inserting later on
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
}
}
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
}
for {
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
break
}
oldChain = append(oldChain, oldBlock)
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash())
oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
}
}
// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logEvent := utils.Logger().Debug()
if len(oldChain) > 63 {
logEvent = utils.Logger().Warn()
}
logEvent.
Str("number", commonBlock.Number().String()).
Str("hash", commonBlock.Hash().Hex()).
Int("drop", len(oldChain)).
Str("dropfrom", oldChain[0].Hash().Hex()).
Int("add", len(newChain)).
Str("addfrom", newChain[0].Hash().Hex()).
Msg("Chain split detected")
} else {
utils.Logger().Error().
Str("oldnum", oldBlock.Number().String()).
Str("oldhash", oldBlock.Hash().Hex()).
Str("newnum", newBlock.Number().String()).
Str("newhash", newBlock.Hash().Hex()).
Msg("Impossible reorg, please file an issue")
}
// Insert the new chain, taking care of the proper incremental order
var addedTxs types.Transactions
for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history
bc.insert(newChain[i])
// write lookup entries for hash based transaction/receipt searches
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
addedTxs = append(addedTxs, newChain[i].Transactions()...)
}
// calculate the difference between deleted and added transactions
diff := types.TxDifference(deletedTxs, addedTxs)
// When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted
batch := bc.db.NewBatch()
for _, tx := range diff {
rawdb.DeleteTxLookupEntry(batch, tx.Hash())
}
batch.Write()
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
}()
}
return nil
}
// PostChainEvents iterates over the events generated by a chain insertion and // PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed. // posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. // TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
@ -2129,11 +1829,11 @@ func (bc *BlockChain) WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) e
} }
// WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key // WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key
func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink) error { func (bc *BlockChain) WriteCrossLinks(batch rawdb.DatabaseWriter, cls []types.CrossLink) error {
var err error var err error
for i := 0; i < len(cls); i++ { for i := 0; i < len(cls); i++ {
cl := cls[i] cl := cls[i]
err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum(), cl.Serialize()) err = rawdb.WriteCrossLinkShardBlock(batch, cl.ShardID(), cl.BlockNum(), cl.Serialize())
} }
return err return err
} }
@ -2163,14 +1863,14 @@ func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64) (*types.Cro
// This function will update the latest crosslink in the sense that // This function will update the latest crosslink in the sense that
// any previous block's crosslink is received up to this point // any previous block's crosslink is received up to this point
// there is no missing hole between genesis to this crosslink of given shardID // there is no missing hole between genesis to this crosslink of given shardID
func (bc *BlockChain) LastContinuousCrossLink(cl types.CrossLink) error { func (bc *BlockChain) LastContinuousCrossLink(batch rawdb.DatabaseWriter, cl types.CrossLink) error {
if !bc.Config().IsCrossLink(cl.Epoch()) { if !bc.Config().IsCrossLink(cl.Epoch()) {
return errors.New("Trying to write last continuous cross link with epoch before cross link starting epoch") return errors.New("Trying to write last continuous cross link with epoch before cross link starting epoch")
} }
cl0, err := bc.ReadShardLastCrossLink(cl.ShardID()) cl0, err := bc.ReadShardLastCrossLink(cl.ShardID())
if cl0 == nil { if cl0 == nil {
rawdb.WriteShardLastCrossLink(bc.db, cl.ShardID(), cl.Serialize()) rawdb.WriteShardLastCrossLink(batch, cl.ShardID(), cl.Serialize())
return nil return nil
} }
if err != nil { if err != nil {
@ -2194,7 +1894,7 @@ func (bc *BlockChain) LastContinuousCrossLink(cl types.CrossLink) error {
if err != nil { if err != nil {
return err return err
} }
return rawdb.WriteShardLastCrossLink(bc.db, cln.ShardID(), cln.Serialize()) return rawdb.WriteShardLastCrossLink(batch, cln.ShardID(), cln.Serialize())
} }
return nil return nil
} }
@ -2444,58 +2144,11 @@ func (bc *BlockChain) CXMerkleProof(toShardID uint32, block *types.Block) (*type
return proof, nil return proof, nil
} }
// LatestCXReceiptsCheckpoint returns the latest checkpoint
func (bc *BlockChain) LatestCXReceiptsCheckpoint(shardID uint32) uint64 {
blockNum, _ := rawdb.ReadCXReceiptsProofUnspentCheckpoint(bc.db, shardID)
return blockNum
}
// NextCXReceiptsCheckpoint returns the next checkpoint blockNum
func (bc *BlockChain) NextCXReceiptsCheckpoint(currentNum uint64, shardID uint32) uint64 {
lastCheckpoint, _ := rawdb.ReadCXReceiptsProofUnspentCheckpoint(bc.db, shardID)
newCheckpoint := lastCheckpoint
// the new checkpoint will not exceed currentNum+1
for num := lastCheckpoint; num <= currentNum+1; num++ {
newCheckpoint = num
by, _ := rawdb.ReadCXReceiptsProofSpent(bc.db, shardID, num)
if by == rawdb.NAByte {
// TODO chao: check if there is IncompingReceiptsHash in crosslink header
// if the rootHash is non-empty, it means incomingReceipts are not delivered
// otherwise, it means there is no cross-shard transactions for this block
continue
}
if by == rawdb.SpentByte {
continue
}
// the first unspent blockHash found, break the loop
break
}
return newCheckpoint
}
// updateCXReceiptsCheckpoints will update the checkpoint and clean spent receipts upto checkpoint
func (bc *BlockChain) updateCXReceiptsCheckpoints(shardID uint32, currentNum uint64) {
lastCheckpoint, err := rawdb.ReadCXReceiptsProofUnspentCheckpoint(bc.db, shardID)
if err != nil {
utils.Logger().Warn().Msg("[updateCXReceiptsCheckpoints] Cannot get lastCheckpoint")
}
newCheckpoint := bc.NextCXReceiptsCheckpoint(currentNum, shardID)
if lastCheckpoint == newCheckpoint {
return
}
utils.Logger().Debug().Uint64("lastCheckpoint", lastCheckpoint).Uint64("newCheckpont", newCheckpoint).Msg("[updateCXReceiptsCheckpoints]")
for num := lastCheckpoint; num < newCheckpoint; num++ {
rawdb.DeleteCXReceiptsProofSpent(bc.db, shardID, num)
}
rawdb.WriteCXReceiptsProofUnspentCheckpoint(bc.db, shardID, newCheckpoint)
}
// WriteCXReceiptsProofSpent mark the CXReceiptsProof list with given unspent status // WriteCXReceiptsProofSpent mark the CXReceiptsProof list with given unspent status
// true: unspent, false: spent // true: unspent, false: spent
func (bc *BlockChain) WriteCXReceiptsProofSpent(cxps []*types.CXReceiptsProof) { func (bc *BlockChain) WriteCXReceiptsProofSpent(db rawdb.DatabaseWriter, cxps []*types.CXReceiptsProof) {
for _, cxp := range cxps { for _, cxp := range cxps {
rawdb.WriteCXReceiptsProofSpent(bc.db, cxp) rawdb.WriteCXReceiptsProofSpent(db, cxp)
} }
} }
@ -2504,31 +2157,12 @@ func (bc *BlockChain) IsSpent(cxp *types.CXReceiptsProof) bool {
shardID := cxp.MerkleProof.ShardID shardID := cxp.MerkleProof.ShardID
blockNum := cxp.MerkleProof.BlockNum.Uint64() blockNum := cxp.MerkleProof.BlockNum.Uint64()
by, _ := rawdb.ReadCXReceiptsProofSpent(bc.db, shardID, blockNum) by, _ := rawdb.ReadCXReceiptsProofSpent(bc.db, shardID, blockNum)
if by == rawdb.SpentByte || cxp.MerkleProof.BlockNum.Uint64() < bc.LatestCXReceiptsCheckpoint(cxp.MerkleProof.ShardID) { if by == rawdb.SpentByte {
return true return true
} }
return false return false
} }
// UpdateCXReceiptsCheckpointsByBlock cleans checkpoints and update latest checkpoint based on incomingReceipts of the given block
func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) {
m := make(map[uint32]uint64)
for _, cxp := range block.IncomingReceipts() {
shardID := cxp.MerkleProof.ShardID
blockNum := cxp.MerkleProof.BlockNum.Uint64()
if _, ok := m[shardID]; !ok {
m[shardID] = blockNum
} else if m[shardID] < blockNum {
m[shardID] = blockNum
}
}
for k, v := range m {
utils.Logger().Debug().Uint32("shardID", k).Uint64("blockNum", v).Msg("[CleanCXReceiptsCheckpoints] Cleaning CXReceiptsProof upto")
bc.updateCXReceiptsCheckpoints(k, v)
}
}
// ReadTxLookupEntry returns where the given transaction resides in the chain, // ReadTxLookupEntry returns where the given transaction resides in the chain,
// as a (block hash, block number, index in transaction list) triple. // as a (block hash, block number, index in transaction list) triple.
// returns 0, 0 if not found // returns 0, 0 if not found
@ -2571,11 +2205,11 @@ func (bc *BlockChain) ReadValidatorSnapshot(
return &v, nil return &v, nil
} }
return rawdb.ReadValidatorSnapshot(bc.db, addr) return rawdb.ReadValidatorSnapshot(bc.db, addr, bc.CurrentBlock().Epoch())
} }
// writeValidatorSnapshots writes the snapshot of provided list of validators // writeValidatorSnapshots writes the snapshot of provided list of validators
func (bc *BlockChain) writeValidatorSnapshots(addrs []common.Address) error { func (bc *BlockChain) writeValidatorSnapshots(batch rawdb.DatabaseWriter, addrs []common.Address, epoch *big.Int) error {
// Read all validator's current data // Read all validator's current data
validators := []*staking.ValidatorWrapper{} validators := []*staking.ValidatorWrapper{}
for i := range addrs { for i := range addrs {
@ -2586,15 +2220,11 @@ func (bc *BlockChain) writeValidatorSnapshots(addrs []common.Address) error {
validators = append(validators, validator) validators = append(validators, validator)
} }
// Batch write the current data as snapshot // Batch write the current data as snapshot
batch := bc.db.NewBatch()
for i := range validators { for i := range validators {
if err := rawdb.WriteValidatorSnapshot(batch, validators[i]); err != nil { if err := rawdb.WriteValidatorSnapshot(batch, validators[i], epoch); err != nil {
return err return err
} }
} }
if err := batch.Write(); err != nil {
return err
}
// Update cache // Update cache
for i := range validators { for i := range validators {
@ -2613,7 +2243,7 @@ func (bc *BlockChain) ReadValidatorStats(addr common.Address) (*staking.Validato
} }
// UpdateValidatorVotingPower writes the voting power for the committees // UpdateValidatorVotingPower writes the voting power for the committees
func (bc *BlockChain) UpdateValidatorVotingPower(state *shard.State) error { func (bc *BlockChain) UpdateValidatorVotingPower(batch rawdb.DatabaseWriter, state *shard.State) error {
if state == nil { if state == nil {
return errors.New("[UpdateValidatorVotingPower] Nil shard state") return errors.New("[UpdateValidatorVotingPower] Nil shard state")
} }
@ -2630,8 +2260,6 @@ func (bc *BlockChain) UpdateValidatorVotingPower(state *shard.State) error {
networkWide := votepower.AggregateRosters(rosters) networkWide := votepower.AggregateRosters(rosters)
batch := bc.db.NewBatch()
for key, value := range networkWide { for key, value := range networkWide {
statsFromDB, err := rawdb.ReadValidatorStats(bc.db, key) statsFromDB, err := rawdb.ReadValidatorStats(bc.db, key)
if statsFromDB == nil { if statsFromDB == nil {
@ -2649,18 +2277,15 @@ func (bc *BlockChain) UpdateValidatorVotingPower(state *shard.State) error {
} }
} }
if err := batch.Write(); err != nil {
return err
}
// TODO: Update cache
return nil return nil
} }
// deleteValidatorSnapshots deletes the snapshot staking information of given validator address // deleteValidatorSnapshots deletes the snapshot staking information of given validator address
// TODO: delete validator snapshots from X epochs ago
func (bc *BlockChain) deleteValidatorSnapshots(addrs []common.Address) error { func (bc *BlockChain) deleteValidatorSnapshots(addrs []common.Address) error {
batch := bc.db.NewBatch() batch := bc.db.NewBatch()
for i := range addrs { for i := range addrs {
rawdb.DeleteValidatorSnapshot(batch, addrs[i]) rawdb.DeleteValidatorSnapshot(batch, addrs[i], bc.CurrentBlock().Epoch())
} }
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
return err return err
@ -2672,8 +2297,8 @@ func (bc *BlockChain) deleteValidatorSnapshots(addrs []common.Address) error {
} }
// UpdateValidatorSnapshots updates the content snapshot of all validators // UpdateValidatorSnapshots updates the content snapshot of all validators
// Note: this should only be called within the blockchain insertBlock process. // Note: this should only be called within the blockchain insert process.
func (bc *BlockChain) UpdateValidatorSnapshots() error { func (bc *BlockChain) UpdateValidatorSnapshots(batch rawdb.DatabaseWriter, epoch *big.Int) error {
allValidators, err := bc.ReadValidatorList() allValidators, err := bc.ReadValidatorList()
if err != nil { if err != nil {
return err return err
@ -2685,7 +2310,7 @@ func (bc *BlockChain) UpdateValidatorSnapshots() error {
// return err // return err
//} //}
return bc.writeValidatorSnapshots(allValidators) return bc.writeValidatorSnapshots(batch, allValidators, epoch)
} }
// ReadValidatorList reads the addresses of current all validators // ReadValidatorList reads the addresses of current all validators
@ -2702,9 +2327,9 @@ func (bc *BlockChain) ReadValidatorList() ([]common.Address, error) {
} }
// WriteValidatorList writes the list of validator addresses to database // WriteValidatorList writes the list of validator addresses to database
// Note: this should only be called within the blockchain insertBlock process. // Note: this should only be called within the blockchain insert process.
func (bc *BlockChain) WriteValidatorList(addrs []common.Address) error { func (bc *BlockChain) WriteValidatorList(db rawdb.DatabaseWriter, addrs []common.Address) error {
err := rawdb.WriteValidatorList(bc.db, addrs, false) err := rawdb.WriteValidatorList(db, addrs, false)
if err != nil { if err != nil {
return err return err
} }
@ -2729,9 +2354,9 @@ func (bc *BlockChain) ReadActiveValidatorList() ([]common.Address, error) {
} }
// WriteActiveValidatorList writes the list of active validator addresses to database // WriteActiveValidatorList writes the list of active validator addresses to database
// Note: this should only be called within the blockchain insertBlock process. // Note: this should only be called within the blockchain insert process.
func (bc *BlockChain) WriteActiveValidatorList(addrs []common.Address) error { func (bc *BlockChain) WriteActiveValidatorList(batch rawdb.DatabaseWriter, addrs []common.Address) error {
err := rawdb.WriteValidatorList(bc.db, addrs, true) err := rawdb.WriteValidatorList(batch, addrs, true)
if err != nil { if err != nil {
return err return err
} }
@ -2756,8 +2381,8 @@ func (bc *BlockChain) ReadDelegationsByDelegator(delegator common.Address) ([]st
} }
// writeDelegationsByDelegator writes the list of validator addresses to database // writeDelegationsByDelegator writes the list of validator addresses to database
func (bc *BlockChain) writeDelegationsByDelegator(delegator common.Address, indices []staking.DelegationIndex) error { func (bc *BlockChain) writeDelegationsByDelegator(batch rawdb.DatabaseWriter, delegator common.Address, indices []staking.DelegationIndex) error {
err := rawdb.WriteDelegationsByDelegator(bc.db, delegator, indices) err := rawdb.WriteDelegationsByDelegator(batch, delegator, indices)
if err != nil { if err != nil {
return err return err
} }
@ -2769,8 +2394,8 @@ func (bc *BlockChain) writeDelegationsByDelegator(delegator common.Address, indi
} }
// UpdateStakingMetaData updates the validator's and the delegator's meta data according to staking transaction // UpdateStakingMetaData updates the validator's and the delegator's meta data according to staking transaction
// Note: this should only be called within the blockchain insertBlock process. // Note: this should only be called within the blockchain insert process.
func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root common.Hash) error { func (bc *BlockChain) UpdateStakingMetaData(batch rawdb.DatabaseWriter, tx *staking.StakingTransaction, root common.Hash) error {
// TODO: simply the logic here in staking/types/transaction.go // TODO: simply the logic here in staking/types/transaction.go
payload, err := tx.RLPEncodeStakeMsg() payload, err := tx.RLPEncodeStakeMsg()
if err != nil { if err != nil {
@ -2784,7 +2409,6 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root
switch tx.StakingType() { switch tx.StakingType() {
case staking.DirectiveCreateValidator: case staking.DirectiveCreateValidator:
createValidator := decodePayload.(*staking.CreateValidator) createValidator := decodePayload.(*staking.CreateValidator)
// TODO: batch add validator list instead of one by one
list, err := bc.ReadValidatorList() list, err := bc.ReadValidatorList()
if err != nil { if err != nil {
return err return err
@ -2796,7 +2420,7 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root
list = utils.AppendIfMissing(list, createValidator.ValidatorAddress) list = utils.AppendIfMissing(list, createValidator.ValidatorAddress)
if len(list) > beforeLen { if len(list) > beforeLen {
if err = bc.WriteValidatorList(list); err != nil { if err = bc.WriteValidatorList(batch, list); err != nil {
return err return err
} }
} }
@ -2809,16 +2433,16 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root
validator.Snapshot.Epoch = epoch validator.Snapshot.Epoch = epoch
if err := rawdb.WriteValidatorSnapshot(bc.db, validator); err != nil { if err := rawdb.WriteValidatorSnapshot(batch, validator, epoch); err != nil {
return err return err
} }
// Add self delegation into the index // Add self delegation into the index
return bc.addDelegationIndex(createValidator.ValidatorAddress, createValidator.ValidatorAddress, root) return bc.addDelegationIndex(batch, createValidator.ValidatorAddress, createValidator.ValidatorAddress, root)
case staking.DirectiveEditValidator: case staking.DirectiveEditValidator:
case staking.DirectiveDelegate: case staking.DirectiveDelegate:
delegate := decodePayload.(*staking.Delegate) delegate := decodePayload.(*staking.Delegate)
return bc.addDelegationIndex(delegate.DelegatorAddress, delegate.ValidatorAddress, root) return bc.addDelegationIndex(batch, delegate.DelegatorAddress, delegate.ValidatorAddress, root)
case staking.DirectiveUndelegate: case staking.DirectiveUndelegate:
case staking.DirectiveCollectRewards: case staking.DirectiveCollectRewards:
default: default:
@ -2839,8 +2463,8 @@ func (bc *BlockChain) ReadBlockRewardAccumulator(number uint64) (*big.Int, error
// WriteBlockRewardAccumulator directly writes the BlockRewardAccumulator value // WriteBlockRewardAccumulator directly writes the BlockRewardAccumulator value
// Note: this should only be called once during staking launch. // Note: this should only be called once during staking launch.
func (bc *BlockChain) WriteBlockRewardAccumulator(reward *big.Int, number uint64) error { func (bc *BlockChain) WriteBlockRewardAccumulator(batch rawdb.DatabaseWriter, reward *big.Int, number uint64) error {
err := rawdb.WriteBlockRewardAccumulator(bc.db, reward, number) err := rawdb.WriteBlockRewardAccumulator(batch, reward, number)
if err != nil { if err != nil {
return err return err
} }
@ -2849,19 +2473,19 @@ func (bc *BlockChain) WriteBlockRewardAccumulator(reward *big.Int, number uint64
} }
//UpdateBlockRewardAccumulator .. //UpdateBlockRewardAccumulator ..
// Note: this should only be called within the blockchain insertBlock process. // Note: this should only be called within the blockchain insert process.
func (bc *BlockChain) UpdateBlockRewardAccumulator(diff *big.Int, number uint64) error { func (bc *BlockChain) UpdateBlockRewardAccumulator(batch rawdb.DatabaseWriter, diff *big.Int, number uint64) error {
current, err := bc.ReadBlockRewardAccumulator(number - 1) current, err := bc.ReadBlockRewardAccumulator(number - 1)
if err != nil { if err != nil {
// one-off fix for pangaea, return after pangaea enter staking. // one-off fix for pangaea, return after pangaea enter staking.
current = big.NewInt(0) current = big.NewInt(0)
bc.WriteBlockRewardAccumulator(current, number) bc.WriteBlockRewardAccumulator(batch, current, number)
} }
return bc.WriteBlockRewardAccumulator(new(big.Int).Add(current, diff), number) return bc.WriteBlockRewardAccumulator(batch, new(big.Int).Add(current, diff), number)
} }
// Note this should read from the state of current block in concern (root == newBlock.root) // Note this should read from the state of current block in concern (root == newBlock.root)
func (bc *BlockChain) addDelegationIndex(delegatorAddress, validatorAddress common.Address, root common.Hash) error { func (bc *BlockChain) addDelegationIndex(batch rawdb.DatabaseWriter, delegatorAddress, validatorAddress common.Address, root common.Hash) error {
// Get existing delegations // Get existing delegations
delegations, err := bc.ReadDelegationsByDelegator(delegatorAddress) delegations, err := bc.ReadDelegationsByDelegator(delegatorAddress)
if err != nil { if err != nil {
@ -2890,7 +2514,7 @@ func (bc *BlockChain) addDelegationIndex(delegatorAddress, validatorAddress comm
}) })
} }
} }
return bc.writeDelegationsByDelegator(delegatorAddress, delegations) return bc.writeDelegationsByDelegator(batch, delegatorAddress, delegations)
} }
// ValidatorCandidates returns the up to date validator candidates for next epoch // ValidatorCandidates returns the up to date validator candidates for next epoch

@ -0,0 +1,184 @@
package core
import (
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
)
// CommitOffChainData write off chain data of a block onto db writer.
func (bc *BlockChain) CommitOffChainData(
batch rawdb.DatabaseWriter, block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt, payout *big.Int, state *state.DB, root common.Hash) (status WriteStatus, err error) {
//// Write receipts of the block
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
//// Cross-shard txns
epoch := block.Header().Epoch()
if bc.chainConfig.HasCrossTxFields(block.Epoch()) {
shardingConfig := shard.Schedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
for i := 0; i < shardNum; i++ {
if i == int(block.ShardID()) {
continue
}
shardReceipts := types.CXReceipts(cxReceipts).GetToShardReceipts(uint32(i))
err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts)
if err != nil {
utils.Logger().Error().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database")
return NonStatTy, err
}
}
// Mark incomingReceipts in the block as spent
bc.WriteCXReceiptsProofSpent(batch, block.IncomingReceipts())
}
//// VRF + VDF
//check non zero VRF field in header and add to local db
//if len(block.Vrf()) > 0 {
// vrfBlockNumbers, _ := bc.ReadEpochVrfBlockNums(block.Header().Epoch())
// if (len(vrfBlockNumbers) > 0) && (vrfBlockNumbers[len(vrfBlockNumbers)-1] == block.NumberU64()) {
// utils.Logger().Error().
// Str("number", block.Number().String()).
// Str("epoch", block.Header().Epoch().String()).
// Msg("VRF block number is already in local db")
// } else {
// vrfBlockNumbers = append(vrfBlockNumbers, block.NumberU64())
// err = bc.WriteEpochVrfBlockNums(block.Header().Epoch(), vrfBlockNumbers)
// if err != nil {
// utils.Logger().Error().
// Str("number", block.Number().String()).
// Str("epoch", block.Header().Epoch().String()).
// Msg("failed to write VRF block number to local db")
// return NonStatTy, err
// }
// }
//}
//
////check non zero Vdf in header and add to local db
//if len(block.Vdf()) > 0 {
// err = bc.WriteEpochVdfBlockNum(block.Header().Epoch(), block.Number())
// if err != nil {
// utils.Logger().Error().
// Str("number", block.Number().String()).
// Str("epoch", block.Header().Epoch().String()).
// Msg("failed to write VDF block number to local db")
// return NonStatTy, err
// }
//}
//// Shard State and Validator Update
header := block.Header()
if len(header.ShardState()) > 0 {
// Write shard state for the new epoch
epoch := new(big.Int).Add(header.Epoch(), common.Big1)
shardState, err := block.Header().GetShardState()
if err == nil && shardState.Epoch != nil && bc.chainConfig.IsStaking(shardState.Epoch) {
// After staking, the epoch will be decided by the epoch in the shard state.
epoch = new(big.Int).Set(shardState.Epoch)
}
newShardState, err := bc.WriteShardStateBytes(batch, epoch, header.ShardState())
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state")
return NonStatTy, err
}
// Find all the active validator addresses and store them in db
allActiveValidators := []common.Address{}
processed := make(map[common.Address]struct{})
for i := range newShardState.Shards {
shard := newShardState.Shards[i]
for j := range shard.Slots {
slot := shard.Slots[j]
if slot.EffectiveStake != nil { // For external validator
_, ok := processed[slot.EcdsaAddress]
if !ok {
processed[slot.EcdsaAddress] = struct{}{}
allActiveValidators = append(allActiveValidators, shard.Slots[j].EcdsaAddress)
}
}
}
}
// Update active validators
if err := bc.WriteActiveValidatorList(batch, allActiveValidators); err != nil {
return NonStatTy, err
}
// Update snapshots for all validators
if err := bc.UpdateValidatorSnapshots(batch, epoch); err != nil {
return NonStatTy, err
}
}
// Do bookkeeping for new staking txns
for _, tx := range block.StakingTransactions() {
err = bc.UpdateStakingMetaData(batch, tx, root)
// keep offchain database consistency with onchain we need revert
// but it should not happend unless local database corrupted
if err != nil {
utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err)
return NonStatTy, err
}
}
// Update voting power of validators for all shards
if block.ShardID() == shard.BeaconChainShardID &&
len(block.Header().ShardState()) > 0 {
shardState := new(shard.State)
if shardState, err = shard.DecodeWrapper(block.Header().ShardState()); err == nil {
if err = bc.UpdateValidatorVotingPower(batch, shardState); err != nil {
utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to update voting power")
}
} else {
utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to decode shard state")
}
}
//// Writing beacon chain cross links
if header.ShardID() == shard.BeaconChainShardID &&
bc.chainConfig.IsCrossLink(block.Epoch()) &&
len(header.CrossLinks()) > 0 {
crossLinks := &types.CrossLinks{}
err = rlp.DecodeBytes(header.CrossLinks(), crossLinks)
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cannot parse cross links")
return NonStatTy, err
}
if !crossLinks.IsSorted() {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cross links are not sorted")
return NonStatTy, errors.New("proposed cross links are not sorted")
}
for _, crossLink := range *crossLinks {
// Process crosslink
if err := bc.WriteCrossLinks(batch, types.CrossLinks{crossLink}); err == nil {
utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[insertChain/crosslinks] Cross Link Added to Beaconchain")
}
bc.LastContinuousCrossLink(batch, crossLink)
}
//clean/update local database cache after crosslink inserted into blockchain
num, err := bc.DeleteCommittedFromPendingCrossLinks(*crossLinks)
utils.Logger().Debug().Msgf("DeleteCommittedFromPendingCrossLinks, crosslinks in header %d, pending crosslinks: %d, error: %+v", len(*crossLinks), num, err)
}
if bc.CurrentHeader().ShardID() == shard.BeaconChainShardID {
if bc.chainConfig.IsStaking(block.Epoch()) {
bc.UpdateBlockRewardAccumulator(batch, payout, block.Number().Uint64())
} else {
// block reward never accumulate before staking
bc.WriteBlockRewardAccumulator(batch, big.NewInt(0), block.Number().Uint64())
}
}
return CanonStatTy, nil
}

@ -25,10 +25,7 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
staking "github.com/harmony-one/harmony/staking/types"
) )
// MsgNoShardStateFromDB error message for shard state reading failure // MsgNoShardStateFromDB error message for shard state reading failure
@ -122,24 +119,6 @@ func WriteHeadFastBlockHash(db DatabaseWriter, hash common.Hash) {
} }
} }
// ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow
// reporting correct numbers across restarts.
func ReadFastTrieProgress(db DatabaseReader) uint64 {
data, _ := db.Get(fastTrieProgressKey)
if len(data) == 0 {
return 0
}
return new(big.Int).SetBytes(data).Uint64()
}
// WriteFastTrieProgress stores the fast sync trie process counter to support
// retrieving it across restarts.
func WriteFastTrieProgress(db DatabaseWriter, count uint64) {
if err := db.Put(fastTrieProgressKey, new(big.Int).SetUint64(count).Bytes()); err != nil {
utils.Logger().Error().Msg("Failed to store fast sync trie progress")
}
}
// ReadHeaderRLP retrieves a block header in its raw RLP database encoding. // ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
func ReadHeaderRLP(db DatabaseReader, hash common.Hash, number uint64) rlp.RawValue { func ReadHeaderRLP(db DatabaseReader, hash common.Hash, number uint64) rlp.RawValue {
data, _ := db.Get(headerKey(number, hash)) data, _ := db.Get(headerKey(number, hash))
@ -351,31 +330,6 @@ func ReadBlock(db DatabaseReader, hash common.Hash, number uint64) *types.Block
func WriteBlock(db DatabaseWriter, block *types.Block) { func WriteBlock(db DatabaseWriter, block *types.Block) {
WriteBody(db, block.Hash(), block.NumberU64(), block.Body()) WriteBody(db, block.Hash(), block.NumberU64(), block.Body())
WriteHeader(db, block.Header()) WriteHeader(db, block.Header())
// TODO ek – maybe roll the below into WriteHeader()
epoch := block.Header().Epoch()
if epoch == nil {
// backward compatibility
return
}
epochBlockNum := block.Number()
writeOne := func() {
if err := WriteEpochBlockNumber(db, epoch, epochBlockNum); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to write epoch block number")
}
}
// A block may be a genesis block AND end-of-epoch block at the same time.
if epochBlockNum.Sign() == 0 {
// Genesis block; record this block's epoch and block numbers.
writeOne()
}
// TODO: don't change epoch based on shard state presence
if len(block.Header().ShardState()) > 0 && block.NumberU64() != 0 {
// End-of-epoch block; record the next epoch after this block.
epoch = new(big.Int).Add(epoch, common.Big1)
epochBlockNum = new(big.Int).Add(epochBlockNum, common.Big1)
writeOne()
}
} }
// DeleteBlock removes all block data associated with a hash. // DeleteBlock removes all block data associated with a hash.
@ -412,394 +366,3 @@ func FindCommonAncestor(db DatabaseReader, a, b *block.Header) *block.Header {
} }
return a return a
} }
// ReadShardState retrieves sharding state.
func ReadShardState(
db DatabaseReader, epoch *big.Int,
) (*shard.State, error) {
data, err := db.Get(shardStateKey(epoch))
if err != nil {
return nil, ctxerror.New(MsgNoShardStateFromDB,
"epoch", epoch,
).WithCause(err)
}
ss, err2 := shard.DecodeWrapper(data)
if err2 != nil {
return nil, ctxerror.New("cannot decode sharding state",
"epoch", epoch,
).WithCause(err2)
}
return ss, nil
}
// WriteShardStateBytes stores sharding state into database.
func WriteShardStateBytes(db DatabaseWriter, epoch *big.Int, data []byte) (err error) {
if err = db.Put(shardStateKey(epoch), data); err != nil {
return ctxerror.New("cannot write sharding state",
"epoch", epoch,
).WithCause(err)
}
utils.Logger().Info().Str("epoch", epoch.String()).Int("size", len(data)).Msg("wrote sharding state")
return nil
}
// ReadLastCommits retrieves LastCommits.
func ReadLastCommits(db DatabaseReader) ([]byte, error) {
var data []byte
data, err := db.Get(lastCommitsKey)
if err != nil {
return nil, ctxerror.New("cannot read last commits from rawdb").WithCause(err)
}
return data, nil
}
// WriteLastCommits stores last commits into database.
func WriteLastCommits(
db DatabaseWriter, data []byte,
) (err error) {
if err = db.Put(lastCommitsKey, data); err != nil {
return ctxerror.New("cannot write last commits").WithCause(err)
}
utils.Logger().Info().
Int("size", len(data)).
Msg("wrote last commits")
return nil
}
// ReadEpochBlockNumber retrieves the epoch block number for the given epoch,
// or nil if the given epoch is not found in the database.
func ReadEpochBlockNumber(db DatabaseReader, epoch *big.Int) (*big.Int, error) {
data, err := db.Get(epochBlockNumberKey(epoch))
if err != nil {
return nil, err
}
return new(big.Int).SetBytes(data), nil
}
// WriteEpochBlockNumber stores the given epoch-number-to-epoch-block-number in the database.
func WriteEpochBlockNumber(db DatabaseWriter, epoch, blockNum *big.Int) error {
return db.Put(epochBlockNumberKey(epoch), blockNum.Bytes())
}
// ReadEpochVrfBlockNums retrieves the VRF block numbers for the given epoch
func ReadEpochVrfBlockNums(db DatabaseReader, epoch *big.Int) ([]byte, error) {
return db.Get(epochVrfBlockNumbersKey(epoch))
}
// WriteEpochVrfBlockNums stores the VRF block numbers for the given epoch
func WriteEpochVrfBlockNums(db DatabaseWriter, epoch *big.Int, data []byte) error {
return db.Put(epochVrfBlockNumbersKey(epoch), data)
}
// ReadEpochVdfBlockNum retrieves the VDF block number for the given epoch
func ReadEpochVdfBlockNum(db DatabaseReader, epoch *big.Int) ([]byte, error) {
return db.Get(epochVdfBlockNumberKey(epoch))
}
// WriteEpochVdfBlockNum stores the VDF block number for the given epoch
func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error {
return db.Put(epochVdfBlockNumberKey(epoch), data)
}
// ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) {
return db.Get(crosslinkKey(shardID, blockNum))
}
// WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error {
return db.Put(crosslinkKey(shardID, blockNum), data)
}
// DeleteCrossLinkShardBlock deletes the blockHash given shardID and blockNum
func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64) error {
return db.Delete(crosslinkKey(shardID, blockNum))
}
// ReadShardLastCrossLink read the last cross link of a shard
func ReadShardLastCrossLink(db DatabaseReader, shardID uint32) ([]byte, error) {
return db.Get(shardLastCrosslinkKey(shardID))
}
// WriteShardLastCrossLink stores the last cross link of a shard
func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) error {
return db.Put(shardLastCrosslinkKey(shardID), data)
}
// ReadPendingCrossLinks retrieves last pending crosslinks.
func ReadPendingCrossLinks(db DatabaseReader) ([]byte, error) {
return db.Get(pendingCrosslinkKey)
}
// WritePendingCrossLinks stores last pending crosslinks into database.
func WritePendingCrossLinks(db DatabaseWriter, bytes []byte) error {
return db.Put(pendingCrosslinkKey, bytes)
}
// DeletePendingCrossLinks stores last pending crosslinks into database.
func DeletePendingCrossLinks(db DatabaseDeleter) error {
return db.Delete(pendingCrosslinkKey)
}
// ReadPendingSlashingCandidates retrieves last pending slashing candidates
func ReadPendingSlashingCandidates(db DatabaseReader) ([]byte, error) {
return db.Get(pendingSlashingKey)
}
// WritePendingSlashingCandidates stores last pending slashing candidates into database.
func WritePendingSlashingCandidates(db DatabaseWriter, bytes []byte) error {
return db.Put(pendingSlashingKey, bytes)
}
// DeletePendingSlashingCandidates stores last pending slashing candidates into database.
func DeletePendingSlashingCandidates(db DatabaseDeleter) error {
return db.Delete(pendingSlashingKey)
}
// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) {
data, err := db.Get(cxReceiptKey(shardID, number, hash))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts")
return nil, err
}
cxReceipts := types.CXReceipts{}
if err := rlp.DecodeBytes(data, &cxReceipts); err != nil {
return nil, err
}
return cxReceipts, nil
}
// WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash
func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) error {
bytes, err := rlp.EncodeToBytes(receipts)
if err != nil {
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts")
}
// Store the receipt slice
if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil {
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts")
}
return err
}
// ReadCXReceiptsProofSpent check whether a CXReceiptsProof is unspent
func ReadCXReceiptsProofSpent(db DatabaseReader, shardID uint32, number uint64) (byte, error) {
data, err := db.Get(cxReceiptSpentKey(shardID, number))
if err != nil || len(data) == 0 {
return NAByte, ctxerror.New("[ReadCXReceiptsProofSpent] Cannot find the key", "shardID", shardID, "number", number).WithCause(err)
}
return data[0], nil
}
// WriteCXReceiptsProofSpent write CXReceiptsProof as spent into database
func WriteCXReceiptsProofSpent(dbw DatabaseWriter, cxp *types.CXReceiptsProof) error {
shardID := cxp.MerkleProof.ShardID
blockNum := cxp.MerkleProof.BlockNum.Uint64()
return dbw.Put(cxReceiptSpentKey(shardID, blockNum), []byte{SpentByte})
}
// DeleteCXReceiptsProofSpent removes unspent indicator of a given blockHash
func DeleteCXReceiptsProofSpent(db DatabaseDeleter, shardID uint32, number uint64) {
if err := db.Delete(cxReceiptSpentKey(shardID, number)); err != nil {
utils.Logger().Error().Msg("Failed to delete receipts unspent indicator")
}
}
// ReadCXReceiptsProofUnspentCheckpoint returns the last unspent blocknumber
func ReadCXReceiptsProofUnspentCheckpoint(db DatabaseReader, shardID uint32) (uint64, error) {
by, err := db.Get(cxReceiptUnspentCheckpointKey(shardID))
if err != nil {
return 0, ctxerror.New("[ReadCXReceiptsProofUnspent] Cannot Unspent Checkpoint", "shardID", shardID).WithCause(err)
}
lastCheckpoint := binary.BigEndian.Uint64(by[:])
return lastCheckpoint, nil
}
// WriteCXReceiptsProofUnspentCheckpoint check whether a CXReceiptsProof is unspent, true means not spent
func WriteCXReceiptsProofUnspentCheckpoint(db DatabaseWriter, shardID uint32, blockNum uint64) error {
by := make([]byte, 8)
binary.BigEndian.PutUint64(by[:], blockNum)
return db.Put(cxReceiptUnspentCheckpointKey(shardID), by)
}
// ReadValidatorInformation retrieves staking validator by its address
func ReadValidatorInformation(db DatabaseReader, addr common.Address) (*staking.ValidatorWrapper, error) {
data, err := db.Get(validatorKey(addr))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Msg("ReadValidatorInformation")
return nil, err
}
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(data, &v); err != nil {
utils.Logger().Error().Err(err).Str("address", addr.Hex()).Msg("Unable to Decode staking validator from database")
return nil, err
}
return &v, nil
}
// WriteValidatorData stores validator's information by its address
func WriteValidatorData(db DatabaseWriter, v *staking.ValidatorWrapper) error {
bytes, err := rlp.EncodeToBytes(v)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorData] Failed to encode")
return err
}
if err := db.Put(validatorKey(v.Address), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorData] Failed to store to database")
return err
}
return err
}
// ReadValidatorSnapshot retrieves validator's snapshot by its address
func ReadValidatorSnapshot(
db DatabaseReader, addr common.Address,
) (*staking.ValidatorWrapper, error) {
data, err := db.Get(validatorSnapshotKey(addr))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Msg("ReadValidatorSnapshot")
return nil, err
}
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(data, &v); err != nil {
utils.Logger().Error().Err(err).
Str("address", addr.Hex()).
Msg("Unable to decode validator snapshot from database")
return nil, err
}
return &v, nil
}
// WriteValidatorSnapshot stores validator's snapshot by its address
func WriteValidatorSnapshot(db DatabaseWriter, v *staking.ValidatorWrapper) error {
bytes, err := rlp.EncodeToBytes(v)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to encode")
return err
}
if err := db.Put(validatorSnapshotKey(v.Address), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to store to database")
return err
}
return err
}
// ReadValidatorStats retrieves validator's stats by its address
func ReadValidatorStats(
db DatabaseReader, addr common.Address,
) (*staking.ValidatorStats, error) {
data, err := db.Get(validatorStatsKey(addr))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Msg("ReadValidatorStats")
return nil, err
}
stats := staking.ValidatorStats{}
if err := rlp.DecodeBytes(data, &stats); err != nil {
utils.Logger().Error().Err(err).
Str("address", addr.Hex()).
Msg("Unable to decode validator stats from database")
return nil, err
}
return &stats, nil
}
// WriteValidatorStats stores validator's stats by its address
func WriteValidatorStats(
db DatabaseWriter, addr common.Address, stats *staking.ValidatorStats,
) error {
bytes, err := rlp.EncodeToBytes(stats)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorStats] Failed to encode")
return err
}
if err := db.Put(validatorStatsKey(addr), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorStats] Failed to store to database")
return err
}
return err
}
// DeleteValidatorSnapshot removes the validator's snapshot by its address
func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address) {
if err := db.Delete(validatorSnapshotKey(addr)); err != nil {
utils.Logger().Error().Msg("Failed to delete snapshot of a validator")
}
}
// ReadValidatorList retrieves staking validator by its address
// Return only active validators if activeOnly==true, otherwise, return all validators
func ReadValidatorList(db DatabaseReader, activeOnly bool) ([]common.Address, error) {
key := validatorListKey
if activeOnly {
key = activeValidatorListKey
}
data, err := db.Get(key)
if err != nil || len(data) == 0 {
return []common.Address{}, nil
}
addrs := []common.Address{}
if err := rlp.DecodeBytes(data, &addrs); err != nil {
utils.Logger().Error().Err(err).Msg("Unable to Decode validator List from database")
return nil, err
}
return addrs, nil
}
// WriteValidatorList stores staking validator's information by its address
// Writes only for active validators if activeOnly==true, otherwise, writes for all validators
func WriteValidatorList(db DatabaseWriter, addrs []common.Address, activeOnly bool) error {
key := validatorListKey
if activeOnly {
key = activeValidatorListKey
}
bytes, err := rlp.EncodeToBytes(addrs)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorList] Failed to encode")
}
if err := db.Put(key, bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorList] Failed to store to database")
}
return err
}
// ReadDelegationsByDelegator retrieves the list of validators delegated by a delegator
func ReadDelegationsByDelegator(db DatabaseReader, delegator common.Address) ([]staking.DelegationIndex, error) {
data, err := db.Get(delegatorValidatorListKey(delegator))
if err != nil || len(data) == 0 {
return []staking.DelegationIndex{}, nil
}
addrs := []staking.DelegationIndex{}
if err := rlp.DecodeBytes(data, &addrs); err != nil {
utils.Logger().Error().Err(err).Msg("Unable to Decode delegations from database")
return nil, err
}
return addrs, nil
}
// WriteDelegationsByDelegator stores the list of validators delegated by a delegator
func WriteDelegationsByDelegator(db DatabaseWriter, delegator common.Address, indices []staking.DelegationIndex) error {
bytes, err := rlp.EncodeToBytes(indices)
if err != nil {
utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to encode")
}
if err := db.Put(delegatorValidatorListKey(delegator), bytes); err != nil {
utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to store to database")
}
return err
}
// ReadBlockRewardAccumulator ..
func ReadBlockRewardAccumulator(db DatabaseReader, number uint64) (*big.Int, error) {
data, err := db.Get(blockRewardAccumKey(number))
if err != nil {
return nil, err
}
return new(big.Int).SetBytes(data), nil
}
// WriteBlockRewardAccumulator ..
func WriteBlockRewardAccumulator(db DatabaseWriter, newAccum *big.Int, number uint64) error {
return db.Put(blockRewardAccumKey(number), newAccum.Bytes())
}

@ -145,11 +145,11 @@ func TestBlockStorage(t *testing.T) {
} else if types.DeriveSha(types.Transactions(entry.Transactions())) != types.DeriveSha(block.Transactions()) || types.CalcUncleHash(entry.Uncles()) != types.CalcUncleHash(block.Uncles()) { } else if types.DeriveSha(types.Transactions(entry.Transactions())) != types.DeriveSha(block.Transactions()) || types.CalcUncleHash(entry.Uncles()) != types.CalcUncleHash(block.Uncles()) {
t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body()) t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body())
} }
if actual, err := ReadEpochBlockNumber(db, big.NewInt(0)); err != nil { //if actual, err := ReadEpochBlockNumber(db, big.NewInt(0)); err != nil {
t.Fatalf("Genesis epoch block number not found, error=%#v", err) // t.Fatalf("Genesis epoch block number not found, error=%#v", err)
} else if expected := big.NewInt(0); actual.Cmp(expected) != 0 { //} else if expected := big.NewInt(0); actual.Cmp(expected) != 0 {
t.Fatalf("Genesis epoch block number mismatch: have %v, want %v", actual, expected) // t.Fatalf("Genesis epoch block number mismatch: have %v, want %v", actual, expected)
} //}
//if actual, err := ReadEpochBlockNumber(db, big.NewInt(1)); err != nil { //if actual, err := ReadEpochBlockNumber(db, big.NewInt(1)); err != nil {
// t.Fatalf("Next epoch block number not found, error=%#v", err) // t.Fatalf("Next epoch block number not found, error=%#v", err)
//} else if expected := big.NewInt(1); actual.Cmp(expected) != 0 { //} else if expected := big.NewInt(1); actual.Cmp(expected) != 0 {

@ -0,0 +1,362 @@
package rawdb
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
staking "github.com/harmony-one/harmony/staking/types"
)
// ReadShardState retrieves shard state of a specific epoch.
func ReadShardState(
db DatabaseReader, epoch *big.Int,
) (*shard.State, error) {
data, err := db.Get(shardStateKey(epoch))
if err != nil {
return nil, ctxerror.New(MsgNoShardStateFromDB,
"epoch", epoch,
).WithCause(err)
}
ss, err2 := shard.DecodeWrapper(data)
if err2 != nil {
return nil, ctxerror.New("cannot decode sharding state",
"epoch", epoch,
).WithCause(err2)
}
return ss, nil
}
// WriteShardStateBytes stores sharding state into database.
func WriteShardStateBytes(db DatabaseWriter, epoch *big.Int, data []byte) (err error) {
if err = db.Put(shardStateKey(epoch), data); err != nil {
return ctxerror.New("cannot write sharding state",
"epoch", epoch,
).WithCause(err)
}
utils.Logger().Info().Str("epoch", epoch.String()).Int("size", len(data)).Msg("wrote sharding state")
return nil
}
// ReadLastCommits retrieves the commit signatures on the current block of blockchain.
func ReadLastCommits(db DatabaseReader) ([]byte, error) {
var data []byte
data, err := db.Get(lastCommitsKey)
if err != nil {
return nil, ctxerror.New("cannot read last commits from rawdb").WithCause(err)
}
return data, nil
}
// WriteLastCommits stores the commit signatures collected on the newly confirmed block into database.
func WriteLastCommits(
db DatabaseWriter, data []byte,
) (err error) {
if err = db.Put(lastCommitsKey, data); err != nil {
return ctxerror.New("cannot write last commits").WithCause(err)
}
utils.Logger().Info().
Int("size", len(data)).
Msg("wrote last commits")
return nil
}
// ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum
func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) {
return db.Get(crosslinkKey(shardID, blockNum))
}
// WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum
func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error {
return db.Put(crosslinkKey(shardID, blockNum), data)
}
// DeleteCrossLinkShardBlock deletes the blockHash given shardID and blockNum
func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64) error {
return db.Delete(crosslinkKey(shardID, blockNum))
}
// ReadShardLastCrossLink read the last cross link of a shard
func ReadShardLastCrossLink(db DatabaseReader, shardID uint32) ([]byte, error) {
return db.Get(shardLastCrosslinkKey(shardID))
}
// WriteShardLastCrossLink stores the last cross link of a shard
func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) error {
return db.Put(shardLastCrosslinkKey(shardID), data)
}
// ReadPendingCrossLinks retrieves last pending crosslinks.
func ReadPendingCrossLinks(db DatabaseReader) ([]byte, error) {
return db.Get(pendingCrosslinkKey)
}
// WritePendingCrossLinks stores last pending crosslinks into database.
func WritePendingCrossLinks(db DatabaseWriter, bytes []byte) error {
return db.Put(pendingCrosslinkKey, bytes)
}
// DeletePendingCrossLinks stores last pending crosslinks into database.
func DeletePendingCrossLinks(db DatabaseDeleter) error {
return db.Delete(pendingCrosslinkKey)
}
// ReadPendingSlashingCandidates retrieves last pending slashing candidates
func ReadPendingSlashingCandidates(db DatabaseReader) ([]byte, error) {
return db.Get(pendingSlashingKey)
}
// WritePendingSlashingCandidates stores last pending slashing candidates into database.
func WritePendingSlashingCandidates(db DatabaseWriter, bytes []byte) error {
return db.Put(pendingSlashingKey, bytes)
}
// DeletePendingSlashingCandidates stores last pending slashing candidates into database.
func DeletePendingSlashingCandidates(db DatabaseDeleter) error {
return db.Delete(pendingSlashingKey)
}
// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash
func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) {
data, err := db.Get(cxReceiptKey(shardID, number, hash))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts")
return nil, err
}
cxReceipts := types.CXReceipts{}
if err := rlp.DecodeBytes(data, &cxReceipts); err != nil {
return nil, err
}
return cxReceipts, nil
}
// WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash
func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) error {
bytes, err := rlp.EncodeToBytes(receipts)
if err != nil {
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts")
}
// Store the receipt slice
if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil {
utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts")
}
return err
}
// ReadCXReceiptsProofSpent check whether a CXReceiptsProof is unspent
func ReadCXReceiptsProofSpent(db DatabaseReader, shardID uint32, number uint64) (byte, error) {
data, err := db.Get(cxReceiptSpentKey(shardID, number))
if err != nil || len(data) == 0 {
return NAByte, ctxerror.New("[ReadCXReceiptsProofSpent] Cannot find the key", "shardID", shardID, "number", number).WithCause(err)
}
return data[0], nil
}
// WriteCXReceiptsProofSpent write CXReceiptsProof as spent into database
func WriteCXReceiptsProofSpent(dbw DatabaseWriter, cxp *types.CXReceiptsProof) error {
shardID := cxp.MerkleProof.ShardID
blockNum := cxp.MerkleProof.BlockNum.Uint64()
return dbw.Put(cxReceiptSpentKey(shardID, blockNum), []byte{SpentByte})
}
// DeleteCXReceiptsProofSpent removes unspent indicator of a given blockHash
func DeleteCXReceiptsProofSpent(db DatabaseDeleter, shardID uint32, number uint64) {
if err := db.Delete(cxReceiptSpentKey(shardID, number)); err != nil {
utils.Logger().Error().Msg("Failed to delete receipts unspent indicator")
}
}
// ReadValidatorSnapshot retrieves validator's snapshot by its address
func ReadValidatorSnapshot(
db DatabaseReader, addr common.Address, epoch *big.Int,
) (*staking.ValidatorWrapper, error) {
data, err := db.Get(validatorSnapshotKey(addr, epoch))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Msg("ReadValidatorSnapshot")
return nil, err
}
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(data, &v); err != nil {
utils.Logger().Error().Err(err).
Str("address", addr.Hex()).
Msg("Unable to decode validator snapshot from database")
return nil, err
}
return &v, nil
}
// WriteValidatorSnapshot stores validator's snapshot by its address
func WriteValidatorSnapshot(batch DatabaseWriter, v *staking.ValidatorWrapper, epoch *big.Int) error {
bytes, err := rlp.EncodeToBytes(v)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to encode")
return err
}
if err := batch.Put(validatorSnapshotKey(v.Address, epoch), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to store to database")
return err
}
return err
}
// DeleteValidatorSnapshot removes the validator's snapshot by its address
func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address, epoch *big.Int) {
if err := db.Delete(validatorSnapshotKey(addr, epoch)); err != nil {
utils.Logger().Error().Msg("Failed to delete snapshot of a validator")
}
}
// ReadValidatorStats retrieves validator's stats by its address
func ReadValidatorStats(
db DatabaseReader, addr common.Address,
) (*staking.ValidatorStats, error) {
data, err := db.Get(validatorStatsKey(addr))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Msg("ReadValidatorStats")
return nil, err
}
stats := staking.ValidatorStats{}
if err := rlp.DecodeBytes(data, &stats); err != nil {
utils.Logger().Error().Err(err).
Str("address", addr.Hex()).
Msg("Unable to decode validator stats from database")
return nil, err
}
return &stats, nil
}
// WriteValidatorStats stores validator's stats by its address
func WriteValidatorStats(
batch DatabaseWriter, addr common.Address, stats *staking.ValidatorStats,
) error {
bytes, err := rlp.EncodeToBytes(stats)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorStats] Failed to encode")
return err
}
if err := batch.Put(validatorStatsKey(addr), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorStats] Failed to store to database")
return err
}
return err
}
// ReadValidatorList retrieves staking validator by its address
// Return only active validators if activeOnly==true, otherwise, return all validators
func ReadValidatorList(db DatabaseReader, activeOnly bool) ([]common.Address, error) {
key := validatorListKey
if activeOnly {
key = activeValidatorListKey
}
data, err := db.Get(key)
if err != nil || len(data) == 0 {
return []common.Address{}, nil
}
addrs := []common.Address{}
if err := rlp.DecodeBytes(data, &addrs); err != nil {
utils.Logger().Error().Err(err).Msg("Unable to Decode validator List from database")
return nil, err
}
return addrs, nil
}
// WriteValidatorList stores staking validator's information by its address
// Writes only for active validators if activeOnly==true, otherwise, writes for all validators
func WriteValidatorList(db DatabaseWriter, addrs []common.Address, activeOnly bool) error {
key := validatorListKey
if activeOnly {
key = activeValidatorListKey
}
bytes, err := rlp.EncodeToBytes(addrs)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorList] Failed to encode")
}
if err := db.Put(key, bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorList] Failed to store to database")
}
return err
}
// ReadDelegationsByDelegator retrieves the list of validators delegated by a delegator
func ReadDelegationsByDelegator(db DatabaseReader, delegator common.Address) ([]staking.DelegationIndex, error) {
data, err := db.Get(delegatorValidatorListKey(delegator))
if err != nil || len(data) == 0 {
return []staking.DelegationIndex{}, nil
}
addrs := []staking.DelegationIndex{}
if err := rlp.DecodeBytes(data, &addrs); err != nil {
utils.Logger().Error().Err(err).Msg("Unable to Decode delegations from database")
return nil, err
}
return addrs, nil
}
// WriteDelegationsByDelegator stores the list of validators delegated by a delegator
func WriteDelegationsByDelegator(db DatabaseWriter, delegator common.Address, indices []staking.DelegationIndex) error {
bytes, err := rlp.EncodeToBytes(indices)
if err != nil {
utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to encode")
}
if err := db.Put(delegatorValidatorListKey(delegator), bytes); err != nil {
utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to store to database")
}
return err
}
// ReadBlockRewardAccumulator ..
func ReadBlockRewardAccumulator(db DatabaseReader, number uint64) (*big.Int, error) {
data, err := db.Get(blockRewardAccumKey(number))
if err != nil {
return nil, err
}
return new(big.Int).SetBytes(data), nil
}
// WriteBlockRewardAccumulator ..
func WriteBlockRewardAccumulator(db DatabaseWriter, newAccum *big.Int, number uint64) error {
return db.Put(blockRewardAccumKey(number), newAccum.Bytes())
}
//// Resharding ////
// ReadEpochBlockNumber retrieves the epoch block number for the given epoch,
// or nil if the given epoch is not found in the database.
func ReadEpochBlockNumber(db DatabaseReader, epoch *big.Int) (*big.Int, error) {
data, err := db.Get(epochBlockNumberKey(epoch))
if err != nil {
return nil, err
}
return new(big.Int).SetBytes(data), nil
}
// WriteEpochBlockNumber stores the given epoch-number-to-epoch-block-number in the database.
func WriteEpochBlockNumber(db DatabaseWriter, epoch, blockNum *big.Int) error {
return db.Put(epochBlockNumberKey(epoch), blockNum.Bytes())
}
// ReadEpochVrfBlockNums retrieves the VRF block numbers for the given epoch
func ReadEpochVrfBlockNums(db DatabaseReader, epoch *big.Int) ([]byte, error) {
return db.Get(epochVrfBlockNumbersKey(epoch))
}
// WriteEpochVrfBlockNums stores the VRF block numbers for the given epoch
func WriteEpochVrfBlockNums(db DatabaseWriter, epoch *big.Int, data []byte) error {
return db.Put(epochVrfBlockNumbersKey(epoch), data)
}
// ReadEpochVdfBlockNum retrieves the VDF block number for the given epoch
func ReadEpochVdfBlockNum(db DatabaseReader, epoch *big.Int) ([]byte, error) {
return db.Get(epochVdfBlockNumberKey(epoch))
}
// WriteEpochVdfBlockNum stores the VDF block number for the given epoch
func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error {
return db.Put(epochVdfBlockNumberKey(epoch), data)
}
//// Resharding ////

@ -39,9 +39,6 @@ var (
// headFastBlockKey tracks the latest known incomplete block's hash duirng fast sync. // headFastBlockKey tracks the latest known incomplete block's hash duirng fast sync.
headFastBlockKey = []byte("LastFast") headFastBlockKey = []byte("LastFast")
// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
fastTrieProgressKey = []byte("TrieSync")
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
@ -228,22 +225,10 @@ func cxReceiptSpentKey(shardID uint32, number uint64) []byte {
return append(tmp, encodeBlockNumber(number)...) return append(tmp, encodeBlockNumber(number)...)
} }
// cxReceiptUnspentCheckpointKey = cxReceiptsUnspentCheckpointPrefix + shardID func validatorSnapshotKey(addr common.Address, epoch *big.Int) []byte {
func cxReceiptUnspentCheckpointKey(shardID uint32) []byte {
prefix := cxReceiptUnspentCheckpointPrefix
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, shardID)
return append(prefix, sKey...)
}
func validatorKey(addr common.Address) []byte {
prefix := validatorPrefix
return append(prefix, addr.Bytes()...)
}
func validatorSnapshotKey(addr common.Address) []byte {
prefix := validatorSnapshotPrefix prefix := validatorSnapshotPrefix
return append(prefix, addr.Bytes()...) tmp := append(prefix, addr.Bytes()...)
return append(tmp, epoch.Bytes()...)
} }
func validatorStatsKey(addr common.Address) []byte { func validatorStatsKey(addr common.Address) []byte {

@ -737,7 +737,7 @@ func (s *PublicBlockChainAPI) GetDelegationByDelegatorAndValidator(ctx context.C
func doEstimateGas(ctx context.Context, b Backend, args CallArgs, gasCap *big.Int) (hexutil.Uint64, error) { func doEstimateGas(ctx context.Context, b Backend, args CallArgs, gasCap *big.Int) (hexutil.Uint64, error) {
// Binary search the gas requirement, as it may be higher than the amount used // Binary search the gas requirement, as it may be higher than the amount used
var ( var (
lo uint64 = params.TxGas - 1 lo = params.TxGas - 1
hi uint64 hi uint64
cap uint64 cap uint64
) )

@ -702,7 +702,7 @@ func (s *PublicBlockChainAPI) GetDelegationByDelegatorAndValidator(ctx context.C
func doEstimateGas(ctx context.Context, b Backend, args CallArgs, gasCap *big.Int) (hexutil.Uint64, error) { func doEstimateGas(ctx context.Context, b Backend, args CallArgs, gasCap *big.Int) (hexutil.Uint64, error) {
// Binary search the gas requirement, as it may be higher than the amount used // Binary search the gas requirement, as it may be higher than the amount used
var ( var (
lo uint64 = params.TxGas - 1 lo = params.TxGas - 1
hi uint64 hi uint64
cap uint64 cap uint64
) )

Loading…
Cancel
Save