diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index beaf2d97a..8e2a930d8 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -88,7 +88,6 @@ func setUpTXGen() *node.Node { shardID := *shardIDFlag selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey} - gsif, err := consensus.NewGenesisStakeInfoFinder() // Nodes containing blockchain data to mirror the shards' data in the network myhost, err := p2pimpl.NewHost(&selfPeer, nodePriKey) @@ -103,7 +102,6 @@ func setUpTXGen() *node.Node { chainDBFactory := &shardchain.MemDBFactory{} txGen := node.New(myhost, consensusObj, chainDBFactory, false) //Changed it : no longer archival node. txGen.Client = client.NewClient(txGen.GetHost(), uint32(shardID)) - consensusObj.SetStakeInfoFinder(gsif) consensusObj.ChainReader = txGen.Blockchain() consensusObj.PublicKeys = nil genesisShardingConfig := core.ShardingSchedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch)) diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index c5640b207..f375bdf85 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -10,14 +10,15 @@ import ( "math/rand" "os" "path" + "regexp" "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/fatih/color" ffi_bls "github.com/harmony-one/bls/ffi/go/bls" + "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/accounts" "github.com/harmony-one/harmony/accounts/keystore" "github.com/harmony-one/harmony/api/client" @@ -520,7 +521,6 @@ func showAllBalances(sender, receiver string, fromS, toS int) { } } } - } func processBalancesCommand() { @@ -532,6 +532,13 @@ func processBalancesCommand() { showAllBalances("", "", -1, -1) } else { address := common2.ParseAddr(*balanceAddressPtr) + valid, errorMessage := validateAddress(*balanceAddressPtr, address, "") + + if !valid && len(errorMessage) > 0 { + fmt.Println(errorMessage) + return + } + fmt.Printf("Account: %s:\n", common2.MustAddressToBech32(address)) for shardID, balanceNonce := range FetchBalance(address) { if balanceNonce != nil { @@ -553,6 +560,12 @@ func formatAddressCommand() { fmt.Println("Please specify the --address to show formats for.") } else { address := common2.ParseAddr(*formatAddressPtr) + valid, errorMessage := validateAddress(*formatAddressPtr, address, "") + + if !valid && len(errorMessage) > 0 { + fmt.Println(errorMessage) + return + } fmt.Printf("account address in Bech32: %s\n", common2.MustAddressToBech32(address)) fmt.Printf("account address in Base16 (deprecated): %s\n", address.Hex()) @@ -665,6 +678,13 @@ func processGetFreeToken() { fmt.Println("Error: --address is required") } else { address := common2.ParseAddr(*freeTokenAddressPtr) + valid, errorMessage := validateAddress(*freeTokenAddressPtr, address, "") + + if !valid && len(errorMessage) > 0 { + fmt.Println(errorMessage) + return + } + GetFreeToken(address) } } @@ -690,24 +710,34 @@ func processTransferCommand() { return } - if shardID == -1 || toShardID == -1 { - fmt.Println("Please specify the shard ID for the transfer (e.g. --shardID=0)") + if !validShard(shardID, walletProfile.Shards) { + fmt.Println("Please specify a valid sender shard ID for the transfer (e.g. --shardID=0)") + return + } + + if !validShard(toShardID, walletProfile.Shards) { + fmt.Println("Please specify a valid receiver shard ID for the transfer (e.g. --toShardID=0)") return } + if amount <= 0 { fmt.Println("Please specify positive amount to transfer") return } - receiverAddress := common2.ParseAddr(receiver) - if len(receiverAddress) != 20 { - fmt.Println("The receiver address is not valid.") + senderAddress := common2.ParseAddr(sender) + valid, errorMessage := validateAddress(sender, senderAddress, "sender") + + if !valid && len(errorMessage) > 0 { + fmt.Println(errorMessage) return } - senderAddress := common2.ParseAddr(sender) - if len(senderAddress) != 20 { - fmt.Println("The sender address is not valid.") + receiverAddress := common2.ParseAddr(receiver) + valid, errorMessage = validateAddress(receiver, receiverAddress, "receiver") + + if !valid && len(errorMessage) > 0 { + fmt.Println(errorMessage) return } @@ -956,3 +986,32 @@ func submitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uin return nil } + +var ( + addressValidationRegexp = regexp.MustCompile(`(?i)^(one[a-zA-Z0-9]{39})|(0x[a-fA-F0-9]{40})`) +) + +func validateAddress(address string, commonAddress common.Address, addressType string) (bool, string) { + var valid = true + var errorMessage string + + if len(addressType) > 0 { + addressType = fmt.Sprintf("%s ", addressType) + } + + matches := addressValidationRegexp.FindAllStringSubmatch(address, -1) + if len(matches) == 0 || len(commonAddress) != 20 { + valid = false + errorMessage = fmt.Sprintf("The %saddress you supplied (%s) is in an invalid format. Please provide a valid address.", addressType, address) + } + + return valid, errorMessage +} + +func validShard(shardID int, shardCount int) bool { + if shardID < 0 || shardID > (shardCount-1) { + return false + } + + return true +} diff --git a/cmd/client/wallet/validation_test.go b/cmd/client/wallet/validation_test.go new file mode 100644 index 000000000..8b2eb3659 --- /dev/null +++ b/cmd/client/wallet/validation_test.go @@ -0,0 +1,51 @@ +package main + +import ( + "testing" + + "github.com/harmony-one/harmony/internal/common" +) + +func TestIsValidAddress(t *testing.T) { + tests := []struct { + str string + exp bool + }{ + {"one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z3", true}, + {"0x7c41E0668B551f4f902cFaec05B5Bdca68b124CE", true}, + {"onefoofoo", false}, + {"0xbarbar", false}, + {"dsasdadsasaadsas", false}, + {"32312123213213212321", false}, + } + + for _, test := range tests { + valid, _ := validateAddress(test.str, common.ParseAddr(test.str), "sender") + + if valid != test.exp { + t.Errorf("validateAddress(\"%s\") returned %v, expected %v", test.str, valid, test.exp) + } + } +} + +func TestIsValidShard(t *testing.T) { + readProfile("local") + + tests := []struct { + shardID int + exp bool + }{ + {0, true}, + {1, true}, + {-1, false}, + {99, false}, + } + + for _, test := range tests { + valid := validShard(test.shardID, walletProfile.Shards) + + if valid != test.exp { + t.Errorf("validShard(%d) returned %v, expected %v", test.shardID, valid, test.exp) + } + } +} diff --git a/cmd/staking/root.go b/cmd/staking/root.go index f28bc4fb9..325b01d9a 100644 --- a/cmd/staking/root.go +++ b/cmd/staking/root.go @@ -57,29 +57,29 @@ func (s *staker) run(cmd *cobra.Command, args []string) error { p.DeserializeHexStr(testBLSPubKey) pub := shard.BlsPublicKey{} pub.FromLibBLSPublicKey(p) - return staking.DirectiveNewValidator, staking.NewValidator{ - Description: staking.Description{ - Name: "something", - Identity: "something else", - Website: "some site, harmony.one", - SecurityContact: "mr.smith", - Details: "blah blah details", - }, - CommissionRates: staking.CommissionRates{ - Rate: staking.NewDec(100), - MaxRate: staking.NewDec(150), - MaxChangeRate: staking.NewDec(5), - }, - MinSelfDelegation: big.NewInt(10), - StakingAddress: common.Address(dAddr), - PubKey: pub, - Amount: big.NewInt(100), - } - // return message.DirectiveDelegate, message.Delegate{ - // common.Address(dAddr), - // common.Address(dAddr), - // big.NewInt(10), + // return staking.DirectiveNewValidator, staking.NewValidator{ + // Description: staking.Description{ + // Name: "something", + // Identity: "something else", + // Website: "some site, harmony.one", + // SecurityContact: "mr.smith", + // Details: "blah blah details", + // }, + // CommissionRates: staking.CommissionRates{ + // Rate: staking.NewDec(100), + // MaxRate: staking.NewDec(150), + // MaxChangeRate: staking.NewDec(5), + // }, + // MinSelfDelegation: big.NewInt(10), + // StakingAddress: common.Address(dAddr), + // PubKey: pub, + // Amount: big.NewInt(100), // } + return staking.DirectiveDelegate, staking.Delegate{ + common.Address(dAddr), + common.Address(dAddr), + big.NewInt(10), + } } stakingTx, err := staking.NewStakingTransaction(2, 100, gasPrice, stakePayloadMaker) diff --git a/consensus/README.md b/consensus/README.md index 1ce48a4ac..f5e4a7c23 100644 --- a/consensus/README.md +++ b/consensus/README.md @@ -1,12 +1,17 @@ -Consensus package includes the Harmony BFT consensus protocol code, which uses BLS-based multi-signature to cosign the new block. The details are in Harmony's new [consensus protocol design](https://talk.harmony.one/t/bls-based-practical-bft-consensus/131). +Consensus package includes the Harmony BFT consensus protocol code, which uses BLS-based +multi-signature to cosign the new block. The details are +in Harmony's new [consensus protocol design](https://talk.harmony.one/t/bls-based-practical-bft-consensus/131). ## Introduction to Harmony BFT with BLS signatures -Harmony BFT consensus protocol consist of normal mode and view changing mode which is same as the PBFT(practical byzantine fault tolerance) protocol. The difference is we use the BLS aggregated signature to reduce O(N^2) communications to O(N), which is more efficient and scalable to traditional PBFT. For brevity, we will still call the whole process as PBFT. +Harmony BFT consensus protocol consist of normal mode and view changing mode which is same +as the PBFT(practical byzantine fault tolerance) protocol. The difference is we use the +BLS aggregated signature to reduce O(N^2) communications to O(N), which is more efficient +and scalable to traditional PBFT. For brevity, we will still call the whole process as PBFT. ### Normal mode -To reach the consensus of the next block, there are 3 phases: announce(i.e. pre-prepare in PBFT), prepare and commit. +To reach the consensus of the next block, there are 3 phases: announce(i.e. pre-prepare in PBFT), prepare and commit. * Announce(leader): The leader broadcasts ANNOUNCE message along with candidate of the next block. * Prepare(validator): The validator will validate the block sent by leader and send PREPARE message; if the block is invalid, the validator will propose view change. If the prepare timeout, the validator will also propose view change. @@ -42,8 +47,8 @@ type PbftLog struct { messages []*PbftMessage } -// entry point and main loop; -// in each loop, the node will receive PBFT message from peers with timeout, +// entry point and main loop; +// in each loop, the node will receive PBFT message from peers with timeout, // then update its state accordingly. handleMessageUpdate function handles various kinds of messages and update its state // it will also send new PBFT message (if not null) to its peers. // in the same loop, the node will also check whether it should send view changing message to new leader @@ -75,27 +80,9 @@ func (consensus *Consensus) Start(stopChan chan struct{}, stoppedChan chan struc case <-stopChan: return } - } + } } ``` - - - - - - - - - - - - - - - - - - diff --git a/consensus/consensus.go b/consensus/consensus.go index 6a8c3a674..b932d6813 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -8,19 +8,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/harmony-one/bls/ffi/go/bls" - - "github.com/harmony-one/harmony/contracts/structs" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/internal/ctxerror" - "github.com/harmony-one/harmony/internal/genesis" "github.com/harmony-one/harmony/internal/memprofiling" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/shard" ) const ( @@ -150,9 +144,6 @@ type Consensus struct { // MessageSender takes are of sending consensus message and the corresponding retry logic. msgSender *MessageSender - // Staking information finder - stakeInfoFinder StakeInfoFinder - // Used to convey to the consensus main loop that block syncing has finished. syncReadyChan chan struct{} // Used to convey to the consensus main loop that node is out of sync @@ -171,18 +162,6 @@ func (consensus *Consensus) SetCommitDelay(delay time.Duration) { consensus.delayCommit = delay } -// StakeInfoFinder returns the stake information finder instance this -// consensus uses, e.g. for block reward distribution. -func (consensus *Consensus) StakeInfoFinder() StakeInfoFinder { - return consensus.stakeInfoFinder -} - -// SetStakeInfoFinder sets the stake information finder instance this -// consensus uses, e.g. for block reward distribution. -func (consensus *Consensus) SetStakeInfoFinder(stakeInfoFinder StakeInfoFinder) { - consensus.stakeInfoFinder = stakeInfoFinder -} - // DisableViewChangeForTestingOnly makes the receiver not propose view // changes when it should, e.g. leader timeout. // @@ -235,19 +214,6 @@ func (consensus *Consensus) GetBlockReward() *big.Int { return consensus.lastBlockReward } -// StakeInfoFinder finds the staking account for the given consensus key. -type StakeInfoFinder interface { - // FindStakeInfoByNodeKey returns a list of staking information matching - // the given node key. Caller may modify the returned slice of StakeInfo - // struct pointers, but must not modify the StakeInfo structs themselves. - FindStakeInfoByNodeKey(key *bls.PublicKey) []*structs.StakeInfo - - // FindStakeInfoByAccount returns a list of staking information matching - // the given account. Caller may modify the returned slice of StakeInfo - // struct pointers, but must not modify the StakeInfo structs themselves. - FindStakeInfoByAccount(addr common.Address) []*structs.StakeInfo -} - // New creates a new Consensus object // TODO: put shardId into chain reader's chain config func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKey) (*Consensus, error) { @@ -255,19 +221,15 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe consensus.host = host consensus.msgSender = NewMessageSender(host) consensus.blockNumLowChan = make(chan struct{}) - // pbft related consensus.PbftLog = NewPbftLog() consensus.phase = Announce consensus.mode = PbftMode{mode: Normal} // pbft timeout consensus.consensusTimeout = createTimeout() - consensus.prepareSigs = map[string]*bls.Sign{} consensus.commitSigs = map[string]*bls.Sign{} - consensus.CommitteePublicKeys = make(map[string]bool) - consensus.validators.Store(leader.ConsensusPubKey.SerializeToHexStr(), leader) if blsPriKey != nil { @@ -283,90 +245,15 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe // as it was displayed on explorer as Height right now consensus.viewID = 0 consensus.ShardID = ShardID - consensus.MsgChan = make(chan []byte) consensus.syncReadyChan = make(chan struct{}) consensus.syncNotReadyChan = make(chan struct{}) consensus.commitFinishChan = make(chan uint64) - consensus.ReadySignal = make(chan struct{}) consensus.lastBlockReward = big.NewInt(0) - // channel for receiving newly generated VDF consensus.RndChannel = make(chan [vdfAndSeedSize]byte) - consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance() - memprofiling.GetMemProfiling().Add("consensus.pbftLog", consensus.PbftLog) return &consensus, nil } - -// GenesisStakeInfoFinder is a stake info finder implementation using only -// genesis accounts. -// When used for block reward, it rewards only foundational nodes. -type GenesisStakeInfoFinder struct { - byNodeKey map[shard.BlsPublicKey][]*structs.StakeInfo - byAccount map[common.Address][]*structs.StakeInfo -} - -// FindStakeInfoByNodeKey returns the genesis account matching the given node -// key, as a single-item StakeInfo list. -// It returns nil if the key is not a genesis node key. -func (f *GenesisStakeInfoFinder) FindStakeInfoByNodeKey( - key *bls.PublicKey, -) []*structs.StakeInfo { - var pk shard.BlsPublicKey - if err := pk.FromLibBLSPublicKey(key); err != nil { - utils.Logger().Warn().Err(err).Msg("cannot convert BLS public key") - return nil - } - l, _ := f.byNodeKey[pk] - return l -} - -// FindStakeInfoByAccount returns the genesis account matching the given -// address, as a single-item StakeInfo list. -// It returns nil if the address is not a genesis account. -func (f *GenesisStakeInfoFinder) FindStakeInfoByAccount( - addr common.Address, -) []*structs.StakeInfo { - l, _ := f.byAccount[addr] - return l -} - -// NewGenesisStakeInfoFinder returns a stake info finder that can look up -// genesis nodes. -func NewGenesisStakeInfoFinder() (*GenesisStakeInfoFinder, error) { - f := &GenesisStakeInfoFinder{ - byNodeKey: make(map[shard.BlsPublicKey][]*structs.StakeInfo), - byAccount: make(map[common.Address][]*structs.StakeInfo), - } - for idx, account := range genesis.HarmonyAccounts { - pub := &bls.PublicKey{} - pub.DeserializeHexStr(account.BlsPublicKey) - var blsPublicKey shard.BlsPublicKey - if err := blsPublicKey.FromLibBLSPublicKey(pub); err != nil { - return nil, ctxerror.New("cannot convert BLS public key", - "accountIndex", idx, - ).WithCause(err) - } - addressBytes, err := hexutil.Decode(account.Address) - if err != nil { - return nil, ctxerror.New("cannot decode account address", - "accountIndex", idx, - ).WithCause(err) - } - var address common.Address - address.SetBytes(addressBytes) - stakeInfo := &structs.StakeInfo{ - Account: address, - BlsPublicKey: blsPublicKey, - BlockNum: common.Big0, - LockPeriodCount: big.NewInt(0x7fffffffffffffff), - Amount: common.Big0, - } - f.byNodeKey[blsPublicKey] = append(f.byNodeKey[blsPublicKey], stakeInfo) - f.byAccount[address] = append(f.byAccount[address], stakeInfo) - } - return f, nil -} diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index f930bd2c1..a2a4e038c 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -1269,6 +1269,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN Int("Num of VRF", len(vrfBlockNumbers)). Msg("[ConsensusMainLoop] VDF computation started") + // TODO ek – limit concurrency go func() { vdf := vdf_go.New(core.ShardingSchedule.VdfDifficulty(), seed) outputChannel := vdf.GetOutputChannel() diff --git a/consensus/engine/consensus_engine.go b/consensus/engine/consensus_engine.go index 00cf6ac7f..f6c6cecb5 100644 --- a/consensus/engine/consensus_engine.go +++ b/consensus/engine/consensus_engine.go @@ -4,12 +4,12 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/harmony-one/harmony/internal/params" - "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/shard" + staking "github.com/harmony-one/harmony/staking/types" ) // ChainReader defines a small collection of methods needed to access the local @@ -77,8 +77,12 @@ type Engine interface { // and assembles the final block. // Note: The block header and state database might be updated to reflect any // consensus rules that happen at finalization (e.g. block rewards). - Finalize(chain ChainReader, header *block.Header, state *state.DB, txs []*types.Transaction, - receipts []*types.Receipt, outcxs []*types.CXReceipt, incxs []*types.CXReceiptsProof) (*types.Block, error) + Finalize( + chain ChainReader, header *block.Header, state *state.DB, + txs []*types.Transaction, + stkgTxs []*staking.StakingTransaction, + receipts []*types.Receipt, outcxs []*types.CXReceipt, + incxs []*types.CXReceiptsProof) (*types.Block, error) // Seal generates a new sealing request for the given input block and pushes // the result into the given channel. diff --git a/contracts/structs/structs.go b/contracts/structs/structs.go deleted file mode 100644 index 5a2641b24..000000000 --- a/contracts/structs/structs.go +++ /dev/null @@ -1,35 +0,0 @@ -package structs - -import ( - "math/big" - - "github.com/harmony-one/harmony/shard" - - "github.com/ethereum/go-ethereum/common" -) - -// StakeInfoReturnValue is the struct for the return value of listLockedAddresses func in stake contract. -type StakeInfoReturnValue struct { - LockedAddresses []common.Address - BlsPubicKeys1 [][32]byte - BlsPubicKeys2 [][32]byte - BlsPubicKeys3 [][32]byte // TODO: remove third part as know we use 48 bytes pub key - BlockNums []*big.Int - LockPeriodCounts []*big.Int // The number of locking period the token will be locked. - Amounts []*big.Int -} - -// StakeInfo stores the staking information for a staker. -type StakeInfo struct { - Account common.Address - BlsPublicKey shard.BlsPublicKey - BlockNum *big.Int - LockPeriodCount *big.Int // The number of locking period the token will be locked. - Amount *big.Int -} - -// PlayersInfo stores the result of getPlayers. -type PlayersInfo struct { - Players []common.Address - Balances []*big.Int -} diff --git a/core/block_validator.go b/core/block_validator.go index fcf4cc8c3..a293e460c 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -24,11 +24,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/harmony-one/harmony/block" - "github.com/harmony-one/harmony/internal/ctxerror" - consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/core/values" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/params" ) @@ -58,7 +58,7 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin func (v *BlockValidator) ValidateBody(block *types.Block) error { // Check whether the block's known, and if not, that it's linkable if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) { - return ErrKnownBlock + return values.ErrKnownBlock } if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { diff --git a/core/blockchain.go b/core/blockchain.go index 1118de59e..677f1f86d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -35,19 +35,18 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" - "github.com/harmony-one/harmony/internal/params" - lru "github.com/hashicorp/golang-lru" - "github.com/harmony-one/harmony/block" consensus_engine "github.com/harmony-one/harmony/consensus/engine" - "github.com/harmony-one/harmony/contracts/structs" "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/core/values" "github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/shard" + lru "github.com/hashicorp/golang-lru" ) var ( @@ -1250,7 +1249,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty err = bc.Validator().ValidateBody(block) } switch { - case err == ErrKnownBlock: + case err == values.ErrKnownBlock: // Block and state both already known. However if the current block is below // this number we did a rollback and we should reimport it nonetheless. if bc.CurrentBlock().NumberU64() >= block.NumberU64() { @@ -1909,15 +1908,12 @@ func (bc *BlockChain) GetVrfByNumber(number uint64) []byte { // GetShardState returns the shard state for the given epoch, // creating one if needed. -func (bc *BlockChain) GetShardState( - epoch *big.Int, - stakeInfo *map[common.Address]*structs.StakeInfo, -) (shard.State, error) { +func (bc *BlockChain) GetShardState(epoch *big.Int) (shard.State, error) { shardState, err := bc.ReadShardState(epoch) if err == nil { // TODO ek – distinguish ErrNotFound return shardState, err } - shardState, err = CalculateNewShardState(bc, epoch, stakeInfo) + shardState, err = CalculateNewShardState(bc, epoch) if err != nil { return nil, err } diff --git a/core/chain_makers.go b/core/chain_makers.go index 7b6a89bde..963f82d8a 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -23,15 +23,15 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" - blockfactory "github.com/harmony-one/harmony/block/factory" - "github.com/harmony-one/harmony/internal/params" - "github.com/harmony-one/harmony/block" + blockfactory "github.com/harmony-one/harmony/block/factory" consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/shard" + staking "github.com/harmony-one/harmony/staking/types" ) // BlockGen creates blocks for testing. @@ -46,6 +46,7 @@ type BlockGen struct { gasPool *GasPool txs []*types.Transaction + stkTxs staking.StakingTransactions receipts []*types.Receipt uncles []*block.Header @@ -184,9 +185,10 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse if gen != nil { gen(i, b) } + if b.engine != nil { // Finalize and seal the block - block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts, nil, nil) + block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.stkTxs, b.receipts, nil, nil) if err != nil { panic(err) } diff --git a/core/gaspool.go b/core/gaspool.go index e3795c1ee..a2f291d7f 100644 --- a/core/gaspool.go +++ b/core/gaspool.go @@ -19,6 +19,8 @@ package core import ( "fmt" "math" + + "github.com/harmony-one/harmony/core/values" ) // GasPool tracks the amount of gas available during execution of the transactions @@ -38,7 +40,7 @@ func (gp *GasPool) AddGas(amount uint64) *GasPool { // available and returns an error otherwise. func (gp *GasPool) SubGas(amount uint64) error { if uint64(*gp) < amount { - return ErrGasLimitReached + return values.ErrGasLimitReached } *(*uint64)(gp) -= amount return nil diff --git a/staking/types/decimal.go b/core/numeric/decimal.go similarity index 99% rename from staking/types/decimal.go rename to core/numeric/decimal.go index 890308de8..263651633 100644 --- a/staking/types/decimal.go +++ b/core/numeric/decimal.go @@ -1,4 +1,4 @@ -package types +package numeric import ( "encoding/json" diff --git a/staking/types/decimal_test.go b/core/numeric/decimal_test.go similarity index 99% rename from staking/types/decimal_test.go rename to core/numeric/decimal_test.go index 5475c379f..a1a4fcafe 100644 --- a/staking/types/decimal_test.go +++ b/core/numeric/decimal_test.go @@ -1,4 +1,4 @@ -package types +package numeric import ( "math/big" diff --git a/core/resharding.go b/core/resharding.go index dfb8e104b..2686549f7 100644 --- a/core/resharding.go +++ b/core/resharding.go @@ -9,8 +9,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/bls/ffi/go/bls" - - "github.com/harmony-one/harmony/contracts/structs" common2 "github.com/harmony-one/harmony/internal/common" shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/internal/ctxerror" @@ -158,10 +156,7 @@ func GetShardingStateFromBlockChain(bc *BlockChain, epoch *big.Int) (*ShardingSt } // CalculateNewShardState get sharding state from previous epoch and calculate sharding state for new epoch -func CalculateNewShardState( - bc *BlockChain, epoch *big.Int, - stakeInfo *map[common.Address]*structs.StakeInfo, -) (shard.State, error) { +func CalculateNewShardState(bc *BlockChain, epoch *big.Int) (shard.State, error) { if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 { return CalculateInitShardState(), nil } @@ -171,42 +166,10 @@ func CalculateNewShardState( return nil, ctxerror.New("cannot retrieve previous sharding state"). WithCause(err) } - newNodeList := ss.UpdateShardingState(stakeInfo) utils.Logger().Info().Float64("percentage", CuckooRate).Msg("Cuckoo Rate") - ss.Reshard(newNodeList, CuckooRate) return ss.shardState, nil } -// UpdateShardingState remove the unstaked nodes and returns the newly staked node Ids. -func (ss *ShardingState) UpdateShardingState(stakeInfo *map[common.Address]*structs.StakeInfo) []shard.NodeID { - oldBlsPublicKeys := make(map[shard.BlsPublicKey]bool) // map of bls public keys - for _, shard := range ss.shardState { - newNodeList := shard.NodeList - for _, nodeID := range shard.NodeList { - oldBlsPublicKeys[nodeID.BlsPublicKey] = true - _, ok := (*stakeInfo)[nodeID.EcdsaAddress] - if ok { - // newNodeList = append(newNodeList, nodeID) - } else { - // TODO: Remove the node if it's no longer staked - } - } - shard.NodeList = newNodeList - } - - newAddresses := []shard.NodeID{} - for addr, info := range *stakeInfo { - _, ok := oldBlsPublicKeys[info.BlsPublicKey] - if !ok { - newAddresses = append(newAddresses, shard.NodeID{ - EcdsaAddress: addr, - BlsPublicKey: info.BlsPublicKey, - }) - } - } - return newAddresses -} - // TODO ek – shardingSchedule should really be part of a general-purpose network // configuration. We are OK for the time being, // until the day we should let one node process join multiple networks. diff --git a/core/state_processor.go b/core/state_processor.go index b64d5452a..f8acbb208 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -21,7 +21,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/harmony-one/harmony/block" consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core/state" @@ -30,6 +29,7 @@ import ( "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/utils" + staking "github.com/harmony-one/harmony/staking/types" ) // StateProcessor is a basic Processor, which takes care of transitioning @@ -94,7 +94,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) - _, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts, outcxs, incxs) + _, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.StakingTransactions(), receipts, outcxs, incxs) if err != nil { return nil, nil, nil, 0, ctxerror.New("cannot finalize block").WithCause(err) } @@ -180,6 +180,18 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo return receipt, cxReceipt, gas, err } +// ApplyStakingTransaction attempts to apply a staking transaction to the given state database +// and uses the input parameters for its environment. It returns the receipt +// for the staking transaction, gas used and an error if the transaction failed, +// indicating the block was invalid. +// staking transaction will use the code field in the account to store the staking information +// TODO chao: Add receipts for staking tx +func ApplyStakingTransaction( + config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.DB, + header *block.Header, tx *staking.StakingTransaction, usedGas *uint64, cfg vm.Config) (receipt *types.Receipt, gasUsed uint64, oops error) { + return nil, 0, nil +} + // ApplyIncomingReceipt will add amount into ToAddress in the receipt func ApplyIncomingReceipt(config *params.ChainConfig, db *state.DB, header *block.Header, cxp *types.CXReceiptsProof) error { if cxp == nil { @@ -197,7 +209,7 @@ func ApplyIncomingReceipt(config *params.ChainConfig, db *state.DB, header *bloc db.CreateAccount(*cx.To) } db.AddBalance(*cx.To, cx.Amount) - db.IntermediateRoot(config.IsS3(header.Epoch())).Bytes() + db.IntermediateRoot(config.IsS3(header.Epoch())) } return nil } diff --git a/core/state_transition.go b/core/state_transition.go index c75865d42..b9c8925f2 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -22,9 +22,9 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/harmony-one/harmony/internal/params" - + "github.com/harmony-one/harmony/core/values" "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/utils" ) @@ -171,9 +171,9 @@ func (st *StateTransition) preCheck() error { nonce := st.state.GetNonce(st.msg.From()) if nonce < st.msg.Nonce() { - return ErrNonceTooHigh + return values.ErrNonceTooHigh } else if nonce > st.msg.Nonce() { - return ErrNonceTooLow + return values.ErrNonceTooLow } } return st.buyGas() diff --git a/core/tx_pool.go b/core/tx_pool.go index c1da5ca2e..cea4131f6 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -18,7 +18,6 @@ package core import ( "context" - "errors" "fmt" "math" "math/big" @@ -30,11 +29,11 @@ import ( "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/metrics" - "github.com/harmony-one/harmony/internal/params" - "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/core/values" + "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/utils" ) @@ -43,44 +42,6 @@ const ( chainHeadChanSize = 10 ) -var ( - // ErrInvalidSender is returned if the transaction contains an invalid signature. - ErrInvalidSender = errors.New("invalid sender") - - // ErrNonceTooLow is returned if the nonce of a transaction is lower than the - // one present in the local chain. - ErrNonceTooLow = errors.New("nonce too low") - - // ErrUnderpriced is returned if a transaction's gas price is below the minimum - // configured for the transaction pool. - ErrUnderpriced = errors.New("transaction underpriced") - - // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced - // with a different one without the required price bump. - ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") - - // ErrInsufficientFunds is returned if the total cost of executing a transaction - // is higher than the balance of the user's account. - ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") - - // ErrIntrinsicGas is returned if the transaction is specified to use less gas - // than required to start the invocation. - ErrIntrinsicGas = errors.New("intrinsic gas too low") - - // ErrGasLimit is returned if a transaction's requested gas limit exceeds the - // maximum allowance of the current block. - ErrGasLimit = errors.New("exceeds block gas limit") - - // ErrNegativeValue is a sanity error to ensure noone is able to specify a - // transaction with a negative value. - ErrNegativeValue = errors.New("negative value") - - // ErrOversizedData is returned if the input data of a transaction is greater - // than some meaningful limit a user might use. This is not a consensus error - // making the transaction invalid, rather a DOS protection. - ErrOversizedData = errors.New("oversized data") -) - var ( evictionInterval = time.Minute // Time interval to check for evictable transactions statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats @@ -602,42 +563,42 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Heuristic limit, reject transactions over 32KB to prevent DOS attacks if tx.Size() > 32*1024 { - return ErrOversizedData + return values.ErrOversizedData } // Transactions can't be negative. This may never happen using RLP decoded // transactions but may occur if you create a transaction using the RPC. if tx.Value().Sign() < 0 { - return ErrNegativeValue + return values.ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. if pool.currentMaxGas < tx.Gas() { - return ErrGasLimit + return values.ErrGasLimit } // Make sure the transaction is signed properly from, err := types.Sender(pool.signer, tx) if err != nil { - return ErrInvalidSender + return values.ErrInvalidSender } // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { - return ErrUnderpriced + return values.ErrUnderpriced } // Ensure the transaction adheres to nonce ordering if pool.currentState.GetNonce(from) > tx.Nonce() { - return ErrNonceTooLow + return values.ErrNonceTooLow } // Transactor should have enough funds to cover the costs // cost == V + GP * GL if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { - return ErrInsufficientFunds + return values.ErrInsufficientFunds } intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) if err != nil { return err } if tx.Gas() < intrGas { - return ErrIntrinsicGas + return values.ErrIntrinsicGas } return nil } @@ -673,7 +634,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { Str("price", tx.GasPrice().String()). Msg("Discarding underpriced transaction") underpricedTxCounter.Inc(1) - return false, ErrUnderpriced + return false, values.ErrUnderpriced } // New transaction is better than our worse ones, make room for it drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) @@ -693,7 +654,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardCounter.Inc(1) - return false, ErrReplaceUnderpriced + return false, values.ErrReplaceUnderpriced } // New transaction is better, replace old one if old != nil { @@ -759,7 +720,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er if !inserted { // An older transaction was better, discard this queuedDiscardCounter.Inc(1) - return false, ErrReplaceUnderpriced + return false, values.ErrReplaceUnderpriced } // Discard any previous transaction and mark this if old != nil { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index e6a850fbd..d15cc21bf 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -30,11 +30,11 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" - blockfactory "github.com/harmony-one/harmony/block/factory" "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/core/values" "github.com/harmony-one/harmony/internal/params" ) @@ -235,27 +235,27 @@ func TestInvalidTransactions(t *testing.T) { from, _ := deriveSender(tx) pool.currentState.AddBalance(from, big.NewInt(1)) - if err := pool.AddRemote(tx); err != ErrInsufficientFunds { - t.Error("expected", ErrInsufficientFunds) + if err := pool.AddRemote(tx); err != values.ErrInsufficientFunds { + t.Error("expected", values.ErrInsufficientFunds) } balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), tx.GasPrice())) pool.currentState.AddBalance(from, balance) - if err := pool.AddRemote(tx); err != ErrIntrinsicGas { - t.Error("expected", ErrIntrinsicGas, "got", err) + if err := pool.AddRemote(tx); err != values.ErrIntrinsicGas { + t.Error("expected", values.ErrIntrinsicGas, "got", err) } pool.currentState.SetNonce(from, 1) pool.currentState.AddBalance(from, big.NewInt(0xffffffffffffff)) tx = transaction(0, 100000, key) - if err := pool.AddRemote(tx); err != ErrNonceTooLow { - t.Error("expected", ErrNonceTooLow) + if err := pool.AddRemote(tx); err != values.ErrNonceTooLow { + t.Error("expected", values.ErrNonceTooLow) } tx = transaction(1, 100000, key) pool.gasPrice = big.NewInt(1000) - if err := pool.AddRemote(tx); err != ErrUnderpriced { - t.Error("expected", ErrUnderpriced, "got", err) + if err := pool.AddRemote(tx); err != values.ErrUnderpriced { + t.Error("expected", values.ErrUnderpriced, "got", err) } if err := pool.AddLocal(tx); err != nil { t.Error("expected", nil, "got", err) @@ -325,8 +325,8 @@ func TestTransactionNegativeValue(t *testing.T) { tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, 0, big.NewInt(-1), 100, big.NewInt(1), nil), types.HomesteadSigner{}, key) from, _ := deriveSender(tx) pool.currentState.AddBalance(from, big.NewInt(1)) - if err := pool.AddRemote(tx); err != ErrNegativeValue { - t.Error("expected", ErrNegativeValue, "got", err) + if err := pool.AddRemote(tx); err != values.ErrNegativeValue { + t.Error("expected", values.ErrNegativeValue, "got", err) } } diff --git a/core/types/block.go b/core/types/block.go index 3ecd12c30..2b776982b 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -30,10 +30,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" - "github.com/harmony-one/taggedrlp" - "github.com/pkg/errors" - "github.com/rs/zerolog" - "github.com/harmony-one/harmony/block" blockfactory "github.com/harmony-one/harmony/block/factory" v0 "github.com/harmony-one/harmony/block/v0" @@ -42,6 +38,10 @@ import ( "github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/shard" + staking "github.com/harmony-one/harmony/staking/types" + "github.com/harmony-one/taggedrlp" + "github.com/pkg/errors" + "github.com/rs/zerolog" ) // Constants for block. @@ -181,7 +181,7 @@ func init() { BodyRegistry.MustRegister("v1", new(BodyV1)) } -// Block represents an entire block in the Ethereum blockchain. +// Block represents an entire block in the Harmony blockchain. type Block struct { header *block.Header uncles []*block.Header @@ -347,6 +347,11 @@ func (b *Block) Transactions() Transactions { return b.transactions } +// StakingTransactions returns stakingTransactions. +func (b *Block) StakingTransactions() staking.StakingTransactions { + return staking.StakingTransactions{} +} + // IncomingReceipts returns verified outgoing receipts func (b *Block) IncomingReceipts() CXReceiptsProofs { return b.incomingReceipts diff --git a/core/values/blockchain.go b/core/values/blockchain.go new file mode 100644 index 000000000..1225e2277 --- /dev/null +++ b/core/values/blockchain.go @@ -0,0 +1,10 @@ +package values + +const ( + // BeaconChainShardID is the ShardID of the BeaconChain + BeaconChainShardID = 0 + // VotingPowerReduceBlockThreshold roughly corresponds to 3 hours + VotingPowerReduceBlockThreshold = 1350 + // VotingPowerFullReduce roughly corresponds to 12 hours + VotingPowerFullReduce = 4 * VotingPowerReduceBlockThreshold +) diff --git a/core/error.go b/core/values/error.go similarity index 51% rename from core/error.go rename to core/values/error.go index 02a9f10b4..80a905984 100644 --- a/core/error.go +++ b/core/values/error.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package core +package values import ( "github.com/pkg/errors" @@ -31,6 +31,10 @@ var ( // ErrBlacklistedHash is returned if a block to import is on the blacklist. ErrBlacklistedHash = errors.New("blacklisted hash") + // ErrNonceTooLow is returned if the nonce of a transaction is lower than the + // one present in the local chain. + ErrNonceTooLow = errors.New("nonce too low") + // ErrNonceTooHigh is returned if the nonce of a transaction is higher than the // next one expected based on the local chain. ErrNonceTooHigh = errors.New("nonce too high") @@ -40,4 +44,36 @@ var ( // ErrInvalidChainID when ChainID of signer does not match that of running node ErrInvalidChainID = errors.New("invalid chain id for signer") + + // ErrInvalidSender is returned if the transaction contains an invalid signature. + ErrInvalidSender = errors.New("invalid sender") + + // ErrUnderpriced is returned if a transaction's gas price is below the minimum + // configured for the transaction pool. + ErrUnderpriced = errors.New("transaction underpriced") + + // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced + // with a different one without the required price bump. + ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") + + // ErrInsufficientFunds is returned if the total cost of executing a transaction + // is higher than the balance of the user's account. + ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") + + // ErrIntrinsicGas is returned if the transaction is specified to use less gas + // than required to start the invocation. + ErrIntrinsicGas = errors.New("intrinsic gas too low") + + // ErrGasLimit is returned if a transaction's requested gas limit exceeds the + // maximum allowance of the current block. + ErrGasLimit = errors.New("exceeds block gas limit") + + // ErrNegativeValue is a sanity error to ensure noone is able to specify a + // transaction with a negative value. + ErrNegativeValue = errors.New("negative value") + + // ErrOversizedData is returned if the input data of a transaction is greater + // than some meaningful limit a user might use. This is not a consensus error + // making the transaction invalid, rather a DOS protection. + ErrOversizedData = errors.New("oversized data") ) diff --git a/crypto/vdf/vdf.go b/crypto/vdf/vdf.go index 30792982d..10cc90c07 100644 --- a/crypto/vdf/vdf.go +++ b/crypto/vdf/vdf.go @@ -37,6 +37,7 @@ func (vdf *VDF) Execute() { tempResult = sha3.Sum256(tempResult[:]) } vdf.output = tempResult + // TODO ek – limit concurrency go func() { vdf.outputChan <- vdf.output }() diff --git a/drand/drand_leader.go b/drand/drand_leader.go index 3ec7c10f4..f9d7346be 100644 --- a/drand/drand_leader.go +++ b/drand/drand_leader.go @@ -38,6 +38,7 @@ func (dRand *DRand) WaitForEpochBlock(blockChannel chan *types.Block, stopChan c zeros := [32]byte{} if core.IsEpochBlock(newBlock) && !bytes.Equal(pRnd[:], zeros[:]) { // The epoch block should contain the randomness preimage pRnd + // TODO ek – limit concurrency go func() { vdf := vdf.New(vdfDifficulty, pRnd) outputChannel := vdf.GetOutputChannel() diff --git a/internal/chain/engine.go b/internal/chain/engine.go index c2e90c282..4992a8bff 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -6,9 +6,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" - "github.com/pkg/errors" - "golang.org/x/crypto/sha3" - "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core" @@ -17,6 +14,9 @@ import ( "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/shard" + staking "github.com/harmony-one/harmony/staking/types" + "github.com/pkg/errors" + "golang.org/x/crypto/sha3" ) type engineImpl struct{} @@ -149,7 +149,11 @@ func (e *engineImpl) VerifySeal(chain engine.ChainReader, header *block.Header) // Finalize implements Engine, accumulating the block rewards, // setting the final state and assembling the block. -func (e *engineImpl) Finalize(chain engine.ChainReader, header *block.Header, state *state.DB, txs []*types.Transaction, receipts []*types.Receipt, outcxs []*types.CXReceipt, incxs []*types.CXReceiptsProof) (*types.Block, error) { +func (e *engineImpl) Finalize( + chain engine.ChainReader, header *block.Header, state *state.DB, txs []*types.Transaction, + stkgTxs []*staking.StakingTransaction, + receipts []*types.Receipt, outcxs []*types.CXReceipt, + incxs []*types.CXReceiptsProof) (*types.Block, error) { // Accumulate any block and uncle rewards and commit the final state root // Header seems complete, assemble into a block and return if err := AccumulateRewards(chain, state, header); err != nil { diff --git a/internal/chain/reward.go b/internal/chain/reward.go index 28b1ebc76..5ee12a212 100644 --- a/internal/chain/reward.go +++ b/internal/chain/reward.go @@ -5,7 +5,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/bls/ffi/go/bls" - "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/consensus/engine" @@ -85,6 +84,7 @@ func AccumulateRewards( accounts = append(accounts, member.EcdsaAddress) } } + numAccounts := big.NewInt(int64(len(accounts))) last := new(big.Int) for i, account := range accounts { diff --git a/internal/hmyapi/filters/api.go b/internal/hmyapi/filters/api.go index d58faa1a7..af989e298 100644 --- a/internal/hmyapi/filters/api.go +++ b/internal/hmyapi/filters/api.go @@ -60,6 +60,7 @@ func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { // timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. // Tt is started when the api is created. func (api *PublicFilterAPI) timeoutLoop() { + // TODO ek – infinite loop; add shutdown/cleanup logic ticker := time.NewTicker(5 * time.Minute) for { <-ticker.C diff --git a/internal/hmyapi/transactionpool.go b/internal/hmyapi/transactionpool.go index 16e236258..236225fed 100644 --- a/internal/hmyapi/transactionpool.go +++ b/internal/hmyapi/transactionpool.go @@ -9,9 +9,9 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/harmony-one/harmony/accounts" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/core/values" internal_common "github.com/harmony-one/harmony/internal/common" staking "github.com/harmony-one/harmony/staking/types" "github.com/pkg/errors" @@ -179,7 +179,7 @@ func (s *PublicTransactionPoolAPI) SendRawStakingTransaction( } c := s.b.ChainConfig().ChainID if tx.ChainID().Cmp(c) != 0 { - e := errors.Wrapf(core.ErrInvalidChainID, "current chain id:%s", c.String()) + e := errors.Wrapf(values.ErrInvalidChainID, "current chain id:%s", c.String()) return common.Hash{}, e } return SubmitStakingTransaction(ctx, s.b, tx) @@ -194,7 +194,7 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encod } c := s.b.ChainConfig().ChainID if tx.ChainID().Cmp(c) != 0 { - e := errors.Wrapf(core.ErrInvalidChainID, "current chain id:%s", c.String()) + e := errors.Wrapf(values.ErrInvalidChainID, "current chain id:%s", c.String()) return common.Hash{}, e } return SubmitTransaction(ctx, s.b, tx) diff --git a/internal/memprofiling/lib.go b/internal/memprofiling/lib.go index 6820b86a6..7c7bba2fa 100644 --- a/internal/memprofiling/lib.go +++ b/internal/memprofiling/lib.go @@ -79,6 +79,7 @@ func (m *MemProfiling) Stop() { // PeriodicallyScanMemSize scans memsize of the observed objects every 30 seconds. func (m *MemProfiling) PeriodicallyScanMemSize() { go func() { + // TODO ek – infinite loop; add shutdown/cleanup logic for { select { case <-time.After(memSizeScanTime): @@ -98,6 +99,7 @@ func (m *MemProfiling) PeriodicallyScanMemSize() { // MaybeCallGCPeriodically runs GC manually every gcTime minutes. This is one of the options to mitigate the OOM issue. func MaybeCallGCPeriodically() { go func() { + // TODO ek – infinite loop; add shutdown/cleanup logic for { select { case <-time.After(gcTime): @@ -108,6 +110,7 @@ func MaybeCallGCPeriodically() { } }() go func() { + // TODO ek – infinite loop; add shutdown/cleanup logic for { select { case <-time.After(memStatTime): diff --git a/internal/profiler/profiler.go b/internal/profiler/profiler.go index 619a73034..08f2987f1 100644 --- a/internal/profiler/profiler.go +++ b/internal/profiler/profiler.go @@ -43,6 +43,7 @@ func (profiler *Profiler) Config(shardID uint32, metricsReportURL string) { // LogMemory logs memory. func (profiler *Profiler) LogMemory() { + // TODO ek – infinite loop; add shutdown/cleanup logic for { // log mem usage info, _ := profiler.proc.MemoryInfo() @@ -63,6 +64,7 @@ func (profiler *Profiler) LogMemory() { // LogCPU logs CPU metrics. func (profiler *Profiler) LogCPU() { + // TODO ek – infinite loop; add shutdown/cleanup logic for { // log cpu usage percent, _ := profiler.proc.CPUPercent() diff --git a/msgq/msgq.go b/msgq/msgq.go new file mode 100644 index 000000000..a80ad1cf7 --- /dev/null +++ b/msgq/msgq.go @@ -0,0 +1,63 @@ +// Package msgq implements a simple, finite-sized message queue. It can be used +// as a building block for a message processor pool. +package msgq + +import ( + "github.com/libp2p/go-libp2p-core/peer" + "github.com/pkg/errors" +) + +type message struct { + content []byte + sender peer.ID +} + +// MessageAdder enqueues a received message for processing. It returns without +// blocking, and may return a queue overrun error. +type MessageAdder interface { + AddMessage(content []byte, sender peer.ID) error +} + +// MessageHandler is a message handler. +type MessageHandler interface { + HandleMessage(content []byte, sender peer.ID) +} + +// Queue is a finite-sized message queue. +type Queue struct { + ch chan message +} + +// New returns a new message queue of the given size. +func New(size int) *Queue { + return &Queue{ch: make(chan message, size)} +} + +// AddMessage enqueues a received message for processing. It returns without +// blocking, and may return a queue overrun error. +func (q *Queue) AddMessage(content []byte, sender peer.ID) error { + select { + case q.ch <- message{content, sender}: + default: + return ErrRxOverrun + } + return nil +} + +// HandleMessages dequeues and dispatches incoming messages using the given +// message handler, until the message queue is closed. This function can be +// spawned as a background goroutine, potentially multiple times for a pool. +func (q *Queue) HandleMessages(h MessageHandler) { + for msg := range q.ch { + h.HandleMessage(msg.content, msg.sender) + } +} + +// Close closes the given queue. +func (q *Queue) Close() error { + close(q.ch) + return nil +} + +// ErrRxOverrun signals that a receive queue has been overrun. +var ErrRxOverrun = errors.New("rx overrun") diff --git a/msgq/msgq_test.go b/msgq/msgq_test.go new file mode 100644 index 000000000..37bd11643 --- /dev/null +++ b/msgq/msgq_test.go @@ -0,0 +1,94 @@ +package msgq + +import ( + "fmt" + "testing" + + "github.com/libp2p/go-libp2p-core/peer" +) + +func TestNew(t *testing.T) { + tests := []struct { + name string + cap int + }{ + {"unbuffered", 0}, + {"buffered10", 10}, + {"buffered100", 100}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := New(tt.cap) + if cap(got.ch) != tt.cap { + t.Errorf("New() ch cap %d, want %d", cap(got.ch), tt.cap) + } + }) + } +} + +func TestQueue_AddMessage(t *testing.T) { + tests := []struct { + name string + cap int + }{ + {"unbuffered", 0}, + {"buffered", 100}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := &Queue{ch: make(chan message, tt.cap)} + for i := 0; i < tt.cap+10; i++ { + var wantErr error + if i >= tt.cap { + wantErr = ErrRxOverrun + } else { + wantErr = nil + } + err := q.AddMessage([]byte{}, peer.ID("")) + if err != wantErr { + t.Fatalf("AddMessage() iter %d, error = %v, want %v", + i, err, wantErr) + } + } + }) + } +} + +type testMessageHandler struct { + t *testing.T + seq int +} + +func (h *testMessageHandler) HandleMessage(content []byte, sender peer.ID) { + got, want := string(content), fmt.Sprint(h.seq) + if got != want { + h.t.Errorf("out-of-sequence message %v, want %v", got, want) + } + h.seq++ +} + +func TestQueue_HandleMessages(t *testing.T) { + ch := make(chan message, 500) + for seq := 0; seq < cap(ch); seq++ { + ch <- message{content: []byte(fmt.Sprint(seq))} + } + close(ch) + q := &Queue{ch: ch} + q.HandleMessages(&testMessageHandler{t: t}) +} + +func TestQueue_Close(t *testing.T) { + q := &Queue{ch: make(chan message, 100)} + err := q.Close() + if err != nil { + t.Errorf("Close() error = %v, want nil", err) + } + select { + case m, ok := <-q.ch: + if ok { + t.Errorf("unexpected message %v", m) + } + default: + t.Error("channel closed but not ready") + } +} diff --git a/node/node.go b/node/node.go index baeb247d5..d36170d61 100644 --- a/node/node.go +++ b/node/node.go @@ -20,6 +20,7 @@ import ( "github.com/harmony-one/harmony/contracts" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/core/values" "github.com/harmony-one/harmony/drand" "github.com/harmony-one/harmony/internal/chain" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" @@ -27,6 +28,7 @@ import ( "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/msgq" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" p2p_host "github.com/harmony-one/harmony/p2p/host" @@ -53,6 +55,18 @@ const ( TxPoolLimit = 20000 // NumTryBroadCast is the number of times trying to broadcast NumTryBroadCast = 3 + // ClientRxQueueSize is the number of client messages to queue before tail-dropping. + ClientRxQueueSize = 16384 + // ShardRxQueueSize is the number of shard messages to queue before tail-dropping. + ShardRxQueueSize = 16384 + // GlobalRxQueueSize is the number of global messages to queue before tail-dropping. + GlobalRxQueueSize = 16384 + // ClientRxWorkers is the number of concurrent client message handlers. + ClientRxWorkers = 8 + // ShardRxWorkers is the number of concurrent shard message handlers. + ShardRxWorkers = 32 + // GlobalRxWorkers is the number of concurrent global message handlers. + GlobalRxWorkers = 32 ) func (state State) String() string { @@ -145,6 +159,11 @@ type Node struct { // The p2p host used to send/receive p2p messages host p2p.Host + // Incoming messages to process. + clientRxQueue *msgq.Queue + shardRxQueue *msgq.Queue + globalRxQueue *msgq.Queue + // Service manager. serviceManager *service.Manager @@ -372,7 +391,8 @@ func (node *Node) getTransactionsForNewBlock( } selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) - selectedStaking, unselectedStaking, invalidStaking := node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) + selectedStaking, unselectedStaking, invalidStaking := + node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, coinbase) node.pendingTransactions = make(map[common.Hash]*types.Transaction) for _, unselectedTx := range unselected { @@ -397,8 +417,30 @@ func (node *Node) getTransactionsForNewBlock( return selected, selectedStaking } +func (node *Node) startRxPipeline( + receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int, +) { + // consumers + for i := 0; i < numWorkers; i++ { + go queue.HandleMessages(node) + } + // provider + go node.receiveGroupMessage(receiver, queue) +} + // StartServer starts a server and process the requests by a handler. func (node *Node) StartServer() { + // start the goroutine to receive client message + // client messages are sent by clients, like txgen, wallet + node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers) + + // start the goroutine to receive group message + node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers) + + // start the goroutine to receive global message, used for cross-shard TX + // FIXME (leo): we use beacon client topic as the global topic for now + node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers) + select {} } @@ -465,14 +507,14 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain) node.CxPool = core.NewCxPool(core.CxPoolSize) node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine) - if node.Blockchain().ShardID() != 0 { + + if node.Blockchain().ShardID() != values.BeaconChainShardID { node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine) } node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof) node.pendingTransactions = make(map[common.Hash]*types.Transaction) node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction) - node.Consensus.VerifiedNewBlock = make(chan *types.Block) // the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block node.Consensus.SetBlockNum(blockchain.CurrentBlock().NumberU64() + 1) @@ -486,20 +528,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc } else { node.AddContractKeyAndAddress(scFaucet) } - - //if node.Consensus.ShardID == 0 { - // // Contracts only exist in beacon chain - // if node.isFirstTime { - // // Setup one time smart contracts - // node.CurrentStakes = make(map[common.Address]*structs.StakeInfo) - // node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked - // } else { - // node.AddContractKeyAndAddress(scStaking) - // } - //} - node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), node.Blockchain().Config()) - // Create test keys. Genesis will later need this. var err error node.TestBankKeys, err = CreateTestBankKeys(TestAccountNumber) @@ -513,16 +542,9 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)). Msg("Genesis block hash") - // start the goroutine to receive client message - // client messages are sent by clients, like txgen, wallet - go node.ReceiveClientGroupMessage() - - // start the goroutine to receive group message - go node.ReceiveGroupMessage() - - // start the goroutine to receive global message, used for cross-shard TX - // FIXME (leo): we use beacon client topic as the global topic for now - go node.ReceiveGlobalMessage() + node.clientRxQueue = msgq.New(ClientRxQueueSize) + node.shardRxQueue = msgq.New(ShardRxQueueSize) + node.globalRxQueue = msgq.New(GlobalRxQueueSize) // Setup initial state of syncing. node.peerRegistrationRecord = make(map[string]*syncConfig) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index d996ab7c7..3b0acfa96 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -69,6 +69,7 @@ func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig [ utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof Found") groupID := nodeconfig.ShardID(toShardID) + // TODO ek – limit concurrency go node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap))) } @@ -243,7 +244,7 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) { } func (node *Node) verifyIncomingReceipts(block *types.Block) error { - m := make(map[common.Hash]bool) + m := make(map[common.Hash]struct{}) cxps := block.IncomingReceipts() for _, cxp := range cxps { // double spent @@ -255,7 +256,7 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error { if _, ok := m[hash]; ok { return ctxerror.New("[verifyIncomingReceipts] Double Spent!") } - m[hash] = true + m[hash] = struct{}{} for _, item := range cxp.Receipts { if item.ToShardID != node.Blockchain().ShardID() { diff --git a/node/node_handler.go b/node/node_handler.go index 61eb590cc..b7663a2ea 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -14,6 +14,8 @@ import ( "github.com/ethereum/go-ethereum/rlp" pb "github.com/golang/protobuf/proto" "github.com/harmony-one/bls/ffi/go/bls" + libp2p_peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/harmony-one/harmony/api/proto" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" "github.com/harmony-one/harmony/api/proto/message" @@ -24,11 +26,11 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/msgq" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/shard" staking "github.com/harmony-one/harmony/staking/types" - libp2p_peer "github.com/libp2p/go-libp2p-peer" ) const ( @@ -36,71 +38,39 @@ const ( crossLinkBatchSize = 7 ) -// ReceiveGlobalMessage use libp2p pubsub mechanism to receive global broadcast messages -func (node *Node) ReceiveGlobalMessage() { - ctx := context.Background() - for { - if node.globalGroupReceiver == nil { - time.Sleep(100 * time.Millisecond) - continue - } - msg, sender, err := node.globalGroupReceiver.Receive(ctx) - if sender != node.host.GetID() { - //utils.Logger().Info("[PUBSUB]", "received global msg", len(msg), "sender", sender) - if err == nil { - // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size - go node.messageHandler(msg[5:], sender) - } - } - } -} - -// ReceiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages -func (node *Node) ReceiveGroupMessage() { +// receiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages +func (node *Node) receiveGroupMessage( + receiver p2p.GroupReceiver, rxQueue msgq.MessageAdder, +) { ctx := context.Background() + // TODO ek – infinite loop; add shutdown/cleanup logic for { - if node.shardGroupReceiver == nil { - time.Sleep(100 * time.Millisecond) + msg, sender, err := receiver.Receive(ctx) + if err != nil { + utils.Logger().Warn().Err(err). + Msg("cannot receive from group") continue } - msg, sender, err := node.shardGroupReceiver.Receive(ctx) - if sender != node.host.GetID() { - //utils.Logger().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender) - if err == nil { - // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size - go node.messageHandler(msg[5:], sender) - } - } - } -} - -// ReceiveClientGroupMessage use libp2p pubsub mechanism to receive broadcast messages for client -func (node *Node) ReceiveClientGroupMessage() { - ctx := context.Background() - for { - if node.clientReceiver == nil { - // check less frequent on client messages - time.Sleep(100 * time.Millisecond) + if sender == node.host.GetID() { continue } - msg, sender, err := node.clientReceiver.Receive(ctx) - if sender != node.host.GetID() { - // utils.Logger().Info("[CLIENT]", "received group msg", len(msg), "sender", sender, "error", err) - if err == nil { - // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size - go node.messageHandler(msg[5:], sender) - } + //utils.Logger().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender) + // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size + if err := rxQueue.AddMessage(msg[5:], sender); err != nil { + utils.Logger().Warn().Err(err). + Str("sender", sender.Pretty()). + Msg("cannot enqueue incoming message for processing") } } } -// messageHandler parses the message and dispatch the actions -func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) { +// HandleMessage parses the message and dispatch the actions. +func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) { msgCategory, err := proto.GetMessageCategory(content) if err != nil { utils.Logger().Error(). Err(err). - Msg("messageHandler get message category failed") + Msg("HandleMessage get message category failed") return } @@ -108,7 +78,7 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) { if err != nil { utils.Logger().Error(). Err(err). - Msg("messageHandler get message type failed") + Msg("HandleMessage get message type failed") return } @@ -116,7 +86,7 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) { if err != nil { utils.Logger().Error(). Err(err). - Msg("messageHandler get message payload failed") + Msg("HandleMessage get message payload failed") return } diff --git a/node/node_newblock.go b/node/node_newblock.go index b29fee9b2..711cb7225 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -151,8 +151,7 @@ func (node *Node) proposeBeaconShardState(block *types.Block) error { } nextEpoch := new(big.Int).Add(block.Header().Epoch(), common.Big1) // TODO: add logic for EPoS - shardState, err := core.CalculateNewShardState( - node.Blockchain(), nextEpoch, nil) + shardState, err := core.CalculateNewShardState(node.Blockchain(), nextEpoch) if err != nil { return err } diff --git a/node/node_resharding.go b/node/node_resharding.go index 8bab93dcf..fa9bc1e63 100644 --- a/node/node_resharding.go +++ b/node/node_resharding.go @@ -9,14 +9,11 @@ import ( "os/exec" "strconv" "syscall" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" - "github.com/harmony-one/harmony/contracts/structs" - - "time" - proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" @@ -28,7 +25,7 @@ import ( ) // validateNewShardState validate whether the new shard state root matches -func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[common.Address]*structs.StakeInfo) error { +func (node *Node) validateNewShardState(block *types.Block) error { // Common case first – blocks without resharding proposal header := block.Header() if header.ShardStateHash() == (common.Hash{}) { @@ -61,8 +58,7 @@ func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[commo // TODO ek – this may be called from regular shards, // for vetting beacon chain blocks received during block syncing. // DRand may or or may not get in the way. Test this out. - expected, err := core.CalculateNewShardState( - node.Blockchain(), nextEpoch, stakeInfo) + expected, err := core.CalculateNewShardState(node.Blockchain(), nextEpoch) if err != nil { return ctxerror.New("cannot calculate expected shard state"). WithCause(err) diff --git a/node/node_syncing.go b/node/node_syncing.go index 0fd1bedf1..5b94f5026 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -162,6 +162,7 @@ func (p *LocalSyncingPeerProvider) SyncingPeers(shardID uint32) (peers []p2p.Pee // DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks func (node *Node) DoBeaconSyncing() { go func(node *Node) { + // TODO ek – infinite loop; add shutdown/cleanup logic for { select { case beaconBlock := <-node.BeaconBlockChannel: @@ -170,6 +171,7 @@ func (node *Node) DoBeaconSyncing() { } }(node) + // TODO ek – infinite loop; add shutdown/cleanup logic for { if node.beaconSync == nil { utils.Logger().Info().Msg("initializing beacon sync") @@ -198,6 +200,7 @@ func (node *Node) DoBeaconSyncing() { // DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { + // TODO ek – infinite loop; add shutdown/cleanup logic SyncingLoop: for { if node.stateSync == nil { diff --git a/node/worker/worker.go b/node/worker/worker.go index f374fc504..b66005b7a 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -13,6 +13,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/core/values" "github.com/harmony-one/harmony/core/vm" shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/internal/ctxerror" @@ -27,11 +28,12 @@ type environment struct { state *state.DB // apply state changes here gasPool *core.GasPool // available gas used to pack transactions - header *block.Header - txs []*types.Transaction - receipts []*types.Receipt - outcxs []*types.CXReceipt // cross shard transaction receipts (source shard) - incxs []*types.CXReceiptsProof // cross shard receipts and its proof (desitinatin shard) + header *block.Header + txs []*types.Transaction + stkingTxs staking.StakingTransactions + receipts []*types.Receipt + outcxs []*types.CXReceipt // cross shard transaction receipts (source shard) + incxs []*types.CXReceiptsProof // cross shard receipts and its proof (desitinatin shard) } // Worker is the main object which takes care of submitting new work to consensus engine @@ -146,12 +148,58 @@ func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Tra // SelectStakingTransactionsForNewBlock selects staking transactions for new block. func (w *Worker) SelectStakingTransactionsForNewBlock( newBlockNum uint64, txs staking.StakingTransactions, - recentTxsStats types.RecentTxsStats, - txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (staking.StakingTransactions, staking.StakingTransactions, staking.StakingTransactions) { - // TODO: implement staking transaction selection - t := staking.StakingTransactions{} - return t, t, t + + // only beaconchain process staking transaction + if w.chain.ShardID() != values.BeaconChainShardID { + return nil, nil, nil + } + + // staking transaction share the same gasPool with normal transactions + if w.current.gasPool == nil { + w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) + } + + selected := staking.StakingTransactions{} + unselected := staking.StakingTransactions{} + invalid := staking.StakingTransactions{} + for _, tx := range txs { + snap := w.current.state.Snapshot() + _, err := w.commitStakingTransaction(tx, coinbase) + if err != nil { + w.current.state.RevertToSnapshot(snap) + invalid = append(invalid, tx) + utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error") + } else { + selected = append(selected, tx) + utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info") + } + } + + utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Uint64("blockGasLimit", + w.current.header.GasLimit()).Uint64("blockGasUsed", + w.current.header.GasUsed()).Msg("[SelectStakingTransaction] Block gas limit and usage info") + + return selected, unselected, invalid + +} + +func (w *Worker) commitStakingTransaction(tx *staking.StakingTransaction, coinbase common.Address) ([]*types.Log, error) { + snap := w.current.state.Snapshot() + gasUsed := w.current.header.GasUsed() + receipt, _, err := + core.ApplyStakingTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &gasUsed, vm.Config{}) + w.current.header.SetGasUsed(gasUsed) + if err != nil { + w.current.state.RevertToSnapshot(snap) + return nil, err + } + if receipt == nil { + return nil, fmt.Errorf("nil staking receipt") + } + w.current.stkingTxs = append(w.current.stkingTxs, tx) + w.current.receipts = append(w.current.receipts, receipt) + return receipt.Logs, nil } func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { @@ -348,7 +396,9 @@ func (w *Worker) FinalizeNewBlock(sig []byte, signers []byte, viewID uint64, coi s := w.current.state.Copy() copyHeader := types.CopyHeader(w.current.header) - block, err := w.engine.Finalize(w.chain, copyHeader, s, w.current.txs, w.current.receipts, w.current.outcxs, w.current.incxs) + block, err := w.engine.Finalize( + w.chain, copyHeader, s, w.current.txs, w.current.stkingTxs, w.current.receipts, w.current.outcxs, w.current.incxs, + ) if err != nil { return nil, ctxerror.New("cannot finalize block").WithCause(err) } diff --git a/shard/shard_state.go b/shard/shard_state.go index baa7959d1..d05cb1add 100644 --- a/shard/shard_state.go +++ b/shard/shard_state.go @@ -7,10 +7,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/bls/ffi/go/bls" - "golang.org/x/crypto/sha3" - common2 "github.com/harmony-one/harmony/internal/common" "github.com/harmony-one/harmony/internal/ctxerror" + "golang.org/x/crypto/sha3" ) var ( @@ -101,7 +100,7 @@ func CompareBlsPublicKey(k1, k2 BlsPublicKey) int { return bytes.Compare(k1[:], k2[:]) } -// NodeID represents node id (BLS address). +// NodeID represents node id (BLS address) type NodeID struct { EcdsaAddress common.Address `json:"ecdsa_address"` BlsPublicKey BlsPublicKey `json:"bls_pubkey"` diff --git a/staking/types/commission.go b/staking/types/commission.go index 3cd1c8c4d..355059282 100644 --- a/staking/types/commission.go +++ b/staking/types/commission.go @@ -2,6 +2,8 @@ package types import ( "math/big" + + "github.com/harmony-one/harmony/core/numeric" ) type ( @@ -15,14 +17,8 @@ type ( // CommissionRates defines the initial commission rates to be used for creating a // validator. CommissionRates struct { - Rate Dec `json:"rate" yaml:"rate"` // the commission rate charged to delegators, as a fraction - MaxRate Dec `json:"max_rate" yaml:"max_rate"` // maximum commission rate which validator can ever charge, as a fraction - MaxChangeRate Dec `json:"max_change_rate" yaml:"max_change_rate"` // maximum increase of the validator commission every epoch, as a fraction + Rate numeric.Dec `json:"rate" yaml:"rate"` // the commission rate charged to delegators, as a fraction + MaxRate numeric.Dec `json:"max_rate" yaml:"max_rate"` // maximum commission rate which validator can ever charge, as a fraction + MaxChangeRate numeric.Dec `json:"max_change_rate" yaml:"max_change_rate"` // maximum increase of the validator commission every epoch, as a fraction } ) - -// NewCommission returns a new commission object -func NewCommission(rate Dec, maxRate Dec, maxChangeRate Dec, height *big.Int) Commission { - commissionRates := CommissionRates{Rate: rate, MaxRate: maxRate, MaxChangeRate: maxChangeRate} - return Commission{CommissionRates: commissionRates, UpdateHeight: height} -} diff --git a/staking/types/messages.go b/staking/types/messages.go index 8b7e49937..8e6089022 100644 --- a/staking/types/messages.go +++ b/staking/types/messages.go @@ -3,6 +3,7 @@ package types import ( "math/big" + "github.com/harmony-one/harmony/core/numeric" "github.com/harmony-one/harmony/internal/common" "github.com/harmony-one/harmony/shard" "github.com/pkg/errors" @@ -38,7 +39,7 @@ func (d Directive) String() string { // NewValidator - type for creating a new validator type NewValidator struct { - Description `json:"ties" yaml:"ties"` + Description `json:"description" yaml:"description"` CommissionRates `json:"commission" yaml:"commission"` MinSelfDelegation *big.Int `json:"min_self_delegation" yaml:"min_self_delegation"` StakingAddress common.Address `json:"staking_address" yaml:"staking_address"` @@ -50,7 +51,7 @@ type NewValidator struct { type EditValidator struct { Description StakingAddress common.Address `json:"staking_address" yaml:"staking_address"` - CommissionRate Dec `json:"commission_rate" yaml:"commission_rate"` + CommissionRate numeric.Dec `json:"commission_rate" yaml:"commission_rate"` MinSelfDelegation *big.Int `json:"min_self_delegation" yaml:"min_self_delegation"` } diff --git a/staking/types/sign.go b/staking/types/sign.go index 8cc388f05..499db9868 100644 --- a/staking/types/sign.go +++ b/staking/types/sign.go @@ -24,7 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/values" "github.com/harmony-one/harmony/crypto/hash" ) @@ -111,7 +111,7 @@ var big8 = big.NewInt(8) // Sender returns the sender address of the given signer. func (s EIP155Signer) Sender(tx *StakingTransaction) (common.Address, error) { if tx.ChainID().Cmp(s.chainID) != 0 { - return common.Address{}, core.ErrInvalidChainID + return common.Address{}, values.ErrInvalidChainID } V := new(big.Int).Sub(tx.data.V, s.chainIDMul) V.Sub(V, big8) diff --git a/staking/types/transaction.go b/staking/types/transaction.go index 12099ae68..d934dee0c 100644 --- a/staking/types/transaction.go +++ b/staking/types/transaction.go @@ -86,6 +86,11 @@ func (tx *StakingTransaction) WithSignature(signer Signer, sig []byte) (*Staking return cpy, nil } +// Gas returns gas of StakingTransaction. +func (tx *StakingTransaction) Gas() uint64 { + return tx.data.GasLimit +} + // ChainID is what chain this staking transaction for func (tx *StakingTransaction) ChainID() *big.Int { return deriveChainID(tx.data.V) diff --git a/staking/types/validator.go b/staking/types/validator.go index 3fb1104ce..9a76d4f39 100644 --- a/staking/types/validator.go +++ b/staking/types/validator.go @@ -18,14 +18,22 @@ const ( // Validator - data fields for a validator type Validator struct { - Address common.Address `json:"address" yaml:"address"` // ECDSA address of the validator - ValidatingPubKey bls.PublicKey `json:"validating_pub_key" yaml:"validating_pub_key"` // The BLS public key of the validator for consensus - Description Description `json:"description" yaml:"description"` // description for the validator - Active bool `json:"active" yaml:"active"` // Is the validator active in the validating process or not - Stake *big.Int `json:"stake" yaml:"stake"` // The stake put by the validator itself - UnbondingHeight *big.Int `json:"unbonding_height" yaml:"unbonding_height"` // if unbonding, height at which this validator has begun unbonding - Commission Commission `json:"commission" yaml:"commission"` // commission parameters - MinSelfDelegation *big.Int `json:"min_self_delegation" yaml:"min_self_delegation"` // validator's self declared minimum self delegation + // ECDSA address of the validator + Address common.Address `json:"address" yaml:"address"` + // The BLS public key of the validator for consensus + ValidatingPubKey bls.PublicKey `json:"validating_pub_key" yaml:"validating_pub_key"` + // The stake put by the validator itself + Stake *big.Int `json:"stake" yaml:"stake"` + // if unbonding, height at which this validator has begun unbonding + UnbondingHeight *big.Int `json:"unbonding_height" yaml:"unbonding_height"` + // validator's self declared minimum self delegation + MinSelfDelegation *big.Int `json:"min_self_delegation" yaml:"min_self_delegation"` + // commission parameters + Commission `json:"commission" yaml:"commission"` + // description for the validator + Description `json:"description" yaml:"description"` + // Is the validator active in the validating process or not + Active bool `json:"active" yaml:"active"` } // Description - some possible IRL connections