Merge pull request #112 from harmony-one/rj_branch

[HAR-85] Delete bft.go and implement valid txs selection
pull/128/head
Rongjian Lan 6 years ago committed by GitHub
commit 683a5d471f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      client/txgen/main.go
  2. 1
      client/txgen/txgen/account_txs_generator.go
  3. 121
      consensus/bft.go
  4. 110
      consensus/consensus.go
  5. 4
      harmony/main.go
  6. 8
      node/node.go
  7. 10
      node/node_handler.go
  8. 13
      node/node_test.go
  9. 40
      node/worker/worker.go

@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"github.com/harmony-one/harmony/utils"
"os"
"path"
"runtime"
@ -77,15 +78,17 @@ func main() {
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
clientPeer := config.GetClientPeer()
for shardID := range shardIDLeaderMap {
node := node.New(&consensus.Consensus{ShardID: shardID}, nil, p2p.Peer{})
_, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port)
clientPeer.PubKey = pubKey
node := node.New(&consensus.Consensus{ShardID: shardID}, nil, *clientPeer)
// Assign many fake addresses so we have enough address to play with at first
node.AddTestingAddresses(setting.NumOfAddress)
nodes = append(nodes, node)
}
// Client/txgenerator server node setup
clientPeer := config.GetClientPeer()
consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj, nil, *clientPeer)

@ -27,6 +27,7 @@ func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, s
for j := 0; j < 10; j++ {
randomUserKey, _ := crypto.GenerateKey()
randomUserAddress := crypto.PubkeyToAddress(randomUserKey.PublicKey)
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, uint32(shardID), big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[i*10+j] = tx
}

@ -1,121 +0,0 @@
package consensus
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
)
// Bft is the struct for Bft protocol.
type Bft struct {
}
// NewFaker returns Bft.
func NewFaker() *Bft {
return &Bft{}
}
// Author implements Engine, returning the header's coinbase as the
// proof-of-work verified author of the block.
func (bft *Bft) Author(header *types.Header) (common.Address, error) {
return header.Coinbase, nil
}
// VerifyHeader checks whether a header conforms to the consensus rules of the
// stock Ethereum bft engine.
func (bft *Bft) VerifyHeader(chain ChainReader, header *types.Header, seal bool) error {
return nil
}
// VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers
// concurrently. The method returns a quit channel to abort the operations and
// a results channel to retrieve the async verifications.
func (bft *Bft) VerifyHeaders(chain ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
abort, results := make(chan struct{}), make(chan error, len(headers))
for i := 0; i < len(headers); i++ {
results <- nil
}
return abort, results
}
func (bft *Bft) verifyHeaderWorker(chain ChainReader, headers []*types.Header, seals []bool, index int) error {
var parent *types.Header
if index == 0 {
parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
} else if headers[index-1].Hash() == headers[index].ParentHash {
parent = headers[index-1]
}
if parent == nil {
return ErrUnknownAncestor
}
if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
return nil // known block
}
return bft.verifyHeader(chain, headers[index], parent, false, seals[index])
}
// verifyHeader checks whether a header conforms to the consensus rules of the
// stock Ethereum bft engine.
// See YP section 4.3.4. "Block Header Validity"
func (bft *Bft) verifyHeader(chain ChainReader, header, parent *types.Header, uncle bool, seal bool) error {
return nil
}
// VerifySeal implements consensus.Engine, checking whether the given block satisfies
// the PoW difficulty requirements.
func (bft *Bft) VerifySeal(chain ChainReader, header *types.Header) error {
return nil
}
// Prepare implements consensus.Engine, initializing the difficulty field of a
// header to conform to the ethash protocol. The changes are done inline.
func (bft *Bft) Prepare(chain ChainReader, header *types.Header) error {
return nil
}
// Finalize implements consensus.Engine, accumulating the block and uncle rewards,
// setting the final state and assembling the block.
func (bft *Bft) Finalize(chain ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, receipts []*types.Receipt) (*types.Block, error) {
// Accumulate any block and uncle rewards and commit the final state root
// Header seems complete, assemble into a block and return
accumulateRewards(chain.Config(), state, header)
header.Root = state.IntermediateRoot(false)
return types.NewBlock(header, txs, receipts), nil
}
// SealHash returns the hash of a block prior to it being sealed.
func (bft *Bft) SealHash(header *types.Header) (hash common.Hash) {
hasher := sha3.NewKeccak256()
rlp.Encode(hasher, []interface{}{
header.ParentHash,
header.Coinbase,
header.Root,
header.TxHash,
header.ReceiptHash,
header.Bloom,
header.Difficulty,
header.Number,
header.GasLimit,
header.GasUsed,
header.Time,
header.Extra,
})
hasher.Sum(hash[:0])
return hash
}
// Seal ...
func (bft *Bft) Seal(chain ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
return nil
}
// AccumulateRewards credits the coinbase of the given block with the mining
// reward. The total reward consists of the static block reward and rewards for
// included uncles. The coinbase of each uncle block is also rewarded.
func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header *types.Header) {
}

