From 9ffbf682c075b49188923c65a0bbf39ac188be00 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sat, 22 Feb 2020 11:59:56 -0800 Subject: [PATCH] Refactor offchain data commit; Make block onchain/offchain commit atomic (#2279) * Refactor offchain data; Add epoch to ValidatorSnapshot * Make block onchain/offchain data commit atomically --- core/blockchain.go | 526 ++++------------------------ core/offchain.go | 184 ++++++++++ core/rawdb/accessors_chain.go | 437 ----------------------- core/rawdb/accessors_chain_test.go | 10 +- core/rawdb/accessors_offchain.go | 362 +++++++++++++++++++ core/rawdb/schema.go | 21 +- internal/hmyapi/apiv1/blockchain.go | 2 +- internal/hmyapi/apiv2/blockchain.go | 2 +- 8 files changed, 631 insertions(+), 913 deletions(-) create mode 100644 core/offchain.go create mode 100644 core/rawdb/accessors_offchain.go diff --git a/core/blockchain.go b/core/blockchain.go index d9861904c..19cb26d4a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -603,31 +603,36 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return nil } -// insert injects a new head block into the current block chain. This method -// assumes that the block is indeed a true head. It will also reset the head -// header and the head fast sync block to this very same block if they are older -// or if they are on a different side chain. -// -// Note, this function assumes that the `mu` mutex is held! -func (bc *BlockChain) insert(block *types.Block) { +// similar to insert, but add to the db writer. +func (bc *BlockChain) insertWithWriter(batch rawdb.DatabaseWriter, block *types.Block) { // If the block is on a side chain or an unknown one, force other heads onto it too updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash() // Add the block to the canonical chain number scheme and mark as the head - rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(bc.db, block.Hash()) + rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) + rawdb.WriteHeadBlockHash(batch, block.Hash()) bc.currentBlock.Store(block) // If the block is better than our head or is on a different chain, force update heads if updateHeads { bc.hc.SetCurrentHeader(block.Header()) - rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()) + rawdb.WriteHeadFastBlockHash(batch, block.Hash()) bc.currentFastBlock.Store(block) } } +// insert injects a new head block into the current block chain. This method +// assumes that the block is indeed a true head. It will also reset the head +// header and the head fast sync block to this very same block if they are older +// or if they are on a different side chain. +// +// Note, this function assumes that the `mu` mutex is held! +func (bc *BlockChain) insert(block *types.Block) { + bc.insertWithWriter(bc.db, block) +} + // Genesis retrieves the chain's genesis block. func (bc *BlockChain) Genesis() *types.Block { return bc.genesisBlock @@ -1052,17 +1057,19 @@ func (bc *BlockChain) WriteBlockWithState( defer bc.mu.Unlock() currentBlock := bc.CurrentBlock() + if currentBlock == nil || block.ParentHash() != currentBlock.Hash() { + return NonStatTy, errors.New("Hash of parent block doesn't match the current block hash") + } - rawdb.WriteBlock(bc.db, block) - + // Commit state object changes to in-memory trie root, err := state.Commit(bc.chainConfig.IsS3(block.Epoch())) if err != nil { return NonStatTy, err } - triedb := bc.stateCache.TrieDB() - // If we're running an archive node, always flush - if bc.cacheConfig.Disabled { + // Flush trie state into disk if it's archival node or the block is epoch block + triedb := bc.stateCache.TrieDB() + if bc.cacheConfig.Disabled || len(block.Header().ShardState()) > 0 { if err := triedb.Commit(root, false); err != nil { return NonStatTy, err } @@ -1112,208 +1119,31 @@ func (bc *BlockChain) WriteBlockWithState( } } - // Write other block data using a batch. - // TODO: put following into a func - /////////////////////////// START batch := bc.db.NewBatch() - rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) - - //// Cross-shard txns - epoch := block.Header().Epoch() - if bc.chainConfig.HasCrossTxFields(block.Epoch()) { - shardingConfig := shard.Schedule.InstanceForEpoch(epoch) - shardNum := int(shardingConfig.NumShards()) - for i := 0; i < shardNum; i++ { - if i == int(block.ShardID()) { - continue - } + // Write the raw block + rawdb.WriteBlock(batch, block) - shardReceipts := types.CXReceipts(cxReceipts).GetToShardReceipts(uint32(i)) - err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts) - if err != nil { - utils.Logger().Error().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database") - return NonStatTy, err - } - } - // Mark incomingReceipts in the block as spent - bc.WriteCXReceiptsProofSpent(block.IncomingReceipts()) + // Write offchain data + if status, err := bc.CommitOffChainData( + batch, block, receipts, + cxReceipts, payout, state, root); err != nil { + return status, err } - //// VRF + VDF - //check non zero VRF field in header and add to local db - if len(block.Vrf()) > 0 { - vrfBlockNumbers, _ := bc.ReadEpochVrfBlockNums(block.Header().Epoch()) - if (len(vrfBlockNumbers) > 0) && (vrfBlockNumbers[len(vrfBlockNumbers)-1] == block.NumberU64()) { - utils.Logger().Error(). - Str("number", block.Number().String()). - Str("epoch", block.Header().Epoch().String()). - Msg("VRF block number is already in local db") - } else { - vrfBlockNumbers = append(vrfBlockNumbers, block.NumberU64()) - err = bc.WriteEpochVrfBlockNums(block.Header().Epoch(), vrfBlockNumbers) - if err != nil { - utils.Logger().Error(). - Str("number", block.Number().String()). - Str("epoch", block.Header().Epoch().String()). - Msg("failed to write VRF block number to local db") - return NonStatTy, err - } - } - } + // Write the positional metadata for transaction/receipt lookups and preimages + rawdb.WriteTxLookupEntries(batch, block) + rawdb.WriteCxLookupEntries(batch, block) + rawdb.WritePreimages(batch, block.NumberU64(), state.Preimages()) - //check non zero Vdf in header and add to local db - if len(block.Vdf()) > 0 { - err = bc.WriteEpochVdfBlockNum(block.Header().Epoch(), block.Number()) - if err != nil { - utils.Logger().Error(). - Str("number", block.Number().String()). - Str("epoch", block.Header().Epoch().String()). - Msg("failed to write VDF block number to local db") - return NonStatTy, err - } - } + // Update current block + bc.insertWithWriter(batch, block) - //// Shard State and Validator Update - header := block.Header() - if len(header.ShardState()) > 0 { - // Write shard state for the new epoch - epoch := new(big.Int).Add(header.Epoch(), common.Big1) - shardState, err := block.Header().GetShardState() - if err == nil && shardState.Epoch != nil && bc.chainConfig.IsStaking(shardState.Epoch) { - // After staking, the epoch will be decided by the epoch in the shard state. - epoch = new(big.Int).Set(shardState.Epoch) - } - - newShardState, err := bc.WriteShardStateBytes(batch, epoch, header.ShardState()) - if err != nil { - header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state") - return NonStatTy, err - } - - // Find all the active validator addresses and store them in db - allActiveValidators := []common.Address{} - processed := make(map[common.Address]struct{}) - for i := range newShardState.Shards { - shard := newShardState.Shards[i] - for j := range shard.Slots { - slot := shard.Slots[j] - if slot.EffectiveStake != nil { // For external validator - _, ok := processed[slot.EcdsaAddress] - if !ok { - processed[slot.EcdsaAddress] = struct{}{} - allActiveValidators = append(allActiveValidators, shard.Slots[j].EcdsaAddress) - } - } - } - } - - // Update active validators - if err := bc.WriteActiveValidatorList(allActiveValidators); err != nil { - return NonStatTy, err - } - - // Update snapshots for all validators - if err := bc.UpdateValidatorSnapshots(); err != nil { - return NonStatTy, err - } - } - - // Do bookkeeping for new staking txns - for _, tx := range block.StakingTransactions() { - err = bc.UpdateStakingMetaData(tx, root) - // keep offchain database consistency with onchain we need revert - // but it should not happend unless local database corrupted - if err != nil { - utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err) - return NonStatTy, err - } - } - - // Update voting power of validators for all shards - if block.ShardID() == shard.BeaconChainShardID && - len(block.Header().ShardState()) > 0 { - shardState := new(shard.State) - - if shardState, err = shard.DecodeWrapper(block.Header().ShardState()); err == nil { - if err = bc.UpdateValidatorVotingPower(shardState); err != nil { - utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to update voting power") - } - } else { - utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to decode shard state") - } - } - - //// Writing beacon chain cross links - if header.ShardID() == shard.BeaconChainShardID && - bc.chainConfig.IsCrossLink(block.Epoch()) && - len(header.CrossLinks()) > 0 { - crossLinks := &types.CrossLinks{} - err = rlp.DecodeBytes(header.CrossLinks(), crossLinks) - if err != nil { - header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cannot parse cross links") - return NonStatTy, err - } - if !crossLinks.IsSorted() { - header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cross links are not sorted") - return NonStatTy, errors.New("proposed cross links are not sorted") - } - for _, crossLink := range *crossLinks { - // Process crosslink - if err := bc.WriteCrossLinks(types.CrossLinks{crossLink}); err == nil { - utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[insertChain/crosslinks] Cross Link Added to Beaconchain") - } - bc.LastContinuousCrossLink(crossLink) - } - - //clean/update local database cache after crosslink inserted into blockchain - num, err := bc.DeleteCommittedFromPendingCrossLinks(*crossLinks) - utils.Logger().Debug().Msgf("DeleteCommittedFromPendingCrossLinks, crosslinks in header %d, pending crosslinks: %d, error: %+v", len(*crossLinks), num, err) - } - - if bc.CurrentHeader().ShardID() == shard.BeaconChainShardID { - if bc.chainConfig.IsStaking(block.Epoch()) { - bc.UpdateBlockRewardAccumulator(payout, block.Number().Uint64()) - } else { - // block reward never accumulate before staking - bc.WriteBlockRewardAccumulator(big.NewInt(0), block.Number().Uint64()) - } - } - - /////////////////////////// END - - // If the total difficulty is higher than our known, add it to the canonical chain - // Second clause in the if statement reduces the vulnerability to selfish mining. - // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf - // TODO: Remove reorg code, it's not used in our code - reorg := true - if reorg { - // Reorganise the chain if the parent is not the head block - if block.ParentHash() != currentBlock.Hash() { - if err := bc.reorg(currentBlock, block); err != nil { - return NonStatTy, err - } - } - // Write the positional metadata for transaction/receipt lookups and preimages - rawdb.WriteTxLookupEntries(batch, block) - rawdb.WritePreimages(batch, block.NumberU64(), state.Preimages()) - - // write the positional metadata for CXReceipts lookups - rawdb.WriteCxLookupEntries(batch, block) - - status = CanonStatTy - } else { - status = SideStatTy - } if err := batch.Write(); err != nil { return NonStatTy, err } - // Set new head. - if status == CanonStatTy { - bc.insert(block) - } bc.futureBlocks.Remove(block.Hash()) - return status, nil + return CanonStatTy, nil } // InsertChain attempts to insert the given batch of blocks in to the canonical @@ -1526,10 +1356,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int, // Only count canonical blocks for GC processing time bc.gcproc += proctime - case SideStatTy: - logger.Debug().Msg("Inserted forked block") - blockInsertTimer.UpdateSince(bstart) - events = append(events, ChainSideEvent{block}) } stats.processed++ @@ -1607,132 +1433,6 @@ func countTransactions(chain []*types.Block) (c int) { return c } -// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them -// to be part of the new canonical chain and accumulates potential missing transactions and post an -// event about them -func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { - var ( - newChain types.Blocks - oldChain types.Blocks - commonBlock *types.Block - deletedTxs types.Transactions - deletedLogs []*types.Log - // collectLogs collects the logs that were generated during the - // processing of the block that corresponds with the given hash. - // These logs are later announced as deleted. - collectLogs = func(hash common.Hash) { - // Coalesce logs and set 'Removed'. - number := bc.hc.GetBlockNumber(hash) - if number == nil { - return - } - receipts := rawdb.ReadReceipts(bc.db, hash, *number) - for _, receipt := range receipts { - for _, log := range receipt.Logs { - del := *log - del.Removed = true - deletedLogs = append(deletedLogs, &del) - } - } - } - ) - - // first reduce whoever is higher bound - if oldBlock.NumberU64() > newBlock.NumberU64() { - // reduce old chain - for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { - oldChain = append(oldChain, oldBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - - collectLogs(oldBlock.Hash()) - } - } else { - // reduce new chain and append new chain blocks for inserting later on - for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, newBlock) - } - } - if oldBlock == nil { - return fmt.Errorf("Invalid old chain") - } - if newBlock == nil { - return fmt.Errorf("Invalid new chain") - } - - for { - if oldBlock.Hash() == newBlock.Hash() { - commonBlock = oldBlock - break - } - - oldChain = append(oldChain, oldBlock) - newChain = append(newChain, newBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - collectLogs(oldBlock.Hash()) - - oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) - if oldBlock == nil { - return fmt.Errorf("Invalid old chain") - } - if newBlock == nil { - return fmt.Errorf("Invalid new chain") - } - } - // Ensure the user sees large reorgs - if len(oldChain) > 0 && len(newChain) > 0 { - logEvent := utils.Logger().Debug() - if len(oldChain) > 63 { - logEvent = utils.Logger().Warn() - } - logEvent. - Str("number", commonBlock.Number().String()). - Str("hash", commonBlock.Hash().Hex()). - Int("drop", len(oldChain)). - Str("dropfrom", oldChain[0].Hash().Hex()). - Int("add", len(newChain)). - Str("addfrom", newChain[0].Hash().Hex()). - Msg("Chain split detected") - } else { - utils.Logger().Error(). - Str("oldnum", oldBlock.Number().String()). - Str("oldhash", oldBlock.Hash().Hex()). - Str("newnum", newBlock.Number().String()). - Str("newhash", newBlock.Hash().Hex()). - Msg("Impossible reorg, please file an issue") - } - // Insert the new chain, taking care of the proper incremental order - var addedTxs types.Transactions - for i := len(newChain) - 1; i >= 0; i-- { - // insert the block in the canonical way, re-writing history - bc.insert(newChain[i]) - // write lookup entries for hash based transaction/receipt searches - rawdb.WriteTxLookupEntries(bc.db, newChain[i]) - addedTxs = append(addedTxs, newChain[i].Transactions()...) - } - // calculate the difference between deleted and added transactions - diff := types.TxDifference(deletedTxs, addedTxs) - // When transactions get deleted from the database that means the - // receipts that were created in the fork must also be deleted - batch := bc.db.NewBatch() - for _, tx := range diff { - rawdb.DeleteTxLookupEntry(batch, tx.Hash()) - } - batch.Write() - - if len(deletedLogs) > 0 { - go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - } - if len(oldChain) > 0 { - go func() { - for _, block := range oldChain { - bc.chainSideFeed.Send(ChainSideEvent{Block: block}) - } - }() - } - - return nil -} - // PostChainEvents iterates over the events generated by a chain insertion and // posts them into the event feed. // TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. @@ -2129,11 +1829,11 @@ func (bc *BlockChain) WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) e } // WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key -func (bc *BlockChain) WriteCrossLinks(cls []types.CrossLink) error { +func (bc *BlockChain) WriteCrossLinks(batch rawdb.DatabaseWriter, cls []types.CrossLink) error { var err error for i := 0; i < len(cls); i++ { cl := cls[i] - err = rawdb.WriteCrossLinkShardBlock(bc.db, cl.ShardID(), cl.BlockNum(), cl.Serialize()) + err = rawdb.WriteCrossLinkShardBlock(batch, cl.ShardID(), cl.BlockNum(), cl.Serialize()) } return err } @@ -2163,14 +1863,14 @@ func (bc *BlockChain) ReadCrossLink(shardID uint32, blockNum uint64) (*types.Cro // This function will update the latest crosslink in the sense that // any previous block's crosslink is received up to this point // there is no missing hole between genesis to this crosslink of given shardID -func (bc *BlockChain) LastContinuousCrossLink(cl types.CrossLink) error { +func (bc *BlockChain) LastContinuousCrossLink(batch rawdb.DatabaseWriter, cl types.CrossLink) error { if !bc.Config().IsCrossLink(cl.Epoch()) { return errors.New("Trying to write last continuous cross link with epoch before cross link starting epoch") } cl0, err := bc.ReadShardLastCrossLink(cl.ShardID()) if cl0 == nil { - rawdb.WriteShardLastCrossLink(bc.db, cl.ShardID(), cl.Serialize()) + rawdb.WriteShardLastCrossLink(batch, cl.ShardID(), cl.Serialize()) return nil } if err != nil { @@ -2194,7 +1894,7 @@ func (bc *BlockChain) LastContinuousCrossLink(cl types.CrossLink) error { if err != nil { return err } - return rawdb.WriteShardLastCrossLink(bc.db, cln.ShardID(), cln.Serialize()) + return rawdb.WriteShardLastCrossLink(batch, cln.ShardID(), cln.Serialize()) } return nil } @@ -2444,58 +2144,11 @@ func (bc *BlockChain) CXMerkleProof(toShardID uint32, block *types.Block) (*type return proof, nil } -// LatestCXReceiptsCheckpoint returns the latest checkpoint -func (bc *BlockChain) LatestCXReceiptsCheckpoint(shardID uint32) uint64 { - blockNum, _ := rawdb.ReadCXReceiptsProofUnspentCheckpoint(bc.db, shardID) - return blockNum -} - -// NextCXReceiptsCheckpoint returns the next checkpoint blockNum -func (bc *BlockChain) NextCXReceiptsCheckpoint(currentNum uint64, shardID uint32) uint64 { - lastCheckpoint, _ := rawdb.ReadCXReceiptsProofUnspentCheckpoint(bc.db, shardID) - newCheckpoint := lastCheckpoint - - // the new checkpoint will not exceed currentNum+1 - for num := lastCheckpoint; num <= currentNum+1; num++ { - newCheckpoint = num - by, _ := rawdb.ReadCXReceiptsProofSpent(bc.db, shardID, num) - if by == rawdb.NAByte { - // TODO chao: check if there is IncompingReceiptsHash in crosslink header - // if the rootHash is non-empty, it means incomingReceipts are not delivered - // otherwise, it means there is no cross-shard transactions for this block - continue - } - if by == rawdb.SpentByte { - continue - } - // the first unspent blockHash found, break the loop - break - } - return newCheckpoint -} - -// updateCXReceiptsCheckpoints will update the checkpoint and clean spent receipts upto checkpoint -func (bc *BlockChain) updateCXReceiptsCheckpoints(shardID uint32, currentNum uint64) { - lastCheckpoint, err := rawdb.ReadCXReceiptsProofUnspentCheckpoint(bc.db, shardID) - if err != nil { - utils.Logger().Warn().Msg("[updateCXReceiptsCheckpoints] Cannot get lastCheckpoint") - } - newCheckpoint := bc.NextCXReceiptsCheckpoint(currentNum, shardID) - if lastCheckpoint == newCheckpoint { - return - } - utils.Logger().Debug().Uint64("lastCheckpoint", lastCheckpoint).Uint64("newCheckpont", newCheckpoint).Msg("[updateCXReceiptsCheckpoints]") - for num := lastCheckpoint; num < newCheckpoint; num++ { - rawdb.DeleteCXReceiptsProofSpent(bc.db, shardID, num) - } - rawdb.WriteCXReceiptsProofUnspentCheckpoint(bc.db, shardID, newCheckpoint) -} - // WriteCXReceiptsProofSpent mark the CXReceiptsProof list with given unspent status // true: unspent, false: spent -func (bc *BlockChain) WriteCXReceiptsProofSpent(cxps []*types.CXReceiptsProof) { +func (bc *BlockChain) WriteCXReceiptsProofSpent(db rawdb.DatabaseWriter, cxps []*types.CXReceiptsProof) { for _, cxp := range cxps { - rawdb.WriteCXReceiptsProofSpent(bc.db, cxp) + rawdb.WriteCXReceiptsProofSpent(db, cxp) } } @@ -2504,31 +2157,12 @@ func (bc *BlockChain) IsSpent(cxp *types.CXReceiptsProof) bool { shardID := cxp.MerkleProof.ShardID blockNum := cxp.MerkleProof.BlockNum.Uint64() by, _ := rawdb.ReadCXReceiptsProofSpent(bc.db, shardID, blockNum) - if by == rawdb.SpentByte || cxp.MerkleProof.BlockNum.Uint64() < bc.LatestCXReceiptsCheckpoint(cxp.MerkleProof.ShardID) { + if by == rawdb.SpentByte { return true } return false } -// UpdateCXReceiptsCheckpointsByBlock cleans checkpoints and update latest checkpoint based on incomingReceipts of the given block -func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) { - m := make(map[uint32]uint64) - for _, cxp := range block.IncomingReceipts() { - shardID := cxp.MerkleProof.ShardID - blockNum := cxp.MerkleProof.BlockNum.Uint64() - if _, ok := m[shardID]; !ok { - m[shardID] = blockNum - } else if m[shardID] < blockNum { - m[shardID] = blockNum - } - } - - for k, v := range m { - utils.Logger().Debug().Uint32("shardID", k).Uint64("blockNum", v).Msg("[CleanCXReceiptsCheckpoints] Cleaning CXReceiptsProof upto") - bc.updateCXReceiptsCheckpoints(k, v) - } -} - // ReadTxLookupEntry returns where the given transaction resides in the chain, // as a (block hash, block number, index in transaction list) triple. // returns 0, 0 if not found @@ -2571,11 +2205,11 @@ func (bc *BlockChain) ReadValidatorSnapshot( return &v, nil } - return rawdb.ReadValidatorSnapshot(bc.db, addr) + return rawdb.ReadValidatorSnapshot(bc.db, addr, bc.CurrentBlock().Epoch()) } // writeValidatorSnapshots writes the snapshot of provided list of validators -func (bc *BlockChain) writeValidatorSnapshots(addrs []common.Address) error { +func (bc *BlockChain) writeValidatorSnapshots(batch rawdb.DatabaseWriter, addrs []common.Address, epoch *big.Int) error { // Read all validator's current data validators := []*staking.ValidatorWrapper{} for i := range addrs { @@ -2586,15 +2220,11 @@ func (bc *BlockChain) writeValidatorSnapshots(addrs []common.Address) error { validators = append(validators, validator) } // Batch write the current data as snapshot - batch := bc.db.NewBatch() for i := range validators { - if err := rawdb.WriteValidatorSnapshot(batch, validators[i]); err != nil { + if err := rawdb.WriteValidatorSnapshot(batch, validators[i], epoch); err != nil { return err } } - if err := batch.Write(); err != nil { - return err - } // Update cache for i := range validators { @@ -2613,7 +2243,7 @@ func (bc *BlockChain) ReadValidatorStats(addr common.Address) (*staking.Validato } // UpdateValidatorVotingPower writes the voting power for the committees -func (bc *BlockChain) UpdateValidatorVotingPower(state *shard.State) error { +func (bc *BlockChain) UpdateValidatorVotingPower(batch rawdb.DatabaseWriter, state *shard.State) error { if state == nil { return errors.New("[UpdateValidatorVotingPower] Nil shard state") } @@ -2630,8 +2260,6 @@ func (bc *BlockChain) UpdateValidatorVotingPower(state *shard.State) error { networkWide := votepower.AggregateRosters(rosters) - batch := bc.db.NewBatch() - for key, value := range networkWide { statsFromDB, err := rawdb.ReadValidatorStats(bc.db, key) if statsFromDB == nil { @@ -2649,18 +2277,15 @@ func (bc *BlockChain) UpdateValidatorVotingPower(state *shard.State) error { } } - if err := batch.Write(); err != nil { - return err - } - // TODO: Update cache return nil } // deleteValidatorSnapshots deletes the snapshot staking information of given validator address +// TODO: delete validator snapshots from X epochs ago func (bc *BlockChain) deleteValidatorSnapshots(addrs []common.Address) error { batch := bc.db.NewBatch() for i := range addrs { - rawdb.DeleteValidatorSnapshot(batch, addrs[i]) + rawdb.DeleteValidatorSnapshot(batch, addrs[i], bc.CurrentBlock().Epoch()) } if err := batch.Write(); err != nil { return err @@ -2672,8 +2297,8 @@ func (bc *BlockChain) deleteValidatorSnapshots(addrs []common.Address) error { } // UpdateValidatorSnapshots updates the content snapshot of all validators -// Note: this should only be called within the blockchain insertBlock process. -func (bc *BlockChain) UpdateValidatorSnapshots() error { +// Note: this should only be called within the blockchain insert process. +func (bc *BlockChain) UpdateValidatorSnapshots(batch rawdb.DatabaseWriter, epoch *big.Int) error { allValidators, err := bc.ReadValidatorList() if err != nil { return err @@ -2685,7 +2310,7 @@ func (bc *BlockChain) UpdateValidatorSnapshots() error { // return err //} - return bc.writeValidatorSnapshots(allValidators) + return bc.writeValidatorSnapshots(batch, allValidators, epoch) } // ReadValidatorList reads the addresses of current all validators @@ -2702,9 +2327,9 @@ func (bc *BlockChain) ReadValidatorList() ([]common.Address, error) { } // WriteValidatorList writes the list of validator addresses to database -// Note: this should only be called within the blockchain insertBlock process. -func (bc *BlockChain) WriteValidatorList(addrs []common.Address) error { - err := rawdb.WriteValidatorList(bc.db, addrs, false) +// Note: this should only be called within the blockchain insert process. +func (bc *BlockChain) WriteValidatorList(db rawdb.DatabaseWriter, addrs []common.Address) error { + err := rawdb.WriteValidatorList(db, addrs, false) if err != nil { return err } @@ -2729,9 +2354,9 @@ func (bc *BlockChain) ReadActiveValidatorList() ([]common.Address, error) { } // WriteActiveValidatorList writes the list of active validator addresses to database -// Note: this should only be called within the blockchain insertBlock process. -func (bc *BlockChain) WriteActiveValidatorList(addrs []common.Address) error { - err := rawdb.WriteValidatorList(bc.db, addrs, true) +// Note: this should only be called within the blockchain insert process. +func (bc *BlockChain) WriteActiveValidatorList(batch rawdb.DatabaseWriter, addrs []common.Address) error { + err := rawdb.WriteValidatorList(batch, addrs, true) if err != nil { return err } @@ -2756,8 +2381,8 @@ func (bc *BlockChain) ReadDelegationsByDelegator(delegator common.Address) ([]st } // writeDelegationsByDelegator writes the list of validator addresses to database -func (bc *BlockChain) writeDelegationsByDelegator(delegator common.Address, indices []staking.DelegationIndex) error { - err := rawdb.WriteDelegationsByDelegator(bc.db, delegator, indices) +func (bc *BlockChain) writeDelegationsByDelegator(batch rawdb.DatabaseWriter, delegator common.Address, indices []staking.DelegationIndex) error { + err := rawdb.WriteDelegationsByDelegator(batch, delegator, indices) if err != nil { return err } @@ -2769,8 +2394,8 @@ func (bc *BlockChain) writeDelegationsByDelegator(delegator common.Address, indi } // UpdateStakingMetaData updates the validator's and the delegator's meta data according to staking transaction -// Note: this should only be called within the blockchain insertBlock process. -func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root common.Hash) error { +// Note: this should only be called within the blockchain insert process. +func (bc *BlockChain) UpdateStakingMetaData(batch rawdb.DatabaseWriter, tx *staking.StakingTransaction, root common.Hash) error { // TODO: simply the logic here in staking/types/transaction.go payload, err := tx.RLPEncodeStakeMsg() if err != nil { @@ -2784,7 +2409,6 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root switch tx.StakingType() { case staking.DirectiveCreateValidator: createValidator := decodePayload.(*staking.CreateValidator) - // TODO: batch add validator list instead of one by one list, err := bc.ReadValidatorList() if err != nil { return err @@ -2796,7 +2420,7 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root list = utils.AppendIfMissing(list, createValidator.ValidatorAddress) if len(list) > beforeLen { - if err = bc.WriteValidatorList(list); err != nil { + if err = bc.WriteValidatorList(batch, list); err != nil { return err } } @@ -2809,16 +2433,16 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root validator.Snapshot.Epoch = epoch - if err := rawdb.WriteValidatorSnapshot(bc.db, validator); err != nil { + if err := rawdb.WriteValidatorSnapshot(batch, validator, epoch); err != nil { return err } // Add self delegation into the index - return bc.addDelegationIndex(createValidator.ValidatorAddress, createValidator.ValidatorAddress, root) + return bc.addDelegationIndex(batch, createValidator.ValidatorAddress, createValidator.ValidatorAddress, root) case staking.DirectiveEditValidator: case staking.DirectiveDelegate: delegate := decodePayload.(*staking.Delegate) - return bc.addDelegationIndex(delegate.DelegatorAddress, delegate.ValidatorAddress, root) + return bc.addDelegationIndex(batch, delegate.DelegatorAddress, delegate.ValidatorAddress, root) case staking.DirectiveUndelegate: case staking.DirectiveCollectRewards: default: @@ -2839,8 +2463,8 @@ func (bc *BlockChain) ReadBlockRewardAccumulator(number uint64) (*big.Int, error // WriteBlockRewardAccumulator directly writes the BlockRewardAccumulator value // Note: this should only be called once during staking launch. -func (bc *BlockChain) WriteBlockRewardAccumulator(reward *big.Int, number uint64) error { - err := rawdb.WriteBlockRewardAccumulator(bc.db, reward, number) +func (bc *BlockChain) WriteBlockRewardAccumulator(batch rawdb.DatabaseWriter, reward *big.Int, number uint64) error { + err := rawdb.WriteBlockRewardAccumulator(batch, reward, number) if err != nil { return err } @@ -2849,19 +2473,19 @@ func (bc *BlockChain) WriteBlockRewardAccumulator(reward *big.Int, number uint64 } //UpdateBlockRewardAccumulator .. -// Note: this should only be called within the blockchain insertBlock process. -func (bc *BlockChain) UpdateBlockRewardAccumulator(diff *big.Int, number uint64) error { +// Note: this should only be called within the blockchain insert process. +func (bc *BlockChain) UpdateBlockRewardAccumulator(batch rawdb.DatabaseWriter, diff *big.Int, number uint64) error { current, err := bc.ReadBlockRewardAccumulator(number - 1) if err != nil { // one-off fix for pangaea, return after pangaea enter staking. current = big.NewInt(0) - bc.WriteBlockRewardAccumulator(current, number) + bc.WriteBlockRewardAccumulator(batch, current, number) } - return bc.WriteBlockRewardAccumulator(new(big.Int).Add(current, diff), number) + return bc.WriteBlockRewardAccumulator(batch, new(big.Int).Add(current, diff), number) } // Note this should read from the state of current block in concern (root == newBlock.root) -func (bc *BlockChain) addDelegationIndex(delegatorAddress, validatorAddress common.Address, root common.Hash) error { +func (bc *BlockChain) addDelegationIndex(batch rawdb.DatabaseWriter, delegatorAddress, validatorAddress common.Address, root common.Hash) error { // Get existing delegations delegations, err := bc.ReadDelegationsByDelegator(delegatorAddress) if err != nil { @@ -2890,7 +2514,7 @@ func (bc *BlockChain) addDelegationIndex(delegatorAddress, validatorAddress comm }) } } - return bc.writeDelegationsByDelegator(delegatorAddress, delegations) + return bc.writeDelegationsByDelegator(batch, delegatorAddress, delegations) } // ValidatorCandidates returns the up to date validator candidates for next epoch diff --git a/core/offchain.go b/core/offchain.go new file mode 100644 index 000000000..d3656d57f --- /dev/null +++ b/core/offchain.go @@ -0,0 +1,184 @@ +package core + +import ( + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/core/rawdb" + "github.com/harmony-one/harmony/core/state" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/shard" +) + +// CommitOffChainData write off chain data of a block onto db writer. +func (bc *BlockChain) CommitOffChainData( + batch rawdb.DatabaseWriter, block *types.Block, receipts []*types.Receipt, + cxReceipts []*types.CXReceipt, payout *big.Int, state *state.DB, root common.Hash) (status WriteStatus, err error) { + //// Write receipts of the block + rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) + + //// Cross-shard txns + epoch := block.Header().Epoch() + if bc.chainConfig.HasCrossTxFields(block.Epoch()) { + shardingConfig := shard.Schedule.InstanceForEpoch(epoch) + shardNum := int(shardingConfig.NumShards()) + for i := 0; i < shardNum; i++ { + if i == int(block.ShardID()) { + continue + } + + shardReceipts := types.CXReceipts(cxReceipts).GetToShardReceipts(uint32(i)) + err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts) + if err != nil { + utils.Logger().Error().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database") + return NonStatTy, err + } + } + // Mark incomingReceipts in the block as spent + bc.WriteCXReceiptsProofSpent(batch, block.IncomingReceipts()) + } + + //// VRF + VDF + //check non zero VRF field in header and add to local db + //if len(block.Vrf()) > 0 { + // vrfBlockNumbers, _ := bc.ReadEpochVrfBlockNums(block.Header().Epoch()) + // if (len(vrfBlockNumbers) > 0) && (vrfBlockNumbers[len(vrfBlockNumbers)-1] == block.NumberU64()) { + // utils.Logger().Error(). + // Str("number", block.Number().String()). + // Str("epoch", block.Header().Epoch().String()). + // Msg("VRF block number is already in local db") + // } else { + // vrfBlockNumbers = append(vrfBlockNumbers, block.NumberU64()) + // err = bc.WriteEpochVrfBlockNums(block.Header().Epoch(), vrfBlockNumbers) + // if err != nil { + // utils.Logger().Error(). + // Str("number", block.Number().String()). + // Str("epoch", block.Header().Epoch().String()). + // Msg("failed to write VRF block number to local db") + // return NonStatTy, err + // } + // } + //} + // + ////check non zero Vdf in header and add to local db + //if len(block.Vdf()) > 0 { + // err = bc.WriteEpochVdfBlockNum(block.Header().Epoch(), block.Number()) + // if err != nil { + // utils.Logger().Error(). + // Str("number", block.Number().String()). + // Str("epoch", block.Header().Epoch().String()). + // Msg("failed to write VDF block number to local db") + // return NonStatTy, err + // } + //} + + //// Shard State and Validator Update + header := block.Header() + if len(header.ShardState()) > 0 { + // Write shard state for the new epoch + epoch := new(big.Int).Add(header.Epoch(), common.Big1) + shardState, err := block.Header().GetShardState() + if err == nil && shardState.Epoch != nil && bc.chainConfig.IsStaking(shardState.Epoch) { + // After staking, the epoch will be decided by the epoch in the shard state. + epoch = new(big.Int).Set(shardState.Epoch) + } + + newShardState, err := bc.WriteShardStateBytes(batch, epoch, header.ShardState()) + if err != nil { + header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state") + return NonStatTy, err + } + + // Find all the active validator addresses and store them in db + allActiveValidators := []common.Address{} + processed := make(map[common.Address]struct{}) + for i := range newShardState.Shards { + shard := newShardState.Shards[i] + for j := range shard.Slots { + slot := shard.Slots[j] + if slot.EffectiveStake != nil { // For external validator + _, ok := processed[slot.EcdsaAddress] + if !ok { + processed[slot.EcdsaAddress] = struct{}{} + allActiveValidators = append(allActiveValidators, shard.Slots[j].EcdsaAddress) + } + } + } + } + + // Update active validators + if err := bc.WriteActiveValidatorList(batch, allActiveValidators); err != nil { + return NonStatTy, err + } + + // Update snapshots for all validators + if err := bc.UpdateValidatorSnapshots(batch, epoch); err != nil { + return NonStatTy, err + } + } + + // Do bookkeeping for new staking txns + for _, tx := range block.StakingTransactions() { + err = bc.UpdateStakingMetaData(batch, tx, root) + // keep offchain database consistency with onchain we need revert + // but it should not happend unless local database corrupted + if err != nil { + utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err) + return NonStatTy, err + } + } + + // Update voting power of validators for all shards + if block.ShardID() == shard.BeaconChainShardID && + len(block.Header().ShardState()) > 0 { + shardState := new(shard.State) + + if shardState, err = shard.DecodeWrapper(block.Header().ShardState()); err == nil { + if err = bc.UpdateValidatorVotingPower(batch, shardState); err != nil { + utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to update voting power") + } + } else { + utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to decode shard state") + } + } + + //// Writing beacon chain cross links + if header.ShardID() == shard.BeaconChainShardID && + bc.chainConfig.IsCrossLink(block.Epoch()) && + len(header.CrossLinks()) > 0 { + crossLinks := &types.CrossLinks{} + err = rlp.DecodeBytes(header.CrossLinks(), crossLinks) + if err != nil { + header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cannot parse cross links") + return NonStatTy, err + } + if !crossLinks.IsSorted() { + header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cross links are not sorted") + return NonStatTy, errors.New("proposed cross links are not sorted") + } + for _, crossLink := range *crossLinks { + // Process crosslink + if err := bc.WriteCrossLinks(batch, types.CrossLinks{crossLink}); err == nil { + utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[insertChain/crosslinks] Cross Link Added to Beaconchain") + } + bc.LastContinuousCrossLink(batch, crossLink) + } + + //clean/update local database cache after crosslink inserted into blockchain + num, err := bc.DeleteCommittedFromPendingCrossLinks(*crossLinks) + utils.Logger().Debug().Msgf("DeleteCommittedFromPendingCrossLinks, crosslinks in header %d, pending crosslinks: %d, error: %+v", len(*crossLinks), num, err) + } + + if bc.CurrentHeader().ShardID() == shard.BeaconChainShardID { + if bc.chainConfig.IsStaking(block.Epoch()) { + bc.UpdateBlockRewardAccumulator(batch, payout, block.Number().Uint64()) + } else { + // block reward never accumulate before staking + bc.WriteBlockRewardAccumulator(batch, big.NewInt(0), block.Number().Uint64()) + } + } + return CanonStatTy, nil +} diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index d670d6295..917cd3267 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -25,10 +25,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/shard" - staking "github.com/harmony-one/harmony/staking/types" ) // MsgNoShardStateFromDB error message for shard state reading failure @@ -122,24 +119,6 @@ func WriteHeadFastBlockHash(db DatabaseWriter, hash common.Hash) { } } -// ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow -// reporting correct numbers across restarts. -func ReadFastTrieProgress(db DatabaseReader) uint64 { - data, _ := db.Get(fastTrieProgressKey) - if len(data) == 0 { - return 0 - } - return new(big.Int).SetBytes(data).Uint64() -} - -// WriteFastTrieProgress stores the fast sync trie process counter to support -// retrieving it across restarts. -func WriteFastTrieProgress(db DatabaseWriter, count uint64) { - if err := db.Put(fastTrieProgressKey, new(big.Int).SetUint64(count).Bytes()); err != nil { - utils.Logger().Error().Msg("Failed to store fast sync trie progress") - } -} - // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db DatabaseReader, hash common.Hash, number uint64) rlp.RawValue { data, _ := db.Get(headerKey(number, hash)) @@ -351,31 +330,6 @@ func ReadBlock(db DatabaseReader, hash common.Hash, number uint64) *types.Block func WriteBlock(db DatabaseWriter, block *types.Block) { WriteBody(db, block.Hash(), block.NumberU64(), block.Body()) WriteHeader(db, block.Header()) - // TODO ek – maybe roll the below into WriteHeader() - epoch := block.Header().Epoch() - if epoch == nil { - // backward compatibility - return - } - epochBlockNum := block.Number() - writeOne := func() { - if err := WriteEpochBlockNumber(db, epoch, epochBlockNum); err != nil { - utils.Logger().Error().Err(err).Msg("Failed to write epoch block number") - } - } - // A block may be a genesis block AND end-of-epoch block at the same time. - if epochBlockNum.Sign() == 0 { - // Genesis block; record this block's epoch and block numbers. - writeOne() - } - - // TODO: don't change epoch based on shard state presence - if len(block.Header().ShardState()) > 0 && block.NumberU64() != 0 { - // End-of-epoch block; record the next epoch after this block. - epoch = new(big.Int).Add(epoch, common.Big1) - epochBlockNum = new(big.Int).Add(epochBlockNum, common.Big1) - writeOne() - } } // DeleteBlock removes all block data associated with a hash. @@ -412,394 +366,3 @@ func FindCommonAncestor(db DatabaseReader, a, b *block.Header) *block.Header { } return a } - -// ReadShardState retrieves sharding state. -func ReadShardState( - db DatabaseReader, epoch *big.Int, -) (*shard.State, error) { - data, err := db.Get(shardStateKey(epoch)) - if err != nil { - return nil, ctxerror.New(MsgNoShardStateFromDB, - "epoch", epoch, - ).WithCause(err) - } - ss, err2 := shard.DecodeWrapper(data) - if err2 != nil { - return nil, ctxerror.New("cannot decode sharding state", - "epoch", epoch, - ).WithCause(err2) - } - return ss, nil -} - -// WriteShardStateBytes stores sharding state into database. -func WriteShardStateBytes(db DatabaseWriter, epoch *big.Int, data []byte) (err error) { - if err = db.Put(shardStateKey(epoch), data); err != nil { - return ctxerror.New("cannot write sharding state", - "epoch", epoch, - ).WithCause(err) - } - utils.Logger().Info().Str("epoch", epoch.String()).Int("size", len(data)).Msg("wrote sharding state") - return nil -} - -// ReadLastCommits retrieves LastCommits. -func ReadLastCommits(db DatabaseReader) ([]byte, error) { - var data []byte - data, err := db.Get(lastCommitsKey) - if err != nil { - return nil, ctxerror.New("cannot read last commits from rawdb").WithCause(err) - } - return data, nil -} - -// WriteLastCommits stores last commits into database. -func WriteLastCommits( - db DatabaseWriter, data []byte, -) (err error) { - if err = db.Put(lastCommitsKey, data); err != nil { - return ctxerror.New("cannot write last commits").WithCause(err) - } - utils.Logger().Info(). - Int("size", len(data)). - Msg("wrote last commits") - return nil -} - -// ReadEpochBlockNumber retrieves the epoch block number for the given epoch, -// or nil if the given epoch is not found in the database. -func ReadEpochBlockNumber(db DatabaseReader, epoch *big.Int) (*big.Int, error) { - data, err := db.Get(epochBlockNumberKey(epoch)) - if err != nil { - return nil, err - } - return new(big.Int).SetBytes(data), nil -} - -// WriteEpochBlockNumber stores the given epoch-number-to-epoch-block-number in the database. -func WriteEpochBlockNumber(db DatabaseWriter, epoch, blockNum *big.Int) error { - return db.Put(epochBlockNumberKey(epoch), blockNum.Bytes()) -} - -// ReadEpochVrfBlockNums retrieves the VRF block numbers for the given epoch -func ReadEpochVrfBlockNums(db DatabaseReader, epoch *big.Int) ([]byte, error) { - return db.Get(epochVrfBlockNumbersKey(epoch)) -} - -// WriteEpochVrfBlockNums stores the VRF block numbers for the given epoch -func WriteEpochVrfBlockNums(db DatabaseWriter, epoch *big.Int, data []byte) error { - return db.Put(epochVrfBlockNumbersKey(epoch), data) -} - -// ReadEpochVdfBlockNum retrieves the VDF block number for the given epoch -func ReadEpochVdfBlockNum(db DatabaseReader, epoch *big.Int) ([]byte, error) { - return db.Get(epochVdfBlockNumberKey(epoch)) -} - -// WriteEpochVdfBlockNum stores the VDF block number for the given epoch -func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error { - return db.Put(epochVdfBlockNumberKey(epoch), data) -} - -// ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum -func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) { - return db.Get(crosslinkKey(shardID, blockNum)) -} - -// WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum -func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error { - return db.Put(crosslinkKey(shardID, blockNum), data) -} - -// DeleteCrossLinkShardBlock deletes the blockHash given shardID and blockNum -func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64) error { - return db.Delete(crosslinkKey(shardID, blockNum)) -} - -// ReadShardLastCrossLink read the last cross link of a shard -func ReadShardLastCrossLink(db DatabaseReader, shardID uint32) ([]byte, error) { - return db.Get(shardLastCrosslinkKey(shardID)) -} - -// WriteShardLastCrossLink stores the last cross link of a shard -func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) error { - return db.Put(shardLastCrosslinkKey(shardID), data) -} - -// ReadPendingCrossLinks retrieves last pending crosslinks. -func ReadPendingCrossLinks(db DatabaseReader) ([]byte, error) { - return db.Get(pendingCrosslinkKey) -} - -// WritePendingCrossLinks stores last pending crosslinks into database. -func WritePendingCrossLinks(db DatabaseWriter, bytes []byte) error { - return db.Put(pendingCrosslinkKey, bytes) -} - -// DeletePendingCrossLinks stores last pending crosslinks into database. -func DeletePendingCrossLinks(db DatabaseDeleter) error { - return db.Delete(pendingCrosslinkKey) -} - -// ReadPendingSlashingCandidates retrieves last pending slashing candidates -func ReadPendingSlashingCandidates(db DatabaseReader) ([]byte, error) { - return db.Get(pendingSlashingKey) -} - -// WritePendingSlashingCandidates stores last pending slashing candidates into database. -func WritePendingSlashingCandidates(db DatabaseWriter, bytes []byte) error { - return db.Put(pendingSlashingKey, bytes) -} - -// DeletePendingSlashingCandidates stores last pending slashing candidates into database. -func DeletePendingSlashingCandidates(db DatabaseDeleter) error { - return db.Delete(pendingSlashingKey) -} - -// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash -func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) { - data, err := db.Get(cxReceiptKey(shardID, number, hash)) - if err != nil || len(data) == 0 { - utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts") - return nil, err - } - cxReceipts := types.CXReceipts{} - if err := rlp.DecodeBytes(data, &cxReceipts); err != nil { - return nil, err - } - return cxReceipts, nil -} - -// WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash -func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) error { - bytes, err := rlp.EncodeToBytes(receipts) - if err != nil { - utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts") - } - // Store the receipt slice - if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil { - utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts") - } - return err -} - -// ReadCXReceiptsProofSpent check whether a CXReceiptsProof is unspent -func ReadCXReceiptsProofSpent(db DatabaseReader, shardID uint32, number uint64) (byte, error) { - data, err := db.Get(cxReceiptSpentKey(shardID, number)) - if err != nil || len(data) == 0 { - return NAByte, ctxerror.New("[ReadCXReceiptsProofSpent] Cannot find the key", "shardID", shardID, "number", number).WithCause(err) - } - return data[0], nil -} - -// WriteCXReceiptsProofSpent write CXReceiptsProof as spent into database -func WriteCXReceiptsProofSpent(dbw DatabaseWriter, cxp *types.CXReceiptsProof) error { - shardID := cxp.MerkleProof.ShardID - blockNum := cxp.MerkleProof.BlockNum.Uint64() - return dbw.Put(cxReceiptSpentKey(shardID, blockNum), []byte{SpentByte}) -} - -// DeleteCXReceiptsProofSpent removes unspent indicator of a given blockHash -func DeleteCXReceiptsProofSpent(db DatabaseDeleter, shardID uint32, number uint64) { - if err := db.Delete(cxReceiptSpentKey(shardID, number)); err != nil { - utils.Logger().Error().Msg("Failed to delete receipts unspent indicator") - } -} - -// ReadCXReceiptsProofUnspentCheckpoint returns the last unspent blocknumber -func ReadCXReceiptsProofUnspentCheckpoint(db DatabaseReader, shardID uint32) (uint64, error) { - by, err := db.Get(cxReceiptUnspentCheckpointKey(shardID)) - if err != nil { - return 0, ctxerror.New("[ReadCXReceiptsProofUnspent] Cannot Unspent Checkpoint", "shardID", shardID).WithCause(err) - } - lastCheckpoint := binary.BigEndian.Uint64(by[:]) - return lastCheckpoint, nil -} - -// WriteCXReceiptsProofUnspentCheckpoint check whether a CXReceiptsProof is unspent, true means not spent -func WriteCXReceiptsProofUnspentCheckpoint(db DatabaseWriter, shardID uint32, blockNum uint64) error { - by := make([]byte, 8) - binary.BigEndian.PutUint64(by[:], blockNum) - return db.Put(cxReceiptUnspentCheckpointKey(shardID), by) -} - -// ReadValidatorInformation retrieves staking validator by its address -func ReadValidatorInformation(db DatabaseReader, addr common.Address) (*staking.ValidatorWrapper, error) { - data, err := db.Get(validatorKey(addr)) - if err != nil || len(data) == 0 { - utils.Logger().Info().Err(err).Msg("ReadValidatorInformation") - return nil, err - } - v := staking.ValidatorWrapper{} - if err := rlp.DecodeBytes(data, &v); err != nil { - utils.Logger().Error().Err(err).Str("address", addr.Hex()).Msg("Unable to Decode staking validator from database") - return nil, err - } - return &v, nil -} - -// WriteValidatorData stores validator's information by its address -func WriteValidatorData(db DatabaseWriter, v *staking.ValidatorWrapper) error { - bytes, err := rlp.EncodeToBytes(v) - if err != nil { - utils.Logger().Error().Msg("[WriteValidatorData] Failed to encode") - return err - } - if err := db.Put(validatorKey(v.Address), bytes); err != nil { - utils.Logger().Error().Msg("[WriteValidatorData] Failed to store to database") - return err - } - return err -} - -// ReadValidatorSnapshot retrieves validator's snapshot by its address -func ReadValidatorSnapshot( - db DatabaseReader, addr common.Address, -) (*staking.ValidatorWrapper, error) { - data, err := db.Get(validatorSnapshotKey(addr)) - if err != nil || len(data) == 0 { - utils.Logger().Info().Err(err).Msg("ReadValidatorSnapshot") - return nil, err - } - v := staking.ValidatorWrapper{} - if err := rlp.DecodeBytes(data, &v); err != nil { - utils.Logger().Error().Err(err). - Str("address", addr.Hex()). - Msg("Unable to decode validator snapshot from database") - return nil, err - } - return &v, nil -} - -// WriteValidatorSnapshot stores validator's snapshot by its address -func WriteValidatorSnapshot(db DatabaseWriter, v *staking.ValidatorWrapper) error { - bytes, err := rlp.EncodeToBytes(v) - if err != nil { - utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to encode") - return err - } - if err := db.Put(validatorSnapshotKey(v.Address), bytes); err != nil { - utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to store to database") - return err - } - return err -} - -// ReadValidatorStats retrieves validator's stats by its address -func ReadValidatorStats( - db DatabaseReader, addr common.Address, -) (*staking.ValidatorStats, error) { - data, err := db.Get(validatorStatsKey(addr)) - if err != nil || len(data) == 0 { - utils.Logger().Info().Err(err).Msg("ReadValidatorStats") - return nil, err - } - stats := staking.ValidatorStats{} - if err := rlp.DecodeBytes(data, &stats); err != nil { - utils.Logger().Error().Err(err). - Str("address", addr.Hex()). - Msg("Unable to decode validator stats from database") - return nil, err - } - return &stats, nil -} - -// WriteValidatorStats stores validator's stats by its address -func WriteValidatorStats( - db DatabaseWriter, addr common.Address, stats *staking.ValidatorStats, -) error { - bytes, err := rlp.EncodeToBytes(stats) - if err != nil { - utils.Logger().Error().Msg("[WriteValidatorStats] Failed to encode") - return err - } - if err := db.Put(validatorStatsKey(addr), bytes); err != nil { - utils.Logger().Error().Msg("[WriteValidatorStats] Failed to store to database") - return err - } - return err -} - -// DeleteValidatorSnapshot removes the validator's snapshot by its address -func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address) { - if err := db.Delete(validatorSnapshotKey(addr)); err != nil { - utils.Logger().Error().Msg("Failed to delete snapshot of a validator") - } -} - -// ReadValidatorList retrieves staking validator by its address -// Return only active validators if activeOnly==true, otherwise, return all validators -func ReadValidatorList(db DatabaseReader, activeOnly bool) ([]common.Address, error) { - key := validatorListKey - if activeOnly { - key = activeValidatorListKey - } - data, err := db.Get(key) - if err != nil || len(data) == 0 { - return []common.Address{}, nil - } - addrs := []common.Address{} - if err := rlp.DecodeBytes(data, &addrs); err != nil { - utils.Logger().Error().Err(err).Msg("Unable to Decode validator List from database") - return nil, err - } - return addrs, nil -} - -// WriteValidatorList stores staking validator's information by its address -// Writes only for active validators if activeOnly==true, otherwise, writes for all validators -func WriteValidatorList(db DatabaseWriter, addrs []common.Address, activeOnly bool) error { - key := validatorListKey - if activeOnly { - key = activeValidatorListKey - } - - bytes, err := rlp.EncodeToBytes(addrs) - if err != nil { - utils.Logger().Error().Msg("[WriteValidatorList] Failed to encode") - } - if err := db.Put(key, bytes); err != nil { - utils.Logger().Error().Msg("[WriteValidatorList] Failed to store to database") - } - return err -} - -// ReadDelegationsByDelegator retrieves the list of validators delegated by a delegator -func ReadDelegationsByDelegator(db DatabaseReader, delegator common.Address) ([]staking.DelegationIndex, error) { - data, err := db.Get(delegatorValidatorListKey(delegator)) - if err != nil || len(data) == 0 { - return []staking.DelegationIndex{}, nil - } - addrs := []staking.DelegationIndex{} - if err := rlp.DecodeBytes(data, &addrs); err != nil { - utils.Logger().Error().Err(err).Msg("Unable to Decode delegations from database") - return nil, err - } - return addrs, nil -} - -// WriteDelegationsByDelegator stores the list of validators delegated by a delegator -func WriteDelegationsByDelegator(db DatabaseWriter, delegator common.Address, indices []staking.DelegationIndex) error { - bytes, err := rlp.EncodeToBytes(indices) - if err != nil { - utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to encode") - } - if err := db.Put(delegatorValidatorListKey(delegator), bytes); err != nil { - utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to store to database") - } - return err -} - -// ReadBlockRewardAccumulator .. -func ReadBlockRewardAccumulator(db DatabaseReader, number uint64) (*big.Int, error) { - data, err := db.Get(blockRewardAccumKey(number)) - if err != nil { - return nil, err - } - return new(big.Int).SetBytes(data), nil -} - -// WriteBlockRewardAccumulator .. -func WriteBlockRewardAccumulator(db DatabaseWriter, newAccum *big.Int, number uint64) error { - return db.Put(blockRewardAccumKey(number), newAccum.Bytes()) -} diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 637ee830c..d9801c6bb 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -145,11 +145,11 @@ func TestBlockStorage(t *testing.T) { } else if types.DeriveSha(types.Transactions(entry.Transactions())) != types.DeriveSha(block.Transactions()) || types.CalcUncleHash(entry.Uncles()) != types.CalcUncleHash(block.Uncles()) { t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body()) } - if actual, err := ReadEpochBlockNumber(db, big.NewInt(0)); err != nil { - t.Fatalf("Genesis epoch block number not found, error=%#v", err) - } else if expected := big.NewInt(0); actual.Cmp(expected) != 0 { - t.Fatalf("Genesis epoch block number mismatch: have %v, want %v", actual, expected) - } + //if actual, err := ReadEpochBlockNumber(db, big.NewInt(0)); err != nil { + // t.Fatalf("Genesis epoch block number not found, error=%#v", err) + //} else if expected := big.NewInt(0); actual.Cmp(expected) != 0 { + // t.Fatalf("Genesis epoch block number mismatch: have %v, want %v", actual, expected) + //} //if actual, err := ReadEpochBlockNumber(db, big.NewInt(1)); err != nil { // t.Fatalf("Next epoch block number not found, error=%#v", err) //} else if expected := big.NewInt(1); actual.Cmp(expected) != 0 { diff --git a/core/rawdb/accessors_offchain.go b/core/rawdb/accessors_offchain.go new file mode 100644 index 000000000..f700c37bf --- /dev/null +++ b/core/rawdb/accessors_offchain.go @@ -0,0 +1,362 @@ +package rawdb + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/shard" + staking "github.com/harmony-one/harmony/staking/types" +) + +// ReadShardState retrieves shard state of a specific epoch. +func ReadShardState( + db DatabaseReader, epoch *big.Int, +) (*shard.State, error) { + data, err := db.Get(shardStateKey(epoch)) + if err != nil { + return nil, ctxerror.New(MsgNoShardStateFromDB, + "epoch", epoch, + ).WithCause(err) + } + ss, err2 := shard.DecodeWrapper(data) + if err2 != nil { + return nil, ctxerror.New("cannot decode sharding state", + "epoch", epoch, + ).WithCause(err2) + } + return ss, nil +} + +// WriteShardStateBytes stores sharding state into database. +func WriteShardStateBytes(db DatabaseWriter, epoch *big.Int, data []byte) (err error) { + if err = db.Put(shardStateKey(epoch), data); err != nil { + return ctxerror.New("cannot write sharding state", + "epoch", epoch, + ).WithCause(err) + } + utils.Logger().Info().Str("epoch", epoch.String()).Int("size", len(data)).Msg("wrote sharding state") + return nil +} + +// ReadLastCommits retrieves the commit signatures on the current block of blockchain. +func ReadLastCommits(db DatabaseReader) ([]byte, error) { + var data []byte + data, err := db.Get(lastCommitsKey) + if err != nil { + return nil, ctxerror.New("cannot read last commits from rawdb").WithCause(err) + } + return data, nil +} + +// WriteLastCommits stores the commit signatures collected on the newly confirmed block into database. +func WriteLastCommits( + db DatabaseWriter, data []byte, +) (err error) { + if err = db.Put(lastCommitsKey, data); err != nil { + return ctxerror.New("cannot write last commits").WithCause(err) + } + utils.Logger().Info(). + Int("size", len(data)). + Msg("wrote last commits") + return nil +} + +// ReadCrossLinkShardBlock retrieves the blockHash given shardID and blockNum +func ReadCrossLinkShardBlock(db DatabaseReader, shardID uint32, blockNum uint64) ([]byte, error) { + return db.Get(crosslinkKey(shardID, blockNum)) +} + +// WriteCrossLinkShardBlock stores the blockHash given shardID and blockNum +func WriteCrossLinkShardBlock(db DatabaseWriter, shardID uint32, blockNum uint64, data []byte) error { + return db.Put(crosslinkKey(shardID, blockNum), data) +} + +// DeleteCrossLinkShardBlock deletes the blockHash given shardID and blockNum +func DeleteCrossLinkShardBlock(db DatabaseDeleter, shardID uint32, blockNum uint64) error { + return db.Delete(crosslinkKey(shardID, blockNum)) +} + +// ReadShardLastCrossLink read the last cross link of a shard +func ReadShardLastCrossLink(db DatabaseReader, shardID uint32) ([]byte, error) { + return db.Get(shardLastCrosslinkKey(shardID)) +} + +// WriteShardLastCrossLink stores the last cross link of a shard +func WriteShardLastCrossLink(db DatabaseWriter, shardID uint32, data []byte) error { + return db.Put(shardLastCrosslinkKey(shardID), data) +} + +// ReadPendingCrossLinks retrieves last pending crosslinks. +func ReadPendingCrossLinks(db DatabaseReader) ([]byte, error) { + return db.Get(pendingCrosslinkKey) +} + +// WritePendingCrossLinks stores last pending crosslinks into database. +func WritePendingCrossLinks(db DatabaseWriter, bytes []byte) error { + return db.Put(pendingCrosslinkKey, bytes) +} + +// DeletePendingCrossLinks stores last pending crosslinks into database. +func DeletePendingCrossLinks(db DatabaseDeleter) error { + return db.Delete(pendingCrosslinkKey) +} + +// ReadPendingSlashingCandidates retrieves last pending slashing candidates +func ReadPendingSlashingCandidates(db DatabaseReader) ([]byte, error) { + return db.Get(pendingSlashingKey) +} + +// WritePendingSlashingCandidates stores last pending slashing candidates into database. +func WritePendingSlashingCandidates(db DatabaseWriter, bytes []byte) error { + return db.Put(pendingSlashingKey, bytes) +} + +// DeletePendingSlashingCandidates stores last pending slashing candidates into database. +func DeletePendingSlashingCandidates(db DatabaseDeleter) error { + return db.Delete(pendingSlashingKey) +} + +// ReadCXReceipts retrieves all the transactions of receipts given destination shardID, number and blockHash +func ReadCXReceipts(db DatabaseReader, shardID uint32, number uint64, hash common.Hash) (types.CXReceipts, error) { + data, err := db.Get(cxReceiptKey(shardID, number, hash)) + if err != nil || len(data) == 0 { + utils.Logger().Info().Err(err).Uint64("number", number).Int("dataLen", len(data)).Msg("ReadCXReceipts") + return nil, err + } + cxReceipts := types.CXReceipts{} + if err := rlp.DecodeBytes(data, &cxReceipts); err != nil { + return nil, err + } + return cxReceipts, nil +} + +// WriteCXReceipts stores all the transaction receipts given destination shardID, blockNumber and blockHash +func WriteCXReceipts(db DatabaseWriter, shardID uint32, number uint64, hash common.Hash, receipts types.CXReceipts) error { + bytes, err := rlp.EncodeToBytes(receipts) + if err != nil { + utils.Logger().Error().Msg("[WriteCXReceipts] Failed to encode cross shard tx receipts") + } + // Store the receipt slice + if err := db.Put(cxReceiptKey(shardID, number, hash), bytes); err != nil { + utils.Logger().Error().Msg("[WriteCXReceipts] Failed to store cxreceipts") + } + return err +} + +// ReadCXReceiptsProofSpent check whether a CXReceiptsProof is unspent +func ReadCXReceiptsProofSpent(db DatabaseReader, shardID uint32, number uint64) (byte, error) { + data, err := db.Get(cxReceiptSpentKey(shardID, number)) + if err != nil || len(data) == 0 { + return NAByte, ctxerror.New("[ReadCXReceiptsProofSpent] Cannot find the key", "shardID", shardID, "number", number).WithCause(err) + } + return data[0], nil +} + +// WriteCXReceiptsProofSpent write CXReceiptsProof as spent into database +func WriteCXReceiptsProofSpent(dbw DatabaseWriter, cxp *types.CXReceiptsProof) error { + shardID := cxp.MerkleProof.ShardID + blockNum := cxp.MerkleProof.BlockNum.Uint64() + return dbw.Put(cxReceiptSpentKey(shardID, blockNum), []byte{SpentByte}) +} + +// DeleteCXReceiptsProofSpent removes unspent indicator of a given blockHash +func DeleteCXReceiptsProofSpent(db DatabaseDeleter, shardID uint32, number uint64) { + if err := db.Delete(cxReceiptSpentKey(shardID, number)); err != nil { + utils.Logger().Error().Msg("Failed to delete receipts unspent indicator") + } +} + +// ReadValidatorSnapshot retrieves validator's snapshot by its address +func ReadValidatorSnapshot( + db DatabaseReader, addr common.Address, epoch *big.Int, +) (*staking.ValidatorWrapper, error) { + data, err := db.Get(validatorSnapshotKey(addr, epoch)) + if err != nil || len(data) == 0 { + utils.Logger().Info().Err(err).Msg("ReadValidatorSnapshot") + return nil, err + } + v := staking.ValidatorWrapper{} + if err := rlp.DecodeBytes(data, &v); err != nil { + utils.Logger().Error().Err(err). + Str("address", addr.Hex()). + Msg("Unable to decode validator snapshot from database") + return nil, err + } + return &v, nil +} + +// WriteValidatorSnapshot stores validator's snapshot by its address +func WriteValidatorSnapshot(batch DatabaseWriter, v *staking.ValidatorWrapper, epoch *big.Int) error { + bytes, err := rlp.EncodeToBytes(v) + if err != nil { + utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to encode") + return err + } + if err := batch.Put(validatorSnapshotKey(v.Address, epoch), bytes); err != nil { + utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to store to database") + return err + } + return err +} + +// DeleteValidatorSnapshot removes the validator's snapshot by its address +func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address, epoch *big.Int) { + if err := db.Delete(validatorSnapshotKey(addr, epoch)); err != nil { + utils.Logger().Error().Msg("Failed to delete snapshot of a validator") + } +} + +// ReadValidatorStats retrieves validator's stats by its address +func ReadValidatorStats( + db DatabaseReader, addr common.Address, +) (*staking.ValidatorStats, error) { + data, err := db.Get(validatorStatsKey(addr)) + if err != nil || len(data) == 0 { + utils.Logger().Info().Err(err).Msg("ReadValidatorStats") + return nil, err + } + stats := staking.ValidatorStats{} + if err := rlp.DecodeBytes(data, &stats); err != nil { + utils.Logger().Error().Err(err). + Str("address", addr.Hex()). + Msg("Unable to decode validator stats from database") + return nil, err + } + return &stats, nil +} + +// WriteValidatorStats stores validator's stats by its address +func WriteValidatorStats( + batch DatabaseWriter, addr common.Address, stats *staking.ValidatorStats, +) error { + bytes, err := rlp.EncodeToBytes(stats) + if err != nil { + utils.Logger().Error().Msg("[WriteValidatorStats] Failed to encode") + return err + } + if err := batch.Put(validatorStatsKey(addr), bytes); err != nil { + utils.Logger().Error().Msg("[WriteValidatorStats] Failed to store to database") + return err + } + return err +} + +// ReadValidatorList retrieves staking validator by its address +// Return only active validators if activeOnly==true, otherwise, return all validators +func ReadValidatorList(db DatabaseReader, activeOnly bool) ([]common.Address, error) { + key := validatorListKey + if activeOnly { + key = activeValidatorListKey + } + data, err := db.Get(key) + if err != nil || len(data) == 0 { + return []common.Address{}, nil + } + addrs := []common.Address{} + if err := rlp.DecodeBytes(data, &addrs); err != nil { + utils.Logger().Error().Err(err).Msg("Unable to Decode validator List from database") + return nil, err + } + return addrs, nil +} + +// WriteValidatorList stores staking validator's information by its address +// Writes only for active validators if activeOnly==true, otherwise, writes for all validators +func WriteValidatorList(db DatabaseWriter, addrs []common.Address, activeOnly bool) error { + key := validatorListKey + if activeOnly { + key = activeValidatorListKey + } + + bytes, err := rlp.EncodeToBytes(addrs) + if err != nil { + utils.Logger().Error().Msg("[WriteValidatorList] Failed to encode") + } + if err := db.Put(key, bytes); err != nil { + utils.Logger().Error().Msg("[WriteValidatorList] Failed to store to database") + } + return err +} + +// ReadDelegationsByDelegator retrieves the list of validators delegated by a delegator +func ReadDelegationsByDelegator(db DatabaseReader, delegator common.Address) ([]staking.DelegationIndex, error) { + data, err := db.Get(delegatorValidatorListKey(delegator)) + if err != nil || len(data) == 0 { + return []staking.DelegationIndex{}, nil + } + addrs := []staking.DelegationIndex{} + if err := rlp.DecodeBytes(data, &addrs); err != nil { + utils.Logger().Error().Err(err).Msg("Unable to Decode delegations from database") + return nil, err + } + return addrs, nil +} + +// WriteDelegationsByDelegator stores the list of validators delegated by a delegator +func WriteDelegationsByDelegator(db DatabaseWriter, delegator common.Address, indices []staking.DelegationIndex) error { + bytes, err := rlp.EncodeToBytes(indices) + if err != nil { + utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to encode") + } + if err := db.Put(delegatorValidatorListKey(delegator), bytes); err != nil { + utils.Logger().Error().Msg("[writeDelegationsByDelegator] Failed to store to database") + } + return err +} + +// ReadBlockRewardAccumulator .. +func ReadBlockRewardAccumulator(db DatabaseReader, number uint64) (*big.Int, error) { + data, err := db.Get(blockRewardAccumKey(number)) + if err != nil { + return nil, err + } + return new(big.Int).SetBytes(data), nil +} + +// WriteBlockRewardAccumulator .. +func WriteBlockRewardAccumulator(db DatabaseWriter, newAccum *big.Int, number uint64) error { + return db.Put(blockRewardAccumKey(number), newAccum.Bytes()) +} + +//// Resharding //// + +// ReadEpochBlockNumber retrieves the epoch block number for the given epoch, +// or nil if the given epoch is not found in the database. +func ReadEpochBlockNumber(db DatabaseReader, epoch *big.Int) (*big.Int, error) { + data, err := db.Get(epochBlockNumberKey(epoch)) + if err != nil { + return nil, err + } + return new(big.Int).SetBytes(data), nil +} + +// WriteEpochBlockNumber stores the given epoch-number-to-epoch-block-number in the database. +func WriteEpochBlockNumber(db DatabaseWriter, epoch, blockNum *big.Int) error { + return db.Put(epochBlockNumberKey(epoch), blockNum.Bytes()) +} + +// ReadEpochVrfBlockNums retrieves the VRF block numbers for the given epoch +func ReadEpochVrfBlockNums(db DatabaseReader, epoch *big.Int) ([]byte, error) { + return db.Get(epochVrfBlockNumbersKey(epoch)) +} + +// WriteEpochVrfBlockNums stores the VRF block numbers for the given epoch +func WriteEpochVrfBlockNums(db DatabaseWriter, epoch *big.Int, data []byte) error { + return db.Put(epochVrfBlockNumbersKey(epoch), data) +} + +// ReadEpochVdfBlockNum retrieves the VDF block number for the given epoch +func ReadEpochVdfBlockNum(db DatabaseReader, epoch *big.Int) ([]byte, error) { + return db.Get(epochVdfBlockNumberKey(epoch)) +} + +// WriteEpochVdfBlockNum stores the VDF block number for the given epoch +func WriteEpochVdfBlockNum(db DatabaseWriter, epoch *big.Int, data []byte) error { + return db.Put(epochVdfBlockNumberKey(epoch), data) +} + +//// Resharding //// diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index b11ce640d..3fc2df222 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -39,9 +39,6 @@ var ( // headFastBlockKey tracks the latest known incomplete block's hash duirng fast sync. headFastBlockKey = []byte("LastFast") - // fastTrieProgressKey tracks the number of trie entries imported during fast sync. - fastTrieProgressKey = []byte("TrieSync") - // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td @@ -228,22 +225,10 @@ func cxReceiptSpentKey(shardID uint32, number uint64) []byte { return append(tmp, encodeBlockNumber(number)...) } -// cxReceiptUnspentCheckpointKey = cxReceiptsUnspentCheckpointPrefix + shardID -func cxReceiptUnspentCheckpointKey(shardID uint32) []byte { - prefix := cxReceiptUnspentCheckpointPrefix - sKey := make([]byte, 4) - binary.BigEndian.PutUint32(sKey, shardID) - return append(prefix, sKey...) -} - -func validatorKey(addr common.Address) []byte { - prefix := validatorPrefix - return append(prefix, addr.Bytes()...) -} - -func validatorSnapshotKey(addr common.Address) []byte { +func validatorSnapshotKey(addr common.Address, epoch *big.Int) []byte { prefix := validatorSnapshotPrefix - return append(prefix, addr.Bytes()...) + tmp := append(prefix, addr.Bytes()...) + return append(tmp, epoch.Bytes()...) } func validatorStatsKey(addr common.Address) []byte { diff --git a/internal/hmyapi/apiv1/blockchain.go b/internal/hmyapi/apiv1/blockchain.go index d9dc145a5..dc16b3f9e 100644 --- a/internal/hmyapi/apiv1/blockchain.go +++ b/internal/hmyapi/apiv1/blockchain.go @@ -737,7 +737,7 @@ func (s *PublicBlockChainAPI) GetDelegationByDelegatorAndValidator(ctx context.C func doEstimateGas(ctx context.Context, b Backend, args CallArgs, gasCap *big.Int) (hexutil.Uint64, error) { // Binary search the gas requirement, as it may be higher than the amount used var ( - lo uint64 = params.TxGas - 1 + lo = params.TxGas - 1 hi uint64 cap uint64 ) diff --git a/internal/hmyapi/apiv2/blockchain.go b/internal/hmyapi/apiv2/blockchain.go index 84377316a..ee68e2aaf 100644 --- a/internal/hmyapi/apiv2/blockchain.go +++ b/internal/hmyapi/apiv2/blockchain.go @@ -702,7 +702,7 @@ func (s *PublicBlockChainAPI) GetDelegationByDelegatorAndValidator(ctx context.C func doEstimateGas(ctx context.Context, b Backend, args CallArgs, gasCap *big.Int) (hexutil.Uint64, error) { // Binary search the gas requirement, as it may be higher than the amount used var ( - lo uint64 = params.TxGas - 1 + lo = params.TxGas - 1 hi uint64 cap uint64 )