1) Removed unused worker (#4512)

2) Proper error checking
3) Tests for gas 30m
pull/4518/head
Konstantin 1 year ago committed by GitHub
parent 018c33693a
commit d8f122538b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      api/service/legacysync/syncing.go
  2. 7
      api/service/stagedsync/syncing.go
  3. 2
      core/state_processor.go
  4. 10
      core/state_transition.go
  5. 4
      core/state_transition_test.go
  6. 2
      node/node.go
  7. 17
      node/node_newblock.go
  8. 13
      node/node_syncing.go
  9. 7
      node/worker/types.go
  10. 110
      node/worker/worker.go
  11. 10
      node/worker/worker_test.go

@ -25,7 +25,6 @@ import (
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
@ -932,7 +931,7 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain
}
// generateNewState will construct most recent state from downloaded blocks
func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker) error {
func (ss *StateSync) generateNewState(bc core.BlockChain) error {
// update blocks created before node start sync
parentHash := bc.CurrentBlock().Hash()
@ -995,7 +994,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker)
}
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain, worker *worker.Worker) error {
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain) error {
// Gets consensus hashes.
if err := ss.getConsensusHashes(startHash, size); err != nil {
return errors.Wrap(err, "getConsensusHashes")
@ -1005,7 +1004,7 @@ func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.Blo
if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc)
}
return ss.generateNewState(bc, worker)
return ss.generateNewState(bc)
}
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
@ -1076,7 +1075,7 @@ func (ss *StateSync) GetMaxPeerHeight() (uint64, error) {
}
// SyncLoop will keep syncing with peers until catches up
func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
func (ss *StateSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
utils.Logger().Info().Msgf("legacy sync is executing ...")
if !isBeacon {
ss.RegisterNodeInfo()
@ -1110,7 +1109,7 @@ func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco
if size > SyncLoopBatchSize {
size = SyncLoopBatchSize
}
err := ss.ProcessStateSync(startHash[:], size, bc, worker)
err := ss.ProcessStateSync(startHash[:], size, bc)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",

@ -11,7 +11,6 @@ import (
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
@ -163,7 +162,7 @@ func initDB(ctx context.Context, db kv.RwDB) error {
}
// SyncLoop will keep syncing with peers until catches up
func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
func (s *StagedSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
utils.Logger().Info().
Uint64("current height", bc.CurrentBlock().NumberU64()).
@ -204,7 +203,7 @@ func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco
}
startTime := time.Now()
if err := s.runSyncCycle(bc, worker, isBeacon, consensus, maxPeersHeight); err != nil {
if err := s.runSyncCycle(bc, isBeacon, consensus, maxPeersHeight); err != nil {
utils.Logger().Error().
Err(err).
Bool("isBeacon", isBeacon).
@ -266,7 +265,7 @@ func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco
}
// runSyncCycle will run one cycle of staged syncing
func (s *StagedSync) runSyncCycle(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, maxPeersHeight uint64) error {
func (s *StagedSync) runSyncCycle(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, maxPeersHeight uint64) error {
canRunCycleInOneTransaction := s.MaxBlocksPerSyncCycle > 0 && s.MaxBlocksPerSyncCycle <= s.MaxMemSyncCycleSize
var tx kv.RwTx
if canRunCycleInOneTransaction {

@ -364,7 +364,7 @@ func ApplyStakingTransaction(
vmenv := vm.NewEVM(context, statedb, config, cfg)
// Apply the transaction to the current state (included in the env)
gas, err = ApplyStakingMessage(vmenv, msg, gp, bc)
gas, err = ApplyStakingMessage(vmenv, msg, gp)
if err != nil {
return nil, 0, err
}

@ -76,7 +76,6 @@ type StateTransition struct {
data []byte
state vm.StateDB
evm *vm.EVM
bc ChainContext
}
// Message represents a message sent to a contract.
@ -131,7 +130,7 @@ func (result *ExecutionResult) Revert() []byte {
}
// NewStateTransition initialises and returns a new state transition object.
func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) *StateTransition {
func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool) *StateTransition {
return &StateTransition{
gp: gp,
evm: evm,
@ -140,7 +139,6 @@ func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext)
value: msg.Value(),
data: msg.Data(),
state: evm.StateDB,
bc: bc,
}
}
@ -152,12 +150,12 @@ func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext)
// indicates a core error meaning that the message would always fail for that particular
// state and would never be accepted within a block.
func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool) (ExecutionResult, error) {
return NewStateTransition(evm, msg, gp, nil).TransitionDb()
return NewStateTransition(evm, msg, gp).TransitionDb()
}
// ApplyStakingMessage computes the new state for staking message
func ApplyStakingMessage(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) (uint64, error) {
return NewStateTransition(evm, msg, gp, bc).StakingTransitionDb()
func ApplyStakingMessage(evm *vm.EVM, msg Message, gp *GasPool) (uint64, error) {
return NewStateTransition(evm, msg, gp).StakingTransitionDb()
}
// to returns the recipient of the message.

@ -76,7 +76,7 @@ func testApplyStakingMessage(test applyStakingMessageTest, t *testing.T) {
vmenv := vm.NewEVM(ctx, db, params.TestChainConfig, vm.Config{})
// run the staking tx
_, err := ApplyStakingMessage(vmenv, msg, gp, chain)
_, err := ApplyStakingMessage(vmenv, msg, gp)
if err != nil {
if test.expectedError == nil {
t.Errorf(fmt.Sprintf("Got error %v but expected none", err))
@ -193,7 +193,7 @@ func TestCollectGasRounding(t *testing.T) {
vmenv := vm.NewEVM(ctx, db, params.TestChainConfig, vm.Config{})
gasPool := new(GasPool).AddGas(math.MaxUint64)
st := NewStateTransition(vmenv, msg, gasPool, nil)
st := NewStateTransition(vmenv, msg, gasPool)
// buy gas to set initial gas to 5: gasLimit * gasPrice
if err := st.buyGas(); err != nil {
t.Fatal(err)

@ -89,7 +89,7 @@ type ISync interface {
AddLastMileBlock(block *types.Block)
GetActivePeerNumber() int
CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error
SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration)
SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration)
IsSynchronized() bool
IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool)
AddNewBlock(peerHash []byte, block *types.Block)

@ -1,7 +1,6 @@
package node
import (
"errors"
"sort"
"strings"
"time"
@ -9,6 +8,7 @@ import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/pkg/errors"
staking "github.com/harmony-one/harmony/staking/types"
@ -116,16 +116,18 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
utils.AnalysisStart("ProposeNewBlock", nowEpoch, blockNow)
defer utils.AnalysisEnd("ProposeNewBlock", nowEpoch, blockNow)
node.Worker.UpdateCurrent()
header := node.Worker.GetCurrentHeader()
// Update worker's current header and
// state data in preparation to propose/process new transactions
leaderKey := node.Consensus.GetLeaderPubKey()
env, err := node.Worker.UpdateCurrent()
if err != nil {
return nil, errors.Wrap(err, "failed to update worker")
}
var (
header = env.CurrentHeader()
leaderKey = node.Consensus.GetLeaderPubKey()
coinbase = node.GetAddressForBLSKey(leaderKey.Object, header.Epoch())
beneficiary = coinbase
err error
)
// After staking, all coinbase will be the address of bls pub key
@ -134,8 +136,7 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
coinbase.SetBytes(blsPubKeyBytes[:])
}
emptyAddr := common.Address{}
if coinbase == emptyAddr {
if coinbase == (common.Address{}) {
return nil, errors.New("[ProposeNewBlock] Failed setting coinbase")
}

@ -29,7 +29,6 @@ import (
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
)
@ -269,7 +268,7 @@ func (node *Node) doBeaconSyncing() {
}
// DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) DoSyncing(bc core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
func (node *Node) DoSyncing(bc core.BlockChain, willJoinConsensus bool) {
if node.NodeConfig.IsOffline {
return
}
@ -280,15 +279,15 @@ func (node *Node) DoSyncing(bc core.BlockChain, worker *worker.Worker, willJoinC
for {
select {
case <-ticker.C:
node.doSync(bc, worker, willJoinConsensus)
node.doSync(bc, willJoinConsensus)
case <-node.Consensus.BlockNumLowChan:
node.doSync(bc, worker, willJoinConsensus)
node.doSync(bc, willJoinConsensus)
}
}
}
// doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
func (node *Node) doSync(bc core.BlockChain, willJoinConsensus bool) {
syncInstance := node.SyncInstance()
if syncInstance.GetActivePeerNumber() < legacysync.NumPeersLowBound {
@ -317,7 +316,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons
node.Consensus.BlocksNotSynchronized()
}
isBeacon := bc.ShardID() == shard.BeaconChainShardID
syncInstance.SyncLoop(bc, worker, isBeacon, node.Consensus, legacysync.LoopMinTime)
syncInstance.SyncLoop(bc, isBeacon, node.Consensus, legacysync.LoopMinTime)
if willJoinConsensus {
node.IsSynchronized.Set()
node.Consensus.BlocksSynchronized()
@ -388,7 +387,7 @@ func (node *Node) supportSyncing() {
utils.Logger().Debug().Msg("[SYNC] initialized state for staged sync")
}
go node.DoSyncing(node.Blockchain(), node.Worker, joinConsensus)
go node.DoSyncing(node.Blockchain(), joinConsensus)
}
// InitSyncingServer starts downloader server.

@ -0,0 +1,7 @@
package worker
import "github.com/harmony-one/harmony/block"
type Environment interface {
CurrentHeader() *block.Header
}

@ -7,10 +7,8 @@ import (
"sort"
"time"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash"
@ -50,6 +48,10 @@ type environment struct {
stakeMsgs []staking.StakeMsg
}
func (env *environment) CurrentHeader() *block.Header {
return env.header
}
// Worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type Worker struct {
@ -62,6 +64,40 @@ type Worker struct {
gasCeil uint64
}
// New create a new worker object.
func New(
chain core.BlockChain, beacon core.BlockChain,
) *Worker {
worker := newWorker(chain.Config(), chain, beacon)
parent := chain.CurrentBlock().Header()
num := parent.Number()
timestamp := time.Now().Unix()
epoch := GetNewEpoch(chain)
header := blockfactory.NewFactory(chain.Config()).NewHeader(epoch).With().
ParentHash(parent.Hash()).
Number(num.Add(num, common.Big1)).
GasLimit(worker.GasFloor(epoch)). //core.CalcGasLimit(parent, worker.gasFloor, worker.gasCeil)).
Time(big.NewInt(timestamp)).
ShardID(chain.ShardID()).
Header()
worker.makeCurrent(parent, header)
return worker
}
func newWorker(config *params.ChainConfig, chain, beacon core.BlockChain) *Worker {
return &Worker{
config: config,
factory: blockfactory.NewFactory(config),
chain: chain,
beacon: beacon,
gasFloor: 80000000,
gasCeil: 120000000,
}
}
// CommitSortedTransactions commits transactions for new block.
func (w *Worker) CommitSortedTransactions(
txs *types.TransactionsByPriceAndNonce,
@ -290,16 +326,16 @@ func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error {
}
// UpdateCurrent updates the current environment with the current state and header.
func (w *Worker) UpdateCurrent() error {
func (w *Worker) UpdateCurrent() (Environment, error) {
parent := w.chain.CurrentHeader()
num := parent.Number()
timestamp := time.Now().Unix()
epoch := w.GetNewEpoch()
epoch := GetNewEpoch(w.chain)
header := w.factory.NewHeader(epoch).With().
ParentHash(parent.Hash()).
Number(num.Add(num, common.Big1)).
GasLimit(core.CalcGasLimit(parent, w.GasFloor(epoch), w.gasCeil)).
GasLimit(core.CalcGasLimit(parent, w.GasFloor(epoch), w.GasCeil())).
Time(big.NewInt(timestamp)).
ShardID(w.chain.ShardID()).
Header()
@ -312,20 +348,28 @@ func (w *Worker) GetCurrentHeader() *block.Header {
}
// makeCurrent creates a new environment for the current cycle.
func (w *Worker) makeCurrent(parent *block.Header, header *block.Header) error {
state, err := w.chain.StateAt(parent.Root())
func (w *Worker) makeCurrent(parent *block.Header, header *block.Header) (*environment, error) {
env, err := makeEnvironment(w.chain, parent, header)
if err != nil {
return err
return nil, err
}
w.current = env
return w.current, nil
}
func makeEnvironment(chain core.BlockChain, parent *block.Header, header *block.Header) (*environment, error) {
state, err := chain.StateAt(parent.Root())
if err != nil {
return nil, err
}
env := &environment{
signer: types.NewEIP155Signer(w.config.ChainID),
ethSigner: types.NewEIP155Signer(w.config.EthCompatibleChainID),
signer: types.NewEIP155Signer(chain.Config().ChainID),
ethSigner: types.NewEIP155Signer(chain.Config().EthCompatibleChainID),
state: state,
header: header,
}
w.current = env
return nil
return env, nil
}
// GetCurrentResult gets the current block processing result.
@ -347,14 +391,14 @@ func (w *Worker) GetCurrentState() *state.DB {
}
// GetNewEpoch gets the current epoch.
func (w *Worker) GetNewEpoch() *big.Int {
parent := w.chain.CurrentBlock()
func GetNewEpoch(chain core.BlockChain) *big.Int {
parent := chain.CurrentBlock()
epoch := new(big.Int).Set(parent.Header().Epoch())
shardState, err := parent.Header().GetShardState()
if err == nil &&
shardState.Epoch != nil &&
w.config.IsStaking(shardState.Epoch) {
chain.Config().IsStaking(shardState.Epoch) {
// For shard state of staking epochs, the shard state will
// have an epoch and it will decide the next epoch for following blocks
epoch = new(big.Int).Set(shardState.Epoch)
@ -563,38 +607,8 @@ func (w *Worker) FinalizeNewBlock(
return block, nil
}
// New create a new worker object.
func New(
chain core.BlockChain, beacon core.BlockChain,
) *Worker {
worker := &Worker{
config: chain.Config(),
factory: blockfactory.NewFactory(chain.Config()),
chain: chain,
beacon: beacon,
}
worker.gasFloor = 80000000
worker.gasCeil = 120000000
parent := worker.chain.CurrentBlock().Header()
num := parent.Number()
timestamp := time.Now().Unix()
epoch := worker.GetNewEpoch()
header := worker.factory.NewHeader(epoch).With().
ParentHash(parent.Hash()).
Number(num.Add(num, common.Big1)).
GasLimit(worker.gasFloor). //core.CalcGasLimit(parent, worker.gasFloor, worker.gasCeil)).
Time(big.NewInt(timestamp)).
ShardID(worker.chain.ShardID()).
Header()
worker.makeCurrent(parent, header)
return worker
}
func (w *Worker) GasFloor(epoch *big.Int) uint64 {
if w.chain.Config().IsBlockGas30M(epoch) {
if w.config.IsBlockGas30M(epoch) {
return 30_000_000
}

@ -6,6 +6,7 @@ import (
"testing"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
@ -100,3 +101,12 @@ func TestCommitTransactions(t *testing.T) {
t.Error("Transaction is not committed")
}
}
func TestGasLimit(t *testing.T) {
w := newWorker(
&params.ChainConfig{
BlockGas30MEpoch: big.NewInt(10),
}, nil, nil)
require.EqualValues(t, 80_000_000, w.GasFloor(big.NewInt(3)))
require.EqualValues(t, 30_000_000, w.GasFloor(big.NewInt(10)))
}

Loading…
Cancel
Save