@ -8,7 +8,13 @@ import (
"github.com/dedis/kyber"
"github.com/dedis/kyber/sign/schnorr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/log"
@ -293,6 +299,110 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys []kyber.Point) int {
return len(consensus.PublicKeys)
}
func NewFaker() *Consensus {
return &Consensus{}
}
// Author implements Engine, returning the header's coinbase as the
// proof-of-work verified author of the block.
func (consensus *Consensus) Author(header *types.Header) (common.Address, error) {
return header.Coinbase, nil
}
// VerifyHeader checks whether a header conforms to the consensus rules of the
// stock bft engine.
func (consensus *Consensus) VerifyHeader(chain ChainReader, header *types.Header, seal bool) error {
return nil
}
// VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers
// concurrently. The method returns a quit channel to abort the operations and
// a results channel to retrieve the async verifications.
func (consensus *Consensus) VerifyHeaders(chain ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
abort, results := make(chan struct{}), make(chan error, len(headers))
for i := 0; i < len(headers); i++ {
results <- nil
}
return abort, results
}
func (consensus *Consensus) verifyHeaderWorker(chain ChainReader, headers []*types.Header, seals []bool, index int) error {
var parent *types.Header
if index == 0 {
parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
} else if headers[index-1].Hash() == headers[index].ParentHash {
parent = headers[index-1]
}
if parent == nil {
return ErrUnknownAncestor
}
if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
return nil // known block
}
return consensus.verifyHeader(chain, headers[index], parent, false, seals[index])
}
// verifyHeader checks whether a header conforms to the consensus rules of the
// stock bft engine.
func (consensus *Consensus) verifyHeader(chain ChainReader, header, parent *types.Header, uncle bool, seal bool) error {
return nil
}
// VerifySeal implements consensus.Engine, checking whether the given block satisfies
// the PoW difficulty requirements.
func (consensus *Consensus) VerifySeal(chain ChainReader, header *types.Header) error {
return nil
}
// Finalize implements consensus.Engine, accumulating the block and uncle rewards,
// setting the final state and assembling the block.
func (consensus *Consensus) Finalize(chain ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, receipts []*types.Receipt) (*types.Block, error) {
// Accumulate any block and uncle rewards and commit the final state root
// Header seems complete, assemble into a block and return
accumulateRewards(chain.Config(), state, header)
header.Root = state.IntermediateRoot(false)
return types.NewBlock(header, txs, receipts), nil
}
// SealHash returns the hash of a block prior to it being sealed.
func (consensus *Consensus) SealHash(header *types.Header) (hash common.Hash) {
hasher := sha3.NewKeccak256()
rlp.Encode(hasher, []interface{}{
header.ParentHash,
header.Coinbase,
header.Root,
header.TxHash,
header.ReceiptHash,
header.Bloom,
header.Difficulty,
header.Number,
header.GasLimit,
header.GasUsed,
header.Time,
header.Extra,
})
hasher.Sum(hash[:0])
return hash
}
func (consensus *Consensus) Seal(chain ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
// TODO: implement final block sealing
return nil
}
func (consensus *Consensus) Prepare(chain ChainReader, header *types.Header) error {
// TODO: implement prepare method
return nil
}
// AccumulateRewards credits the coinbase of the given block with the mining
// reward. The total reward consists of the static block reward and rewards for
// included uncles. The coinbase of each uncle block is also rewarded.
func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header *types.Header) {
// TODO: implement mining rewards
}
// GetNodeID returns the nodeID
func (consensus *Consensus) GetNodeID() uint16 {
return consensus.nodeID

@ -84,7 +84,7 @@ func main() {
}
txs := make([]*types.Transaction, 100)
worker := worker.New(params.TestChainConfig, chain, consensus.NewFaker())
worker := worker.New(params.TestChainConfig, chain, consensus.NewFaker(), crypto.PubkeyToAddress(testBankKey.PublicKey))
nonce := worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(testBankKey.PublicKey))
for i := range txs {
randomUserKey, _ := crypto.GenerateKey()
@ -93,5 +93,5 @@ func main() {
txs[i] = tx
}
worker.CommitTransactions(txs, crypto.PubkeyToAddress(testBankKey.PublicKey))
worker.CommitTransactions(txs)
}

