[core] node stuck fix (#3378)

* [rawdb] add error handling to all rawdb write. Add fdlimit module. Fix the node stuck

* [core] switch back the batch write condition in InsertReceiptChain

* [core] add comments on isUnrecoverableErr
pull/3384/head
Jacky Wang 4 years ago committed by GitHub
parent 2bd4083b61
commit 2430a85162
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      api/service/syncing/syncing.go
  2. 29
      cmd/harmony/main.go
  3. 71
      common/fdlimit/fdlimit_darwin.go
  4. 45
      common/fdlimit/fdlimit_test.go
  5. 65
      common/fdlimit/fdlimit_unix.go
  6. 167
      core/blockchain.go
  7. 20
      core/genesis.go
  8. 42
      core/headerchain.go
  9. 8
      core/offchain.go
  10. 96
      core/rawdb/accessors_chain.go
  11. 4
      core/rawdb/accessors_chain_test.go
  12. 67
      core/rawdb/accessors_indexes.go
  13. 18
      core/rawdb/accessors_indexes_test.go
  14. 17
      core/rawdb/accessors_metadata.go
  15. 18
      core/rawdb/accessors_offchain.go
  16. 5
      hmy/bloombits.go
  17. 2
      internal/utils/singleton.go

@ -703,7 +703,10 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc *core.BlockChai
if !verifyAllSig {
utils.Logger().Info().Interface("block", bc.CurrentBlock()).Msg("[SYNC] UpdateBlockAndStatus: Rolling back last 99 blocks!")
for i := uint64(0); i < verifyHeaderBatchSize-1; i++ {
bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()})
if rbErr := bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()}); rbErr != nil {
utils.Logger().Err(rbErr).Msg("[SYNC] UpdateBlockAndStatus: failed to rollback")
return err
}
}
}
return err