@ -141,14 +141,14 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Trans
// Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transactions, []*blockchain.CrossShardTxAndProof) {
node.pendingTxMutexAccount.Lock()
selected, unselected, invalid, crossShardTxs := node.pendingTransactionsAccount, types.Transactions{}, types.Transactions{}, []*blockchain.CrossShardTxAndProof{}
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(node.pendingTransactionsAccount, maxNumTxs)
_ = invalid // invalid txs are discard
node.log.Debug("Invalid transactions discarded", "number", len(invalid))
node.pendingTransactionsAccount = unselected
node.log.Debug("Remaining pending transactions", "number", len(node.pendingTransactionsAccount))
node.pendingTxMutexAccount.Unlock()
return selected, crossShardTxs //TODO: replace cross-shard proofs for account model
return selected, nil //TODO: replace cross-shard proofs for account model
}
// StartServer starts a server and process the request by a handler.
@ -305,12 +305,12 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node
}
_ = gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, bft.NewFaker(), vm.Config{}, nil)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, node.Consensus, vm.Config{}, nil)
node.Chain = chain
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain)
node.BlockChannelAccount = make(chan *types.Block)
node.Worker = worker.New(params.TestChainConfig, chain, bft.NewFaker())
node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(selfPeer.PubKey))
}
node.SelfPeer = selfPeer

@ -340,12 +340,7 @@ func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) {
if len(node.pendingTransactionsAccount) >= 1000 {
// Normal tx block consensus
selectedTxs, _ := node.getTransactionsForNewBlockAccount(MaxNumberOfTransactionsPerBlock)
err := node.Worker.UpdateCurrent()
if err != nil {
node.log.Debug("Failed updating worker's state", "Error", err)
}
err = node.Worker.CommitTransactions(selectedTxs, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
if err == nil {
node.Worker.CommitTransactions(selectedTxs)
block, err := node.Worker.Commit()
if err != nil {
node.log.Debug("Failed commiting new block", "Error", err)
@ -353,9 +348,6 @@ func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) {
newBlock = block
break
}
} else {
node.log.Debug("Failed to create new block", "Error", err)
}
}
// If not enough transactions to run Consensus,
// periodically check whether we have enough transactions to package into block.

@ -2,6 +2,7 @@ package node
import (
"fmt"
"github.com/harmony-one/harmony/utils"
"os"
"testing"
"time"
@ -16,7 +17,8 @@ import (
)
func TestNewNewNode(test *testing.T) {
leader := p2p.Peer{IP: "1", Port: "2"}
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey}
validator := p2p.Peer{IP: "3", Port: "5"}
consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader)
@ -43,7 +45,8 @@ func TestNewNewNode(test *testing.T) {
}
func TestCountNumTransactionsInBlockchain(test *testing.T) {
leader := p2p.Peer{IP: "1", Port: "2"}
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey}
validator := p2p.Peer{IP: "3", Port: "5"}
consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader)
@ -77,7 +80,8 @@ func TestAddPeers(test *testing.T) {
ValidatorID: 2,
},
}
leader := p2p.Peer{IP: "1", Port: "2"}
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey}
validator := p2p.Peer{IP: "3", Port: "5"}
consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader)
@ -148,7 +152,8 @@ func exitServer() {
}
func TestPingPongHandler(test *testing.T) {
leader := p2p.Peer{IP: "127.0.0.1", Port: "8881"}
_, pubKey := utils.GenKey("127.0.0.1", "8881")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8881", PubKey: pubKey}
// validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"}
consensus := consensus.New(leader, "0", []p2p.Peer{leader}, leader)
node := New(consensus, nil, leader)

@ -1,6 +1,7 @@
package worker
import (
"github.com/harmony-one/harmony/log"
"math/big"
"time"
@ -30,12 +31,41 @@ type Worker struct {
chain *core.BlockChain
current *environment // An environment for current running cycle.
coinbase common.Address
engine consensus.Engine
gasFloor uint64
gasCeil uint64
}
func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, maxNumTxs int) (types.Transactions, types.Transactions, types.Transactions) {
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
}
selected := types.Transactions{}
unselected := types.Transactions{}
invalid := types.Transactions{}
for _, tx := range txs {
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, w.coinbase)
if len(selected) > maxNumTxs {
unselected = append(unselected, tx)
} else {
if err != nil {
w.current.state.RevertToSnapshot(snap)
invalid = append(invalid, tx)
} else {
selected = append(selected, tx)
}
}
}
err := w.UpdateCurrent()
if err != nil {
log.Debug("Failed updating worker's state", "Error", err)
}
return selected, unselected, invalid
}
func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
@ -51,14 +81,13 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
}
// CommitTransactions commits transactions.
func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Address) error {
snap := w.current.state.Snapshot()
func (w *Worker) CommitTransactions(txs types.Transactions) error {
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
}
for _, tx := range txs {
_, err := w.commitTransaction(tx, coinbase)
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, w.coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
return err
@ -114,7 +143,7 @@ func (w *Worker) Commit() (*types.Block, error) {
}
// New ...
func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine) *Worker {
func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine, coinbase common.Address) *Worker {
worker := &Worker{
config: config,
chain: chain,
@ -122,6 +151,7 @@ func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.En
}
worker.gasFloor = 0
worker.gasCeil = 1000000000000000
worker.coinbase = coinbase
parent := worker.chain.CurrentBlock()
num := parent.Number()

Loading…
Cancel
Save