@ -20,6 +20,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/common/fdlimit"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
@ -109,10 +110,13 @@ func runHarmonyNode(cmd *cobra.Command, args []string) {
os.Exit(0)
}
prepareRootCmd(cmd)
if err := prepareRootCmd(cmd); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(128)
}
cfg, err := getHarmonyConfig(cmd)
if err != nil {
fmt.Println(err)
fmt.Fprint(os.Stderr, err)
cmd.Help()
os.Exit(128)
}
@ -122,7 +126,7 @@ func runHarmonyNode(cmd *cobra.Command, args []string) {
setupNodeAndRun(cfg)
}
func prepareRootCmd(cmd *cobra.Command) {
func prepareRootCmd(cmd *cobra.Command) error {
// HACK Force usage of go implementation rather than the C based one. Do the right way, see the
// notes one line 66,67 of https://golang.org/src/net/net.go that say can make the decision at
// build time.
@ -131,6 +135,20 @@ func prepareRootCmd(cmd *cobra.Command) {
runtime.GOMAXPROCS(runtime.NumCPU())
// Set up randomization seed.
rand.Seed(int64(time.Now().Nanosecond()))
// Raise fd limits
return raiseFdLimits()
}
func raiseFdLimits() error {
limit, err := fdlimit.Maximum()
if err != nil {
return errors.Wrap(err, "Failed to retrieve file descriptor allowance")
}
_, err = fdlimit.Raise(uint64(limit))
if err != nil {
return errors.Wrap(err, "Failed to raise file descriptor allowance")
}
return nil
}
func getHarmonyConfig(cmd *cobra.Command) (harmonyConfig, error) {
@ -302,7 +320,10 @@ func setupNodeAndRun(hc harmonyConfig) {
for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) {
curBlock := chain.CurrentBlock()
rollbacks := []ethCommon.Hash{curBlock.Hash()}
chain.Rollback(rollbacks)
if err := chain.Rollback(rollbacks); err != nil {
fmt.Printf("Revert failed: %v\n", err)
os.Exit(1)
}
lastSig := curBlock.Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...)
chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap)

@ -0,0 +1,71 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package fdlimit
import "syscall"
// hardlimit is the number of file descriptors allowed at max by the kernel.
const hardlimit = 10240
// Raise tries to maximize the file descriptor allowance of this process
// to the maximum hard-limit allowed by the OS.
// Returns the size it was set to (may differ from the desired 'max')
func Raise(max uint64) (uint64, error) {
// Get the current limit
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
// Try to update the limit to the max allowance
limit.Cur = limit.Max
if limit.Cur > max {
limit.Cur = max
}
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
// MacOS can silently apply further caps, so retrieve the actually set limit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
return limit.Cur, nil
}
// Current retrieves the number of file descriptors allowed to be opened by this
// process.
func Current() (int, error) {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
return int(limit.Cur), nil
}
// Maximum retrieves the maximum number of file descriptors this process is
// allowed to request for itself.
func Maximum() (int, error) {
// Retrieve the maximum allowed by dynamic OS limits
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
// Cap it to OPEN_MAX (10240) because macos is a special snowflake
if limit.Max > hardlimit {
limit.Max = hardlimit
}
return int(limit.Max), nil
}

@ -0,0 +1,45 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package fdlimit
import (
"fmt"
"testing"
)
// TestFileDescriptorLimits simply tests whether the file descriptor allowance
// per this process can be retrieved.
func TestFileDescriptorLimits(t *testing.T) {
target := 4096
hardlimit, err := Maximum()
if err != nil {
t.Fatal(err)
}
if hardlimit < target {
t.Skip(fmt.Sprintf("system limit is less than desired test target: %d < %d", hardlimit, target))
}
if limit, err := Current(); err != nil || limit <= 0 {
t.Fatalf("failed to retrieve file descriptor limit (%d): %v", limit, err)
}
if _, err := Raise(uint64(target)); err != nil {
t.Fatalf("failed to raise file allowance")
}
if limit, err := Current(); err != nil || limit < target {
t.Fatalf("failed to retrieve raised descriptor limit (have %v, want %v): %v", limit, target, err)
}
}

@ -0,0 +1,65 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build linux netbsd openbsd solaris
package fdlimit
import "syscall"
// Raise tries to maximize the file descriptor allowance of this process
// to the maximum hard-limit allowed by the OS.
// Returns the size it was set to (may differ from the desired 'max')
func Raise(max uint64) (uint64, error) {
// Get the current limit
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
// Try to update the limit to the max allowance
limit.Cur = limit.Max
if limit.Cur > max {
limit.Cur = max
}
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
// MacOS can silently apply further caps, so retrieve the actually set limit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
return limit.Cur, nil
}
// Current retrieves the number of file descriptors allowed to be opened by this
// process.
func Current() (int, error) {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
return int(limit.Cur), nil
}
// Maximum retrieves the maximum number of file descriptors this process is
// allowed to request for itself.
func Maximum() (int, error) {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return 0, err
}
return int(limit.Max), nil
}

@ -23,6 +23,7 @@ import (
"fmt"
"io"
"math/big"
"os"
"strings"
"sync"
"sync/atomic"
@ -345,7 +346,9 @@ func (bc *BlockChain) loadLastState() error {
// }
//}
currentHeader := currentBlock.Header()
bc.hc.SetCurrentHeader(currentHeader)
if err := bc.hc.SetCurrentHeader(currentHeader); err != nil {
return errors.Wrap(err, "headerChain SetCurrentHeader")
}
// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
@ -395,10 +398,12 @@ func (bc *BlockChain) SetHead(head uint64) error {
defer bc.mu.Unlock()
// Rewind the header chain, deleting all block bodies until then
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) {
rawdb.DeleteBody(db, hash, num)
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) error {
return rawdb.DeleteBody(db, hash, num)
}
if err := bc.hc.SetHead(head, delFn); err != nil {
return errors.Wrap(err, "headerChain SetHeader")
}
bc.hc.SetHead(head, delFn)
currentHeader := bc.hc.CurrentHeader()
// Clear out any stale content from the caches
@ -433,8 +438,12 @@ func (bc *BlockChain) SetHead(head uint64) error {
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash())
if err := rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()); err != nil {
return err
}
if err := rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
return err
}
return bc.loadLastState()
}
@ -516,13 +525,19 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
defer bc.mu.Unlock()
// Prepare the genesis block and reinitialise the chain
rawdb.WriteBlock(bc.db, genesis)
if err := rawdb.WriteBlock(bc.db, genesis); err != nil {
return err
}
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
if err := bc.insert(bc.genesisBlock); err != nil {
return err
}
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
if err := bc.hc.SetCurrentHeader(bc.genesisBlock.Header()); err != nil {
return err
}
bc.currentBlock.Store(bc.genesisBlock)
bc.currentFastBlock.Store(bc.genesisBlock)
return nil
@ -587,8 +602,7 @@ func (bc *BlockChain) removeInValidatorList(toRemove map[common.Address]struct{}
newVals = append(newVals, addr)
}
}
bc.WriteValidatorList(bc.db, newVals)
return nil
return bc.WriteValidatorList(bc.db, newVals)
}
// Export writes the active chain to the given writer.
@ -627,24 +641,37 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return nil
}
// similar to insert, but add to the db writer.
func (bc *BlockChain) insertWithWriter(batch rawdb.DatabaseWriter, block *types.Block) {
// writeHeadBlock writes a new head block
func (bc *BlockChain) writeHeadBlock(block *types.Block) error {
// If the block is on a side chain or an unknown one, force other heads onto it too
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
// Add the block to the canonical chain number scheme and mark as the head
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(batch, block.Hash())
batch := bc.ChainDb().NewBatch()
if err := rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()); err != nil {
return err
}
if err := rawdb.WriteHeadBlockHash(batch, block.Hash()); err != nil {
return err
}
if err := batch.Write(); err != nil {
return err
}
bc.currentBlock.Store(block)
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
rawdb.WriteHeadFastBlockHash(batch, block.Hash())
if err := bc.hc.SetCurrentHeader(block.Header()); err != nil {
return errors.Wrap(err, "HeaderChain SetCurrentHeader")
}
if err := rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil {
return err
}
bc.currentFastBlock.Store(block)
}
return nil
}
// insert injects a new head block into the current block chain. This method
@ -653,8 +680,8 @@ func (bc *BlockChain) insertWithWriter(batch rawdb.DatabaseWriter, block *types.
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
bc.insertWithWriter(bc.db, block)
func (bc *BlockChain) insert(block *types.Block) error {
return bc.writeHeadBlock(block)
}
// Genesis retrieves the chain's genesis block.
@ -890,7 +917,7 @@ const (
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash) {
func (bc *BlockChain) Rollback(chain []common.Hash) error {
bc.mu.Lock()
defer bc.mu.Unlock()
@ -902,7 +929,9 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
if currentHeader != nil && currentHeader.Hash() == hash {
parentHeader := bc.GetHeader(currentHeader.ParentHash(), currentHeader.Number().Uint64()-1)
if parentHeader != nil {
bc.hc.SetCurrentHeader(parentHeader)
if err := bc.hc.SetCurrentHeader(parentHeader); err != nil {
return errors.Wrap(err, "HeaderChain SetCurrentHeader")
}
}
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentFastBlock.Hash() == hash {
@ -916,7 +945,9 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
if newBlock != nil {
bc.currentBlock.Store(newBlock)
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
if err := rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()); err != nil {
return err
}
for _, stkTxn := range currentBlock.StakingTransactions() {
if stkTxn.StakingType() == staking.DirectiveCreateValidator {
@ -928,7 +959,7 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
}
}
}
bc.removeInValidatorList(valsToRemove)
return bc.removeInValidatorList(valsToRemove)
}
// SetReceiptsData computes all the non-consensus fields of the receipts
@ -1022,7 +1053,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
return 0, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
}
// Skip if the entire data is already known
if bc.HasBlock(block.Hash(), block.NumberU64()) {
@ -1031,12 +1062,21 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Compute all the non-consensus fields of the receipts
if err := SetReceiptsData(bc.chainConfig, block, receipts); err != nil {
return i, fmt.Errorf("failed to set receipts data: %v", err)
return 0, fmt.Errorf("failed to set receipts data: %v", err)
}
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
rawdb.WriteTxLookupEntries(batch, block)
if err := rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return 0, err
}
if err := rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return 0, err
}
if err := rawdb.WriteBlockTxLookUpEntries(batch, block); err != nil {
return 0, err
}
if err := rawdb.WriteBlockStxLookUpEntries(batch, block); err != nil {
return 0, err
}
stats.processed++
@ -1092,7 +1132,9 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
return err
}
rawdb.WriteBlock(bc.db, block)
if err := rawdb.WriteBlock(bc.db, block); err != nil {
return err
}
return nil
}
@ -1126,6 +1168,10 @@ func (bc *BlockChain) WriteBlockWithState(
triedb := bc.stateCache.TrieDB()
if bc.cacheConfig.Disabled || len(block.Header().ShardState()) > 0 {
if err := triedb.Commit(root, false); err != nil {
if isUnrecoverableErr(err) {
fmt.Printf("Unrecoverable error when committing triedb: %v\nExitting\n", err)
os.Exit(1)
}
return NonStatTy, err
}
} else {
@ -1181,7 +1227,9 @@ func (bc *BlockChain) WriteBlockWithState(
batch := bc.db.NewBatch()
// Write the raw block
rawdb.WriteBlock(batch, block)
if err := rawdb.WriteBlock(batch, block); err != nil {
return NonStatTy, err
}
// Write offchain data
if status, err := bc.CommitOffChainData(
@ -1192,17 +1240,32 @@ func (bc *BlockChain) WriteBlockWithState(
}
// Write the positional metadata for transaction/receipt lookups and preimages
rawdb.WriteTxLookupEntries(batch, block)
rawdb.WriteCxLookupEntries(batch, block)
rawdb.WritePreimages(batch, block.NumberU64(), state.Preimages())
// Update current block
bc.insertWithWriter(batch, block)
if err := rawdb.WriteBlockTxLookUpEntries(batch, block); err != nil {
return NonStatTy, err
}
if err := rawdb.WriteBlockStxLookUpEntries(batch, block); err != nil {
return NonStatTy, err
}
if err := rawdb.WriteCxLookupEntries(batch, block); err != nil {
return NonStatTy, err
}
if err := rawdb.WritePreimages(batch, block.NumberU64(), state.Preimages()); err != nil {
return NonStatTy, err
}
if err := batch.Write(); err != nil {
if isUnrecoverableErr(err) {
fmt.Printf("Unrecoverable error when writing leveldb: %v\nExitting\n", err)
os.Exit(1)
}
return NonStatTy, err
}
// Update current block
if err := bc.writeHeadBlock(block); err != nil {
return NonStatTy, errors.Wrap(err, "writeHeadBlock")
}
bc.futureBlocks.Remove(block.Hash())
return CanonStatTy, nil
}
@ -2203,10 +2266,13 @@ func (bc *BlockChain) CXMerkleProof(toShardID uint32, block *types.Block) (*type
// WriteCXReceiptsProofSpent mark the CXReceiptsProof list with given unspent status
// true: unspent, false: spent
func (bc *BlockChain) WriteCXReceiptsProofSpent(db rawdb.DatabaseWriter, cxps []*types.CXReceiptsProof) {
func (bc *BlockChain) WriteCXReceiptsProofSpent(db rawdb.DatabaseWriter, cxps []*types.CXReceiptsProof) error {
for _, cxp := range cxps {
rawdb.WriteCXReceiptsProofSpent(db, cxp)
if err := rawdb.WriteCXReceiptsProofSpent(db, cxp); err != nil {
return err
}
}
return nil
}
// IsSpent checks whether a CXReceiptsProof is unspent
@ -2944,3 +3010,26 @@ func (bc *BlockChain) SuperCommitteeForNextEpoch(
}
return nextCommittee, err
}
var (
leveldbErrSpec = "leveldb"
tooManyOpenFilesErrStr = "Too many open files"
)
// isUnrecoverableErr check whether the input error is not recoverable.
// When writing db, there could be some possible errors from storage level (leveldb).
// Known possible leveldb errors are:
// 1. Leveldb is already closed. (leveldb.ErrClosed)
// 2. ldb file missing from disk. (leveldb.ErrNotFound)
// 3. Corrupted db data. (leveldb.errors.ErrCorrupted)
// 4. OS error when open file (too many open files, ...)
// 5. OS error when write file (read-only, not enough disk space, ...)
// Among all the above leveldb errors, only `too many open files` error is known to be recoverable,
// thus the unrecoverable errors refers to error that is
// 1. The error is from the lower storage level (from module leveldb)
// 2. The error is not too many files error.
func isUnrecoverableErr(err error) bool {
isLeveldbErr := strings.Contains(err.Error(), leveldbErrSpec)
isTooManyOpenFiles := strings.Contains(err.Error(), tooManyOpenFilesErrStr)
return isLeveldbErr && !isTooManyOpenFiles
}

@ -282,11 +282,21 @@ func (g *Genesis) Commit(db ethdb.Database) (*types.Block, error) {
return nil, fmt.Errorf("can't commit genesis block with number > 0")
}
rawdb.WriteBlock(db, block)
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), nil)
rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(db, block.Hash())
rawdb.WriteHeadHeaderHash(db, block.Hash())
if err := rawdb.WriteBlock(db, block); err != nil {
return nil, err
}
if err := rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), nil); err != nil {
return nil, err
}
if err := rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
return nil, err
}
if err := rawdb.WriteHeadBlockHash(db, block.Hash()); err != nil {
return nil, err
}
if err := rawdb.WriteHeadHeaderHash(db, block.Hash()); err != nil {
return nil, err
}
err := rawdb.WriteShardStateBytes(db, block.Header().Epoch(), block.Header().ShardState())

@ -390,7 +390,9 @@ func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int {
// WriteTd stores a block's total difficulty into the database, also caching it
// along the way.
func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) error {
rawdb.WriteTd(hc.chainDb, hash, number, td)
if err := rawdb.WriteTd(hc.chainDb, hash, number, td); err != nil {
return err
}
hc.tdCache.Add(hash, new(big.Int).Set(td))
return nil
}
@ -446,20 +448,23 @@ func (hc *HeaderChain) CurrentHeader() *block.Header {
}
// SetCurrentHeader sets the current head header of the canonical chain.
func (hc *HeaderChain) SetCurrentHeader(head *block.Header) {
rawdb.WriteHeadHeaderHash(hc.chainDb, head.Hash())
func (hc *HeaderChain) SetCurrentHeader(head *block.Header) error {
if err := rawdb.WriteHeadHeaderHash(hc.chainDb, head.Hash()); err != nil {
return err
}
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
return nil
}
// DeleteCallback is a callback function that is called by SetHead before
// each header is deleted.
type DeleteCallback func(rawdb.DatabaseDeleter, common.Hash, uint64)
type DeleteCallback func(rawdb.DatabaseDeleter, common.Hash, uint64) error
// SetHead rewinds the local chain to a new head. Everything above the new head
// will be deleted and the new one set.
func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) error {
height := uint64(0)
if hdr := hc.CurrentHeader(); hdr != nil {
@ -470,18 +475,31 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
hash := hdr.Hash()
num := hdr.Number().Uint64()
if delFn != nil {
delFn(batch, hash, num)
if err := delFn(batch, hash, num); err != nil {
return err
}
}
if err := rawdb.DeleteHeader(batch, hash, num); err != nil {
return err
}
if err := rawdb.DeleteTd(batch, hash, num); err != nil {
return err
}
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)
hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash(), hdr.Number().Uint64()-1))
}
// Roll back the canonical chain numbering
for i := height; i > head; i-- {
rawdb.DeleteCanonicalHash(batch, i)
if err := rawdb.DeleteCanonicalHash(batch, i); err != nil {
return err
}
}
if err := rawdb.WriteHeadHeaderHash(batch, hc.currentHeaderHash); err != nil {
return err
}
if err := batch.Write(); err != nil {
return err
}
batch.Write()
// Clear out any stale content from the caches
hc.headerCache.Purge()
@ -493,7 +511,7 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
rawdb.WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash)
return nil
}
// SetGenesis sets a new genesis block header for the chain

@ -32,7 +32,9 @@ func (bc *BlockChain) CommitOffChainData(
state *state.DB,
) (status WriteStatus, err error) {
// Write receipts of the block
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
if err := rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return NonStatTy, err
}
isBeaconChain := bc.CurrentHeader().ShardID() == shard.BeaconChainShardID
isStaking := bc.chainConfig.IsStaking(block.Epoch())
isPreStaking := bc.chainConfig.IsPreStaking(block.Epoch())
@ -60,7 +62,9 @@ func (bc *BlockChain) CommitOffChainData(
}
}
// Mark incomingReceipts in the block as spent
bc.WriteCXReceiptsProofSpent(batch, block.IncomingReceipts())
if err := bc.WriteCXReceiptsProofSpent(batch, block.IncomingReceipts()); err != nil {
return NonStatTy, err
}
}
// VRF + VDF

@ -48,17 +48,21 @@ func ReadCanonicalHash(db DatabaseReader, number uint64) common.Hash {
}
// WriteCanonicalHash stores the hash assigned to a canonical block number.
func WriteCanonicalHash(db DatabaseWriter, hash common.Hash, number uint64) {
func WriteCanonicalHash(db DatabaseWriter, hash common.Hash, number uint64) error {
if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil {
utils.Logger().Error().Msg("Failed to store number to hash mapping")
return err
}
return nil
}
// DeleteCanonicalHash removes the number to hash canonical mapping.
func DeleteCanonicalHash(db DatabaseDeleter, number uint64) {
func DeleteCanonicalHash(db DatabaseDeleter, number uint64) error {
if err := db.Delete(headerHashKey(number)); err != nil {
utils.Logger().Error().Msg("Failed to delete number to hash mapping")
return err
}
return nil
}
// ReadHeaderNumber returns the header number assigned to a hash.
@ -81,10 +85,12 @@ func ReadHeadHeaderHash(db DatabaseReader) common.Hash {
}
// WriteHeadHeaderHash stores the hash of the current canonical head header.
func WriteHeadHeaderHash(db DatabaseWriter, hash common.Hash) {
func WriteHeadHeaderHash(db DatabaseWriter, hash common.Hash) error {
if err := db.Put(headHeaderKey, hash.Bytes()); err != nil {
utils.Logger().Error().Msg("Failed to store last header's hash")
return err
}
return nil
}
// ReadHeadBlockHash retrieves the hash of the current canonical head block.
@ -97,10 +103,12 @@ func ReadHeadBlockHash(db DatabaseReader) common.Hash {
}
// WriteHeadBlockHash stores the head block's hash.
func WriteHeadBlockHash(db DatabaseWriter, hash common.Hash) {
func WriteHeadBlockHash(db DatabaseWriter, hash common.Hash) error {
if err := db.Put(headBlockKey, hash.Bytes()); err != nil {
utils.Logger().Error().Msg("Failed to store last block's hash")
return err
}
return nil
}
// ReadHeadFastBlockHash retrieves the hash of the current fast-sync head block.
@ -113,10 +121,12 @@ func ReadHeadFastBlockHash(db DatabaseReader) common.Hash {
}
// WriteHeadFastBlockHash stores the hash of the current fast-sync head block.
func WriteHeadFastBlockHash(db DatabaseWriter, hash common.Hash) {
func WriteHeadFastBlockHash(db DatabaseWriter, hash common.Hash) error {
if err := db.Put(headFastBlockKey, hash.Bytes()); err != nil {
utils.Logger().Error().Msg("Failed to store last fast block's hash")
return err
}
return nil
}
// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
@ -149,7 +159,7 @@ func ReadHeader(db DatabaseReader, hash common.Hash, number uint64) *block.Heade
// WriteHeader stores a block header into the database and also stores the hash-
// to-number mapping.
func WriteHeader(db DatabaseWriter, header *block.Header) {
func WriteHeader(db DatabaseWriter, header *block.Header) error {
// Write the hash -> number mapping
var (
hash = header.Hash()
@ -159,26 +169,33 @@ func WriteHeader(db DatabaseWriter, header *block.Header) {
key := headerNumberKey(hash)
if err := db.Put(key, encoded); err != nil {
utils.Logger().Error().Msg("Failed to store hash to number mapping")
return err
}
// Write the encoded header
data, err := rlp.EncodeToBytes(header)
if err != nil {
utils.Logger().Error().Msg("Failed to RLP encode header")
return err
}
key = headerKey(number, hash)
if err := db.Put(key, data); err != nil {
utils.Logger().Error().Msg("Failed to store header")
return err
}
return nil
}
// DeleteHeader removes all block header data associated with a hash.
func DeleteHeader(db DatabaseDeleter, hash common.Hash, number uint64) {
func DeleteHeader(db DatabaseDeleter, hash common.Hash, number uint64) error {
if err := db.Delete(headerKey(number, hash)); err != nil {
utils.Logger().Error().Msg("Failed to delete header")
return err
}
if err := db.Delete(headerNumberKey(hash)); err != nil {
utils.Logger().Error().Msg("Failed to delete hash to number mapping")
return err
}
return nil
}
// ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
@ -188,10 +205,12 @@ func ReadBodyRLP(db DatabaseReader, hash common.Hash, number uint64) rlp.RawValu
}
// WriteBodyRLP stores an RLP encoded block body into the database.
func WriteBodyRLP(db DatabaseWriter, hash common.Hash, number uint64, rlp rlp.RawValue) {
func WriteBodyRLP(db DatabaseWriter, hash common.Hash, number uint64, rlp rlp.RawValue) error {
if err := db.Put(blockBodyKey(number, hash), rlp); err != nil {
utils.Logger().Error().Msg("Failed to store block body")
return err
}
return nil
}
// HasBody verifies the existence of a block body corresponding to the hash.
@ -217,19 +236,22 @@ func ReadBody(db DatabaseReader, hash common.Hash, number uint64) *types.Body {
}
// WriteBody storea a block body into the database.
func WriteBody(db DatabaseWriter, hash common.Hash, number uint64, body *types.Body) {
func WriteBody(db DatabaseWriter, hash common.Hash, number uint64, body *types.Body) error {
data, err := rlp.EncodeToBytes(body)
if err != nil {
utils.Logger().Error().Msg("Failed to RLP encode body")
return err
}
WriteBodyRLP(db, hash, number, data)
return WriteBodyRLP(db, hash, number, data)
}
// DeleteBody removes all block body data associated with a hash.
func DeleteBody(db DatabaseDeleter, hash common.Hash, number uint64) {
func DeleteBody(db DatabaseDeleter, hash common.Hash, number uint64) error {
if err := db.Delete(blockBodyKey(number, hash)); err != nil {
utils.Logger().Error().Msg("Failed to delete block body")
return err
}
return nil
}
// ReadTd retrieves a block's total difficulty corresponding to the hash.
@ -247,21 +269,26 @@ func ReadTd(db DatabaseReader, hash common.Hash, number uint64) *big.Int {
}
// WriteTd stores the total difficulty of a block into the database.
func WriteTd(db DatabaseWriter, hash common.Hash, number uint64, td *big.Int) {
func WriteTd(db DatabaseWriter, hash common.Hash, number uint64, td *big.Int) error {
data, err := rlp.EncodeToBytes(td)
if err != nil {
utils.Logger().Error().Msg("Failed to RLP encode block total difficulty")
return err
}
if err := db.Put(headerTDKey(number, hash), data); err != nil {
utils.Logger().Error().Msg("Failed to store block total difficulty")
return err
}
return nil
}
// DeleteTd removes all block total difficulty data associated with a hash.
func DeleteTd(db DatabaseDeleter, hash common.Hash, number uint64) {
func DeleteTd(db DatabaseDeleter, hash common.Hash, number uint64) error {
if err := db.Delete(headerTDKey(number, hash)); err != nil {
utils.Logger().Error().Msg("Failed to delete block total difficulty")
return err
}
return nil
}
// ReadReceipts retrieves all the transaction receipts belonging to a block.
@ -285,7 +312,7 @@ func ReadReceipts(db DatabaseReader, hash common.Hash, number uint64) types.Rece
}
// WriteReceipts stores all the transaction receipts belonging to a block.
func WriteReceipts(db DatabaseWriter, hash common.Hash, number uint64, receipts types.Receipts) {
func WriteReceipts(db DatabaseWriter, hash common.Hash, number uint64, receipts types.Receipts) error {
// Convert the receipts into their storage form and serialize them
storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
for i, receipt := range receipts {
@ -294,18 +321,23 @@ func WriteReceipts(db DatabaseWriter, hash common.Hash, number uint64, receipts
bytes, err := rlp.EncodeToBytes(storageReceipts)
if err != nil {
utils.Logger().Error().Msg("Failed to encode block receipts")
return err
}
// Store the flattened receipt slice
if err := db.Put(blockReceiptsKey(number, hash), bytes); err != nil {
utils.Logger().Error().Msg("Failed to store block receipts")
return err
}
return nil
}
// DeleteReceipts removes all receipt data associated with a block hash.
func DeleteReceipts(db DatabaseDeleter, hash common.Hash, number uint64) {
func DeleteReceipts(db DatabaseDeleter, hash common.Hash, number uint64) error {
if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
utils.Logger().Error().Msg("Failed to delete block receipts")
return err
}
return nil
}
// ReadBlock retrieves an entire block corresponding to the hash, assembling it
@ -327,22 +359,38 @@ func ReadBlock(db DatabaseReader, hash common.Hash, number uint64) *types.Block
}
// WriteBlock serializes a block into the database, header and body separately.
func WriteBlock(db DatabaseWriter, block *types.Block) {
WriteBody(db, block.Hash(), block.NumberU64(), block.Body())
WriteHeader(db, block.Header())
func WriteBlock(db DatabaseWriter, block *types.Block) error {
if err := WriteBody(db, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return err
}
if err := WriteHeader(db, block.Header()); err != nil {
return err
}
curSig := block.GetCurrentCommitSig()
if len(curSig) > 96 {
WriteBlockCommitSig(db, block.NumberU64(), curSig)
if err := WriteBlockCommitSig(db, block.NumberU64(), curSig); err != nil {
return err
}
}
return nil
}
// DeleteBlock removes all block data associated with a hash.
func DeleteBlock(db DatabaseDeleter, hash common.Hash, number uint64) {
DeleteReceipts(db, hash, number)
DeleteHeader(db, hash, number)
DeleteBody(db, hash, number)
DeleteTd(db, hash, number)
func DeleteBlock(db DatabaseDeleter, hash common.Hash, number uint64) error {
if err := DeleteReceipts(db, hash, number); err != nil {
return err
}
if err := DeleteHeader(db, hash, number); err != nil {
return err
}
if err := DeleteBody(db, hash, number); err != nil {
return err
}
if err := DeleteTd(db, hash, number); err != nil {
return err
}
return nil
}
// FindCommonAncestor returns the last common ancestor of two block headers

@ -313,7 +313,9 @@ func TestBlockReceiptStorage(t *testing.T) {
t.Fatalf("non existent receipts returned: %v", rs)
}
// Insert the receipt slice into the database and check presence
WriteReceipts(db, hash, 0, receipts)
if err := WriteReceipts(db, hash, 0, receipts); err != nil {
t.Fatalf("write receipts")
}
if rs := ReadReceipts(db, hash, 0); len(rs) == 0 {
t.Fatalf("no receipts returned")
} else {

@ -42,43 +42,49 @@ func ReadTxLookupEntry(db DatabaseReader, hash common.Hash) (common.Hash, uint64
return entry.BlockHash, entry.BlockIndex, entry.Index
}
// WriteTxLookupEntries stores a positional metadata for every transaction from
// a block, enabling hash based transaction and receipt lookups.
func WriteTxLookupEntries(db DatabaseWriter, block *types.Block) {
// TODO: remove this hack with Tx and StakingTx structure unitification later
f := func(i int, tx *types.Transaction, stx *staking.StakingTransaction) {
isStaking := (stx != nil && tx == nil)
// WriteBlockTxLookUpEntries writes all look up entries of block's transactions
func WriteBlockTxLookUpEntries(db DatabaseWriter, block *types.Block) error {
for i, tx := range block.Transactions() {
entry := TxLookupEntry{
BlockHash: block.Hash(),
BlockIndex: block.NumberU64(),
Index: uint64(i),
}
data, err := rlp.EncodeToBytes(entry)
val, err := rlp.EncodeToBytes(entry)
if err != nil {
utils.Logger().Error().Err(err).Bool("isStaking", isStaking).Msg("Failed to encode transaction lookup entry")
return err
}
key := txLookupKey(tx.Hash())
if err := db.Put(key, val); err != nil {
return err
}
}
return nil
}
var putErr error
if isStaking {
putErr = db.Put(txLookupKey(stx.Hash()), data)
} else {
putErr = db.Put(txLookupKey(tx.Hash()), data)
// WriteBlockStxLookUpEntries writes all look up entries of block's staking transactions
func WriteBlockStxLookUpEntries(db DatabaseWriter, block *types.Block) error {
for i, stx := range block.StakingTransactions() {
entry := TxLookupEntry{
BlockHash: block.Hash(),
BlockIndex: block.NumberU64(),
Index: uint64(i),
}
if putErr != nil {
utils.Logger().Error().Err(err).Bool("isStaking", isStaking).Msg("Failed to store transaction lookup entry")
val, err := rlp.EncodeToBytes(entry)
if err != nil {
return err
}
key := txLookupKey(stx.Hash())
if err := db.Put(key, val); err != nil {
return err
}
}
for i, tx := range block.Transactions() {
f(i, tx, nil)
}
for i, tx := range block.StakingTransactions() {
f(i, nil, tx)
}
return nil
}
// DeleteTxLookupEntry removes all transaction data associated with a hash.
func DeleteTxLookupEntry(db DatabaseDeleter, hash common.Hash) {
db.Delete(txLookupKey(hash))
func DeleteTxLookupEntry(db DatabaseDeleter, hash common.Hash) error {
return db.Delete(txLookupKey(hash))
}
// ReadTransaction retrieves a specific transaction from the database, along with
@ -111,8 +117,6 @@ func ReadTransaction(db DatabaseReader, hash common.Hash) (*types.Transaction, c
// ReadStakingTransaction retrieves a specific staking transaction from the database, along with
// its added positional metadata.
// TODO remove this duplicate function that is inevitable at the moment until the optimization on staking txn with
// unification of txn vs staking txn data structure.
func ReadStakingTransaction(db DatabaseReader, hash common.Hash) (*staking.StakingTransaction, common.Hash, uint64, uint64) {
blockHash, blockNumber, txIndex := ReadTxLookupEntry(db, hash)
if blockHash == (common.Hash{}) {
@ -167,10 +171,12 @@ func ReadBloomBits(db DatabaseReader, bit uint, section uint64, head common.Hash
// WriteBloomBits stores the compressed bloom bits vector belonging to the given
// section and bit index.
func WriteBloomBits(db DatabaseWriter, bit uint, section uint64, head common.Hash, bits []byte) {
func WriteBloomBits(db DatabaseWriter, bit uint, section uint64, head common.Hash, bits []byte) error {
if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store bloom bits")
return err
}
return nil
}
// ReadCxLookupEntry retrieves the positional metadata associated with a transaction hash
@ -192,7 +198,7 @@ func ReadCxLookupEntry(db DatabaseReader, hash common.Hash) (common.Hash, uint64
// WriteCxLookupEntries stores a positional metadata for every transaction from
// a block, enabling hash based transaction and receipt lookups.
func WriteCxLookupEntries(db DatabaseWriter, block *types.Block) {
func WriteCxLookupEntries(db DatabaseWriter, block *types.Block) error {
previousSum := 0
for _, cxp := range block.IncomingReceipts() {
for j, cx := range cxp.Receipts {
@ -204,18 +210,21 @@ func WriteCxLookupEntries(db DatabaseWriter, block *types.Block) {
data, err := rlp.EncodeToBytes(entry)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to encode transaction lookup entry")
return err
}
if err := db.Put(cxLookupKey(cx.TxHash), data); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store transaction lookup entry")
return err
}
}
previousSum += len(cxp.Receipts)
}
return nil
}
// DeleteCxLookupEntry removes all transaction data associated with a hash.
func DeleteCxLookupEntry(db DatabaseDeleter, hash common.Hash) {
db.Delete(cxLookupKey(hash))
func DeleteCxLookupEntry(db DatabaseDeleter, hash common.Hash) error {
return db.Delete(cxLookupKey(hash))
}
// ReadCXReceipt retrieves a specific transaction from the database, along with

@ -74,7 +74,12 @@ func TestLookupStorage(t *testing.T) {
}
// Insert all the transactions into the database, and verify contents
WriteBlock(db, block)
WriteTxLookupEntries(db, block)
if err := WriteBlockTxLookUpEntries(db, block); err != nil {
t.Fatalf("WriteBlockTxLookUpEntries: %v", err)
}
if err := WriteBlockStxLookUpEntries(db, block); err != nil {
t.Fatalf("WriteBlockStxLookUpEntries: %v", err)
}
for i, tx := range txs {
if txn, hash, number, index := ReadTransaction(db, tx.Hash()); txn == nil {
@ -126,8 +131,15 @@ func TestMixedLookupStorage(t *testing.T) {
header := blockfactory.NewTestHeader().With().Number(big.NewInt(314)).Header()
block := types.NewBlock(header, txs, types.Receipts{&types.Receipt{}, &types.Receipt{}}, nil, nil, stxs)
WriteBlock(db, block)
WriteTxLookupEntries(db, block)
if err := WriteBlock(db, block); err != nil {
t.Fatalf("WriteBlock: %v", err)
}
if err := WriteBlockTxLookUpEntries(db, block); err != nil {
t.Fatalf("WriteBlockStxLookUpEntries: %v", err)
}
if err := WriteBlockStxLookUpEntries(db, block); err != nil {
t.Fatalf("WriteBlockStxLookUpEntries: %v", err)
}
if recTx, _, _, _ := ReadStakingTransaction(db, tx.Hash()); recTx != nil {
t.Fatal("got staking transactions with plain tx hash")

@ -18,11 +18,11 @@ package rawdb
import (
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
)
@ -37,11 +37,13 @@ func ReadDatabaseVersion(db DatabaseReader) int {
}
// WriteDatabaseVersion stores the version number of the database
func WriteDatabaseVersion(db DatabaseWriter, version int) {
func WriteDatabaseVersion(db DatabaseWriter, version int) error {
enc, _ := rlp.EncodeToBytes(version)
if err := db.Put(databaseVerisionKey, enc); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store the database version")
return err
}
return nil
}
// ReadChainConfig retrieves the consensus settings based on the given genesis hash.
@ -59,17 +61,20 @@ func ReadChainConfig(db DatabaseReader, hash common.Hash) *params.ChainConfig {
}
// WriteChainConfig writes the chain config settings to the database.
func WriteChainConfig(db DatabaseWriter, hash common.Hash, cfg *params.ChainConfig) {
func WriteChainConfig(db DatabaseWriter, hash common.Hash, cfg *params.ChainConfig) error {
if cfg == nil {
return
return errors.New("nil config")
}
data, err := json.Marshal(cfg)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to JSON encode chain config")
return err
}
if err := db.Put(configKey(hash), data); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store chain config")
return err
}
return nil
}
// ReadPreimage retrieves a single preimage of the provided hash.
@ -80,12 +85,14 @@ func ReadPreimage(db DatabaseReader, hash common.Hash) []byte {
// WritePreimages writes the provided set of preimages to the database. `number` is the
// current block number, and is used for debug messages only.
func WritePreimages(db DatabaseWriter, number uint64, preimages map[common.Hash][]byte) {
func WritePreimages(db DatabaseWriter, number uint64, preimages map[common.Hash][]byte) error {
for hash, preimage := range preimages {
if err := db.Put(preimageKey(hash), preimage); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store trie preimage")
return err
}
}
preimageCounter.Inc(int64(len(preimages)))
preimageHitCounter.Inc(int64(len(preimages)))
return nil
}

@ -124,14 +124,20 @@ func ReadCXReceiptsProofSpent(db DatabaseReader, shardID uint32, number uint64)
func WriteCXReceiptsProofSpent(dbw DatabaseWriter, cxp *types.CXReceiptsProof) error {
shardID := cxp.MerkleProof.ShardID
blockNum := cxp.MerkleProof.BlockNum.Uint64()
return dbw.Put(cxReceiptSpentKey(shardID, blockNum), []byte{SpentByte})
if err := dbw.Put(cxReceiptSpentKey(shardID, blockNum), []byte{SpentByte}); err != nil {
utils.Logger().Error().Msg("Failed to write CX receipt proof")
return err
}
return nil
}
// DeleteCXReceiptsProofSpent removes unspent indicator of a given blockHash
func DeleteCXReceiptsProofSpent(db DatabaseDeleter, shardID uint32, number uint64) {
func DeleteCXReceiptsProofSpent(db DatabaseDeleter, shardID uint32, number uint64) error {
if err := db.Delete(cxReceiptSpentKey(shardID, number)); err != nil {
utils.Logger().Error().Msg("Failed to delete receipts unspent indicator")
return err
}
return nil
}
// ReadValidatorSnapshot retrieves validator's snapshot by its address
@ -169,17 +175,21 @@ func WriteValidatorSnapshot(batch DatabaseWriter, v *staking.ValidatorWrapper, e
}
// DeleteValidatorSnapshot removes the validator's snapshot by its address
func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address, epoch *big.Int) {
func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address, epoch *big.Int) error {
if err := db.Delete(validatorSnapshotKey(addr, epoch)); err != nil {
utils.Logger().Error().Msg("Failed to delete snapshot of a validator")
return err
}
return nil
}
// DeleteValidatorStats ..
func DeleteValidatorStats(db DatabaseDeleter, addr common.Address) {
func DeleteValidatorStats(db DatabaseDeleter, addr common.Address) error {
if err := db.Delete(validatorStatsKey(addr)); err != nil {
utils.Logger().Error().Msg("Failed to delete stats of a validator")
return err
}
return nil
}
// ReadValidatorStats retrieves validator's stats by its address,

@ -134,7 +134,10 @@ func (b *BloomIndexer) Commit() error {
if err != nil {
return err
}
rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
err = rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
if err != nil {
return err
}
}
return batch.Write()
}

@ -40,7 +40,7 @@ var (
func SetLogContext(_port, _ip string) {
port = _port
ip = _ip
setZeroLogContext(_port, _ip)
setZeroLogContext(_ip, _port)
}
// SetLogVerbosity specifies the verbosity of global logger

Loading…
Cancel
Save